Source code for qudi.core.modulemanager

# -*- coding: utf-8 -*-
"""
This file contains the Qudi Manager class.

.. Copyright (c) 2021, the qudi developers. See the AUTHORS.md file at the top-level directory of this
.. distribution and on <https://github.com/Ulm-IQO/qudi-core/>
..
.. This file is part of qudi.
..
.. Qudi is free software: you can redistribute it and/or modify it under the terms of
.. the GNU Lesser General Public License as published by the Free Software Foundation,
.. either version 3 of the License, or (at your option) any later version.
..
.. Qudi is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
.. without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
.. See the GNU Lesser General Public License for more details.
..
.. You should have received a copy of the GNU Lesser General Public License along with qudi.
.. If not, see <https://www.gnu.org/licenses/>.
"""

import os
import importlib
import copy
import weakref
import fysom

from typing import FrozenSet
from functools import partial
from PySide2 import QtCore

from qudi.util.mutex import (
    RecursiveMutex,
)  # provides access serialization between threads
from qudi.core.logger import get_logger
from qudi.core.servers import get_remote_module_instance
from qudi.core.module import Base, get_module_app_data_path

logger = get_logger(__name__)


[docs] class ModuleManager(QtCore.QObject): """ """ _instance = None # Only class instance created will be stored here as weakref _lock = RecursiveMutex() sigModuleStateChanged = QtCore.Signal(str, str, str) sigModuleAppDataChanged = QtCore.Signal(str, str, bool) sigManagedModulesChanged = QtCore.Signal(dict) def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None or cls._instance() is None: obj = super().__new__(cls, *args, **kwargs) cls._instance = weakref.ref(obj) return obj raise RuntimeError( 'ModuleManager is a singleton. An instance has already been created in this ' 'process. Please use ModuleManager.instance() instead.' ) def __init__(self, *args, qudi_main, **kwargs): super().__init__(*args, **kwargs) self._qudi_main_ref = weakref.ref(qudi_main, self._qudi_main_ref_dead_callback) self._modules = dict()
[docs] @classmethod def instance(cls): with cls._lock: if cls._instance is None: return None return cls._instance()
def __len__(self): with self._lock: return len(self._modules) def __getitem__(self, key): with self._lock: return self._modules.__getitem__(key) def __setitem__(self, key, value): with self._lock: if value.name != key: raise NameError('ManagedModule.name attribute does not match key') self.add_module(value, allow_overwrite=True) def __delitem__(self, key): self.remove_module(key) def __contains__(self, item): with self._lock: return self._modules.__contains__(item)
[docs] def clear(self): with self._lock: for module_name in tuple(self._modules): self.remove_module(module_name, ignore_missing=True, emit_change=False) self.sigManagedModulesChanged.emit(self.modules)
[docs] def get(self, *args): with self._lock: return self._modules.get(*args)
[docs] def items(self): return self._modules.copy().items()
[docs] def values(self): return self._modules.copy().values()
[docs] def keys(self): return self._modules.copy().keys()
@property def module_names(self): with self._lock: return tuple(self._modules) @property def module_states(self): with self._lock: return {name: mod.state for name, mod in self._modules.items()} @property def module_instances(self): with self._lock: return { name: mod.instance for name, mod in self._modules.items() if mod.instance is not None } @property def modules(self): return self._modules.copy()
[docs] def remove_module(self, module_name, ignore_missing=False, emit_change=True): with self._lock: module = self._modules.pop(module_name, None) if module is None and not ignore_missing: raise KeyError(f'No module with name "{module_name}" registered.') module.deactivate() module.sigStateChanged.disconnect(self.sigModuleStateChanged) module.sigAppDataChanged.disconnect(self.sigModuleAppDataChanged) if module.allow_remote_access: remote_modules_server = self._qudi_main_ref().remote_modules_server if remote_modules_server is not None: remote_modules_server.remove_shared_module(module_name) self.refresh_module_links() if emit_change: self.sigManagedModulesChanged.emit(self.modules)
[docs] def add_module( self, name, base, configuration, allow_overwrite=False, emit_change=True ): with self._lock: if not isinstance(name, str) or not name: raise TypeError('module name must be non-empty str type') if base not in ('gui', 'logic', 'hardware'): raise ValueError( f'No valid module base "{base}". ' f'Unable to create qudi module "{name}".' ) if allow_overwrite: self.remove_module(name, ignore_missing=True) elif name in self._modules: raise ValueError(f'Module with name "{name}" already registered.') module = ManagedModule(self._qudi_main_ref, name, base, configuration) module.sigStateChanged.connect(self.sigModuleStateChanged) module.sigAppDataChanged.connect(self.sigModuleAppDataChanged) self._modules[name] = module self.refresh_module_links() # Register module in remote module service if module should be shared if module.allow_remote_access: remote_modules_server = self._qudi_main_ref().remote_modules_server if remote_modules_server is None: raise RuntimeError( f'Unable to share qudi module "{module.name}" as remote module. No remote ' f'module server running in this qudi process.' ) else: logger.info( f'Start sharing qudi module "{module.name}" via remote module server.' ) remote_modules_server.share_module(module) if emit_change: self.sigManagedModulesChanged.emit(self.modules)
[docs] def activate_module(self, module_name): with self._lock: if module_name not in self._modules: raise KeyError( f'No module named "{module_name}" found in managed qudi modules. ' f'Module activation aborted.' ) self._modules[module_name].activate()
[docs] def deactivate_module(self, module_name): with self._lock: if module_name not in self._modules: raise KeyError( f'No module named "{module_name}" found in managed qudi modules. ' f'Module deactivation aborted.' ) self._modules[module_name].deactivate()
[docs] def reload_module(self, module_name): with self._lock: if module_name not in self._modules: raise KeyError( f'No module named "{module_name}" found in managed qudi modules. ' f'Module reload aborted.' ) return self._modules[module_name].reload()
[docs] def clear_module_app_data(self, module_name): with self._lock: if module_name not in self._modules: raise KeyError( f'No module named "{module_name}" found in managed qudi modules. ' f'Can not clear module app status.' ) return self._modules[module_name].clear_module_app_data()
[docs] def has_app_data(self, module_name): with self._lock: if module_name not in self._modules: raise KeyError( f'No module named "{module_name}" found in managed qudi modules. ' f'Can not check for app status file.' ) return self._modules[module_name].has_app_data()
[docs] def start_all_modules(self): with self._lock: for module in self._modules.values(): module.activate()
[docs] def stop_all_modules(self): with self._lock: for module in self._modules.values(): module.deactivate()
def _module_ref_dead_callback(self, dead_ref, module_name): self.remove_module(module_name, ignore_missing=True) def _qudi_main_ref_dead_callback(self): logger.error( 'Qudi main reference no longer valid. This should never happen. Tearing down ' 'ModuleManager.' ) self.clear()
[docs] class ManagedModule(QtCore.QObject): """Object representing a qudi module (gui, logic or hardware) to be managed by the qudi Manager object. Contains status properties and handles initialization, state transitions and connection of the module. """ sigStateChanged = QtCore.Signal(str, str, str) sigAppDataChanged = QtCore.Signal(str, str, bool) _lock = RecursiveMutex() # Single mutex shared across all ManagedModule instances __state_poll_interval = ( 1 # Max interval in seconds to poll module_state of remote modules ) def __init__(self, qudi_main_ref, name, base, configuration): if not isinstance(qudi_main_ref, weakref.ref): raise TypeError('qudi_main_ref must be weakref to qudi main instance.') if not name or not isinstance(name, str): raise ValueError('Module name must be a non-empty string.') if base not in ('gui', 'logic', 'hardware'): raise ValueError('Module base must be one of ("gui", "logic", "hardware").') super().__init__() if self.thread() is not QtCore.QCoreApplication.instance().thread(): raise RuntimeError( 'ManagedModules can only be owned by the application main thread.' ) self._qudi_main_ref = qudi_main_ref # Weak reference to qudi main instance self._name = name # Each qudi module needs a unique string identifier self._base = base # Remember qudi module base self._instance = None # Store the module instance later on cfg = copy.deepcopy(configuration) # Extract module and class name self._module, self._class = cfg.get('module.Class', 'REMOTE.REMOTE').rsplit( '.', 1 ) # Remember connections by name self._connect_cfg = cfg.get('connect', dict()) # See if remotemodules access to this module is allowed self._allow_remote_access = cfg.get('allow_remote', False) # Extract remote modules URL and certificate if this module is run on a remote machine self._remote_module_name = cfg.get('native_module_name', None) self._remote_address = cfg.get('address', None) self._remote_port = cfg.get('port', None) self._remote_certfile = cfg.get('certfile', None) self._remote_keyfile = cfg.get('keyfile', None) if any( attr is None for attr in [ self._remote_module_name, self._remote_address, self._remote_port, ] ): self._remote_url = None else: self._remote_url = f'rpyc://{self._remote_address}:{self._remote_port:d}/{self._remote_module_name}/' # Do not propagate remotemodules access self._allow_remote_access = False # The rest are config options self._options = cfg.get('options', dict()) self._required_modules = frozenset() self._dependent_modules = frozenset() self.__poll_timer = None self.__last_state = None def __call__(self): return self.instance @property def name(self): return self._name @property def module_base(self): return self._base @property def class_name(self): return self._class
[docs] def module_name(self): return self._module
@property def options(self): """.""" return copy.deepcopy(self._options) @property def instance(self): with self._lock: return self._instance @property def status_file_path(self): return get_module_app_data_path(self.class_name, self.module_base, self.name) @property def is_loaded(self): with self._lock: return self._instance is not None @property def is_active(self): with self._lock: return ( self._instance is not None and self._instance.module_state() != 'deactivated' ) @property def is_busy(self): with self._lock: return self.is_active and self._instance.module_state() != 'idle' @property def is_remote(self): return bool(self._remote_url) @property def allow_remote_access(self): return self._allow_remote_access @property def remote_url(self): return self._remote_url @property def remote_key_path(self): return self._remote_keyfile @property def remote_cert_path(self): return self._remote_certfile @property def state(self): try: return self._instance.module_state() except AttributeError: return 'not loaded' except: return 'BROKEN' @property def connection_cfg(self): return self._connect_cfg.copy() @property def required_modules(self) -> FrozenSet[weakref.ref]: return self._required_modules @required_modules.setter def required_modules(self, module_iter): for module in module_iter: if not isinstance(module, weakref.ref): raise TypeError( 'items in required_modules must be weakref.ref instances.' ) if not isinstance(module(), ManagedModule): if module() is None: logger.error( f'Dead weakref passed as required module to ManagedModule "{self._name}"' ) return raise TypeError( 'required_modules must be iterable of ManagedModule instances ' '(or weakref to same instances)' ) self._required_modules = frozenset(module_iter) @property def dependent_modules(self): return self._dependent_modules @dependent_modules.setter def dependent_modules(self, module_iter): dep_modules = set() for module in module_iter: if not isinstance(module, weakref.ref): raise TypeError( 'items in dependent_modules must be weakref.ref instances.' ) if not isinstance(module(), ManagedModule): if module() is None: logger.error( f'Dead weakref passed as dependent module to ManagedModule "{self._name}"' ) return raise TypeError( 'dependent_modules must be iterable of ManagedModule instances ' '(or weakref to same instances)' ) dep_modules.add(module) self._dependent_modules = frozenset(dep_modules) @property def ranking_active_dependent_modules(self): with self._lock: active_dependent_modules = set() for module_ref in self.dependent_modules: module = module_ref() if module is None: logger.warning( f'Dead dependent module weakref encountered in ManagedModule ' f'"{self._name}".' ) continue if module.is_active: active_modules = module.ranking_active_dependent_modules if active_modules: active_dependent_modules.update(active_modules) else: active_dependent_modules.add(module_ref) return active_dependent_modules @property def module_thread_name(self): return f'mod-{self._base}-{self._name}' @property def has_app_data(self): with self._lock: return os.path.exists(self.status_file_path)
[docs] @QtCore.Slot() def clear_module_app_data(self): with self._lock: try: os.remove(self.status_file_path) except OSError: pass finally: self.sigAppDataChanged.emit(self._base, self._name, self.has_app_data)
[docs] @QtCore.Slot() def activate(self) -> None: # Switch to the main thread if this method was called from another thread if QtCore.QThread.currentThread() is not self.thread(): QtCore.QMetaObject.invokeMethod( self, 'activate', QtCore.Qt.BlockingQueuedConnection ) if not self.is_active: raise RuntimeError( f'Failed to activate {self.module_base} module "{self.name}"!' ) return with self._lock: if not self.is_loaded: self._load() # Return early if already active if self.is_active: # If it is a GUI module, show it again. if self.module_base == 'gui': self._instance.show() return if self.is_remote: logger.info( f'Activating remote {self.module_base} module "{self.remote_url}"' ) else: logger.info( f'Activating {self.module_base} module "{self.module_name}.{self.class_name}"' ) # Recursive activation of required modules for module_ref in self.required_modules: module = module_ref() if module is None: raise ReferenceError( f'Dead required module weakref encountered in ' f'ManagedModule "{self._name}".' ) module.activate() # Establish module interconnections via Connector meta object in qudi module instance self._connect() # Activate this module if self._instance.is_module_threaded: thread_name = self.module_thread_name thread_manager = self._qudi_main_ref().thread_manager thread = thread_manager.get_new_thread(thread_name) self._instance.moveToThread(thread) thread.start() try: QtCore.QMetaObject.invokeMethod( self._instance.module_state, 'activate', QtCore.Qt.BlockingQueuedConnection, ) finally: # Cleanup if activation was not successful if not self.is_active: QtCore.QMetaObject.invokeMethod( self._instance, 'move_to_main_thread', QtCore.Qt.BlockingQueuedConnection, ) thread_manager.quit_thread(thread_name) thread_manager.join_thread(thread_name) else: try: self._instance.module_state.activate() except fysom.Canceled: pass self.__last_state = self.state self.sigStateChanged.emit(self._base, self._name, self.__last_state) self.sigAppDataChanged.emit(self._base, self._name, self.has_app_data) # Raise exception if by some reason no exception propagated to here and the activation # is still unsuccessful. if not self.is_active: try: self._disconnect() except: pass raise RuntimeError( f'Failed to activate {self.module_base} module "{self.name}"!' ) if self.is_remote: self.__poll_timer = QtCore.QTimer(self) self.__poll_timer.setInterval( int(round(self.__state_poll_interval * 1000)) ) self.__poll_timer.setSingleShot(True) self.__poll_timer.timeout.connect(self._poll_module_state) self.__poll_timer.start() else: self._instance.module_state.sigStateChanged.connect( self._state_change_callback )
@QtCore.Slot() def _poll_module_state(self): with self._lock: state = self.state if state != self.__last_state: self.__last_state = state self.sigStateChanged.emit(self._base, self._name, state) try: self.__poll_timer.start() except AttributeError: pass @QtCore.Slot(object) def _state_change_callback(self, event=None): self.sigStateChanged.emit(self._base, self._name, self.state)
[docs] @QtCore.Slot() def deactivate(self): # Switch to the main thread if this method was called from another thread if QtCore.QThread.currentThread() is not self.thread(): QtCore.QMetaObject.invokeMethod( self, 'deactivate', QtCore.Qt.BlockingQueuedConnection ) if self.is_active: raise RuntimeError( f'Failed to deactivate {self.module_base} module "{self.name}"!' ) return with self._lock: if not self.is_active: return if self.is_remote: logger.info( f'Deactivating remote {self.module_base} module "{self.remote_url}"' ) else: logger.info( f'Deactivating {self.module_base} module "{self.module_name}.{self.class_name}"' ) # Recursively deactivate dependent modules for module_ref in self.dependent_modules: module = module_ref() if module is None: raise ReferenceError( f'Dead dependent module weakref encountered in ManagedModule "{self.name}".' ) module.deactivate() # Disable state updated try: self.__poll_timer.stop() self.__poll_timer.timeout.disconnect() except AttributeError: self._instance.module_state.sigStateChanged.disconnect( self._state_change_callback ) finally: self.__poll_timer = None # Actual deactivation of this module if self._instance.is_module_threaded: thread_name = self.module_thread_name thread_manager = self._qudi_main_ref().thread_manager try: QtCore.QMetaObject.invokeMethod( self._instance.module_state, 'deactivate', QtCore.Qt.BlockingQueuedConnection, ) finally: QtCore.QMetaObject.invokeMethod( self._instance, 'move_to_main_thread', QtCore.Qt.BlockingQueuedConnection, ) thread_manager.quit_thread(thread_name) thread_manager.join_thread(thread_name) else: try: self._instance.module_state.deactivate() except fysom.Canceled: pass QtCore.QCoreApplication.instance().processEvents() # ToDo: Is this still needed? # Disconnect modules from this module self._disconnect() self.__last_state = self.state self.sigStateChanged.emit(self._base, self._name, self.__last_state) self.sigAppDataChanged.emit(self._base, self._name, self.has_app_data) # Raise exception if by some reason no exception propagated to here and the deactivation # is still unsuccessful. if self.is_active: raise RuntimeError( f'Failed to deactivate {self.module_base} module "{self.name}"!' )
[docs] @QtCore.Slot() def reload(self): # Switch to the main thread if this method was called from another thread if QtCore.QThread.currentThread() is not self.thread(): QtCore.QMetaObject.invokeMethod( self, 'reload', QtCore.Qt.BlockingQueuedConnection ) return with self._lock: # Deactivate if active was_active = self.is_active mod_to_activate = None if was_active: mod_to_activate = self.ranking_active_dependent_modules self.deactivate() # reload module self._load(reload=True) # re-activate all modules that have been active before if was_active: if mod_to_activate: for module_ref in mod_to_activate: module = module_ref() if module is None: continue module.activate() else: self.activate()
def _load(self, reload=False): """ """ with self._lock: try: # Do nothing if already loaded and no reload is requested if self.is_loaded and not reload: return if self.is_remote: try: self._instance = get_remote_module_instance( self.remote_url, certfile=self._remote_certfile, keyfile=self._remote_keyfile, ) except BaseException as e: self._instance = None raise RuntimeError( f'Error during initialization of remote ' f'{self.module_base} module {self.remote_url}' ) from e else: # qudi module import and reload mod = importlib.import_module(f'qudi.{self._base}.{self._module}') importlib.reload(mod) # Try getting qudi module class from imported module mod_class = getattr(mod, self._class, None) if mod_class is None: raise AttributeError( f'No module class "{self._class}" found in module ' f'"qudi.{self._base}.{self._module}"' ) # Check if imported class is a valid qudi module class if not issubclass(mod_class, Base): raise TypeError( f'Qudi module class "{mod_class}" is no subclass of ' f'"qudi.core.module.Base"' ) # Try to instantiate the imported qudi module class try: self._instance = mod_class( qudi_main_weakref=self._qudi_main_ref, name=self._name, config=self._options, ) except BaseException: self._instance = None raise RuntimeError( f'Error during initialization of qudi module ' f'"qudi.{self._base}.{self._module}.{self._class}"' ) finally: self.__last_state = self.state self.sigStateChanged.emit(self._base, self._name, self.__last_state) def _connect(self): with self._lock: # Check if module has already been loaded/instantiated if not self.is_loaded: raise RuntimeError( f'Connection failed. No module instance found for module ' f'"{self._base}.{self._name}".' ) # Collect all module instances required by connector config module_instances = { module_ref().name: module_ref().instance for module_ref in self.required_modules } module_connections = { conn_name: module_instances[mod_name] for conn_name, mod_name in self._connect_cfg.items() } # Apply module connections self._instance.connect_modules(module_connections) def _disconnect(self): with self._lock: self._instance.disconnect_modules()