Source code for qudi.logic.taskrunner

# -*- coding: utf-8 -*-
"""
This file contains the Qudi task runner module.

.. Copyright (c) 2021, the qudi developers. See the AUTHORS.md file at the top-level directory of this
.. distribution and on <https://github.com/Ulm-IQO/qudi-core/>
..
.. This file is part of qudi.
..
.. Qudi is free software: you can redistribute it and/or modify it under the terms of
.. the GNU Lesser General Public License as published by the Free Software Foundation,
.. either version 3 of the License, or (at your option) any later version.
..
.. Qudi is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
.. without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
.. See the GNU Lesser General Public License for more details.
..
.. You should have received a copy of the GNU Lesser General Public License along with qudi.
.. If not, see <https://www.gnu.org/licenses/>.
"""

from functools import partial
from PySide2 import QtCore
from typing import Any, Type, Mapping, List, Dict

from qudi.util.mutex import Mutex
from qudi.core.module import LogicBase
from qudi.core.scripting.moduletask import ModuleTask
from qudi.core.scripting.modulescript import import_module_script
from qudi.core.configoption import ConfigOption


class TaskRunnerLogic(LogicBase):
    """This module keeps a collection of available ModuleTask subclasses (defined by config) and
    respective initialized instances that can be run.
    Handles module connections to tasks and allows monitoring of task states and results.

    The ``module_tasks`` config option maps a task name to a mapping that is read with the keys
    ``'module.Class'`` (dotted import path of the ModuleTask subclass) and ``'connect'``
    (mapping of task connector name to qudi module name).
    """

    _module_task_configs = ConfigOption(
        name='module_tasks', default=dict(), missing='warn'
    )

    sigTaskStarted = QtCore.Signal(str)  # task name
    sigTaskStateChanged = QtCore.Signal(str, str)  # task name, task state
    sigTaskFinished = QtCore.Signal(str, object, bool)  # task name, result, success flag
    _sigStartTask = QtCore.Signal(str, dict)  # task name, task keyword arguments

    def __init__(self, *args, **kwargs):
        """Set up the lock and the (initially empty) task bookkeeping containers."""
        super().__init__(*args, **kwargs)

        self._thread_lock = Mutex()
        self._running_tasks = dict()  # task name -> running ModuleTask instance
        self._configured_task_types = dict()  # task name -> ModuleTask subclass
        self._consecutive_activation = False  # Flag indicating consecutive activations

    def on_activate(self) -> None:
        """Initialise task runner.

        Imports every task class listed in the config and stores it by task name. On every
        activation after the first one, ``reload=True`` is passed to ``import_module_script``.

        Raises:
            KeyError: If a task name is encountered twice.
            TypeError: If a configured class is not a ModuleTask (sub)class.
        """
        self._running_tasks = dict()
        self._configured_task_types = dict()
        for name, task_cfg in self._module_task_configs.items():
            if name in self._configured_task_types:
                raise KeyError(f'Duplicate task name "{name}" encountered in config')
            module, cls = task_cfg['module.Class'].rsplit('.', 1)
            task = import_module_script(module, cls, reload=self._consecutive_activation)
            if not issubclass(task, ModuleTask):
                raise TypeError('Configured task is not a ModuleTask (sub)class')
            self._configured_task_types[name] = task
        self._sigStartTask.connect(self._run_task, QtCore.Qt.QueuedConnection)
        self._consecutive_activation = True

    def on_deactivate(self) -> None:
        """Shut down task runner.

        Stops accepting new start requests and interrupts every task that is still running.
        """
        self._sigStartTask.disconnect()
        # Snapshot under the lock so a finishing task can not mutate the dict while iterating.
        with self._thread_lock:
            running = list(self._running_tasks.values())
        for task in running:
            # Must actually be called; a bare attribute access would do nothing.
            task.interrupt()
        self._configured_task_types = dict()

    @property
    def running_tasks(self) -> List[str]:
        """Names of all tasks currently registered as running."""
        with self._thread_lock:
            return list(self._running_tasks)

    @property
    def task_states(self) -> Dict[str, str]:
        """State of every configured task by name; 'stopped' for tasks that are not running."""
        with self._thread_lock:
            states = dict()
            for task_name in self._configured_task_types:
                task = self._running_tasks.get(task_name, None)
                states[task_name] = 'stopped' if task is None else task.state
            return states

    @property
    def configured_task_types(self) -> Dict[str, Type[ModuleTask]]:
        """Shallow copy of the mapping from task name to configured ModuleTask subclass."""
        return self._configured_task_types.copy()

    def run_task(self, name: str, arguments: Mapping[str, Any]) -> None:
        """Request the start of a configured task.

        The request is forwarded through a queued signal to ``_run_task``, so this method
        returns before the task has been set up.

        Args:
            name: Name of the task to run.
            arguments: Keyword arguments for the task; copied into a dict before emitting.
        """
        with self._thread_lock:
            self._sigStartTask.emit(name, dict(arguments))

    def interrupt_task(self, name: str) -> None:
        """Interrupt a running task.

        Args:
            name: Name of the task to interrupt.

        Raises:
            RuntimeError: If no task with the given name is currently running.
        """
        with self._thread_lock:
            task = self._running_tasks.get(name, None)
            if task is None:
                raise RuntimeError(f'No ModuleTask with name "{name}" running')
            task.interrupt()

    @QtCore.Slot(str, dict)
    def _run_task(self, name: str, arguments: Mapping[str, Any]) -> None:
        """Create, configure and start a task, then emit sigTaskStarted.

        Any exception raised by one of the setup steps propagates to the caller and the
        remaining steps are skipped.
        """
        with self._thread_lock:
            task = self.__init_task(name)
            self.__set_task_arguments(task, arguments)
            self.__activate_connect_task_modules(name, task)
            self.__move_task_into_thread(name, task)
            self.__connect_task_signals(name, task)
            self.__start_task(name, task)
            self.sigTaskStarted.emit(name)

    def _task_finished_callback(self, name: str) -> None:
        """Called every time a task finishes.

        Disconnects the task signals and modules and shuts down the thread the task lives in.
        """
        with self._thread_lock:
            task = self._running_tasks.get(name, None)
            if task is not None:
                task.sigFinished.disconnect()
                task.sigStateChanged.disconnect()
                task.disconnect_modules()
                thread_manager = self._qudi_main.thread_manager
                thread_manager.quit_thread(task.thread())
                # NOTE(review): this is called while holding the lock that
                # _thread_finished_callback also acquires - confirm join_thread can not block
                # on that callback.
                thread_manager.join_thread(task.thread())

    def _thread_finished_callback(self, name: str) -> None:
        """Remove a task from the running tasks and emit sigTaskFinished with its outcome."""
        with self._thread_lock:
            task = self._running_tasks.pop(name, None)
            if task is not None:
                self.sigTaskFinished.emit(name, task.result, task.success)

    def _task_state_changed_callback(self, state: str, name: str) -> None:
        """Re-emit a task state change as sigTaskStateChanged(name, state)."""
        self.sigTaskStateChanged.emit(name, state)

    def __init_task(self, name: str) -> ModuleTask:
        """Create a ModuleTask instance.

        Raises:
            RuntimeError: If a task with this name is already running.
            KeyError: If no task type is configured under this name.
        """
        try:
            if name in self._running_tasks:
                raise RuntimeError(f'ModuleTask "{name}" is already initialized')
            return self._configured_task_types[name]()
        except Exception:
            self.log.exception(
                f'Exception during initialization of ModuleTask "{name}":'
            )
            raise

    def __set_task_arguments(self, task: ModuleTask, arguments: Mapping[str, Any]) -> None:
        """Set arguments for ModuleTask instance.

        Raises:
            TypeError: If arguments is not a mapping with str keys only.
        """
        try:
            if not (
                isinstance(arguments, Mapping)
                and all(isinstance(a, str) for a in arguments)
            ):
                raise TypeError('ModuleTask kwargs must be mapping with str type keys')
            task.kwargs = arguments
        except Exception:
            self.log.exception('Exception during setting of arguments for ModuleTask:')
            raise

    def __activate_connect_task_modules(self, name: str, task: ModuleTask) -> None:
        """Activate and connect all configured module connectors for ModuleTask.

        On failure the exception is logged, the task modules are disconnected again and the
        exception is re-raised.
        """
        try:
            module_manager = self._qudi_main.module_manager
            connect_targets = dict()
            connections = self._module_task_configs[name]['connect']
            for conn_name, module_name in connections.items():
                module = module_manager[module_name]
                module.activate()
                connect_targets[conn_name] = module.instance
            task.connect_modules(connect_targets)
        except Exception:
            self.log.exception(
                f'Exception during modules connection for ModuleTask "{name}":'
            )
            task.disconnect_modules()
            raise

    def __move_task_into_thread(self, name: str, task: ModuleTask) -> None:
        """Create a new QThread via qudi thread manager and move ModuleTask instance into it.

        Raises:
            RuntimeError: If the thread manager does not provide a thread.
        """
        try:
            thread = self._qudi_main.thread_manager.get_new_thread(
                name=f'ModuleTask-{name}'
            )
            if thread is None:
                raise RuntimeError(
                    f'Unable to create QThread with name "ModuleTask-{name}"'
                )
        except RuntimeError:
            self.log.exception('Exception during thread creation:')
            raise
        task.moveToThread(thread)
        # The task starts running as soon as its thread has started.
        thread.started.connect(task.run, QtCore.Qt.QueuedConnection)
        thread.finished.connect(partial(self._thread_finished_callback, name=name))

    def __connect_task_signals(self, name: str, task: ModuleTask) -> None:
        """Connect the finished and state changed signals of a task to the runner callbacks."""
        task.sigFinished.connect(
            partial(self._task_finished_callback, name=name),
            QtCore.Qt.QueuedConnection,
        )
        task.sigStateChanged.connect(
            partial(self._task_state_changed_callback, name=name),
            QtCore.Qt.QueuedConnection,
        )

    def __start_task(self, name: str, task: ModuleTask) -> None:
        """Register the task as running and start its thread."""
        self._running_tasks[name] = task
        task.thread().start()