Source code for qudi.core.services

# -*- coding: utf-8 -*-
"""
This file contains the qudi tools for remote module sharing via rpyc server.

.. Copyright (c) 2021, the qudi developers. See the AUTHORS.md file at the top-level directory of this
.. distribution and on <https://github.com/Ulm-IQO/qudi-core/>
..
.. This file is part of qudi.
..
.. Qudi is free software: you can redistribute it and/or modify it under the terms of
.. the GNU Lesser General Public License as published by the Free Software Foundation,
.. either version 3 of the License, or (at your option) any later version.
..
.. Qudi is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
.. without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
.. See the GNU Lesser General Public License for more details.
..
.. You should have received a copy of the GNU Lesser General Public License along with qudi.
.. If not, see <https://www.gnu.org/licenses/>.
"""

__all__ = ("RemoteModulesService", "QudiNamespaceService")

import logging

import rpyc
import weakref
from functools import wraps
from inspect import signature, isfunction, ismethod

from qudi.util.mutex import Mutex
from qudi.util.models import DictTableModel
from qudi.util.network import netobtain
from qudi.core.logger import get_logger

logger = get_logger(__name__)


class _SharedModulesModel(DictTableModel):
    """Derived dict model for GUI display elements"""

    def __init__(self):
        super().__init__(headers="Shared Module")

    def data(self, index, role):
        """Get data from the model for a given cell. Data can have a role that affects display.

        Parameters
        ----------
        index : QModelIndex
            Cell for which data is requested.
        role : ItemDataRole
            Role for which data is requested.

        Returns
        -------
        QVariant
            Data for the given cell and role.
        """
        data = super().data(index, role)
        if data is None:
            return None
        # second column returns weakref.ref object
        if index.column() == 1:
            data = data()
        return data


[docs] class RemoteModulesService(rpyc.Service): """An RPyC service that has a module list.""" ALIASES = ["RemoteModules"] def __init__(self, *args, force_remote_calls_by_value=False, **kwargs): super().__init__(*args, **kwargs) self._thread_lock = Mutex() self.shared_modules = _SharedModulesModel() self._force_remote_calls_by_value = force_remote_calls_by_value
[docs] def share_module(self, module): with self._thread_lock: if module.name in self.shared_modules: logger.warning(f'Module "{module.name}" already shared') return self.shared_modules[module.name] = weakref.ref(module) weakref.finalize(module, self.remove_shared_module, module.name)
[docs] def remove_shared_module(self, module): with self._thread_lock: name = module if isinstance(module, str) else module.name self.shared_modules.pop(name, None)
[docs] def on_connect(self, conn): """code that runs when a connection is created""" host, port = conn._config["endpoints"][1] logger.info( f"Client connected to remote modules service from [{host}]:{port:d}" )
[docs] def on_disconnect(self, conn): """code that runs when the connection is closing""" host, port = conn._config["endpoints"][1] logger.info( f"Client [{host}]:{port:d} disconnected from remote modules service" )
[docs] def exposed_get_module_instance(self, name, activate=False): """Return reference to a module in the shared module list. Parameters ---------- name : str Unique module name. Returns ------- object Reference to the module. """ with self._thread_lock: try: module = self.shared_modules.get(name, None)() except TypeError: logger.error( f'Client requested a module ("{name}") that is not shared.' ) return None if activate: if not module.activate(): logger.error( f'Unable to share requested module "{name}" with client. Module ' f"can not be activated." ) return None if self._force_remote_calls_by_value: return ModuleRpycProxy(module.instance) return module.instance
[docs] def exposed_get_available_module_names(self): """Returns the currently shared module names independent of the current module state. Returns ------- tuple Names of the currently shared modules. """ with self._thread_lock: return tuple(self.shared_modules)
[docs] def exposed_get_loaded_module_names(self): """Returns the currently shared module names for all modules that have been loaded (instantiated). Returns ------- tuple Names of the currently shared and loaded modules. """ with self._thread_lock: all_modules = {name: ref() for name, ref in self.shared_modules.items()} return tuple( name for name, mod in all_modules.items() if mod is not None and mod.instance is not None )
[docs] def exposed_get_active_module_names(self): """ Returns the currently shared module names for all modules that are active. Returns ------- tuple Names of the currently shared active modules. """ with self._thread_lock: all_modules = {name: ref() for name, ref in self.shared_modules.items()} return tuple( name for name, mod in all_modules.items() if mod is not None and mod.is_active )
[docs] class QudiNamespaceService(rpyc.Service): """An RPyC service providing a namespace dict containing references to all active qudi module instances as well as a reference to the qudi application itself. """ ALIASES = ["QudiNamespace"] def __init__(self, *args, qudi, force_remote_calls_by_value=False, **kwargs): super().__init__(*args, **kwargs) self.__qudi_ref = weakref.ref(qudi) self._notifier_callbacks = dict() self._force_remote_calls_by_value = force_remote_calls_by_value @property def _qudi(self): qudi = self.__qudi_ref() if qudi is None: raise RuntimeError("Dead qudi application reference encountered") return qudi @property def _module_manager(self): manager = self._qudi.module_manager if manager is None: raise RuntimeError("No module manager initialized in qudi application") return manager
[docs] def on_connect(self, conn): """code that runs when a connection is created""" try: self._notifier_callbacks[conn] = rpyc.async_(conn.root.modules_changed) except AttributeError: pass host, port = conn._config["endpoints"][1] logger.info(f"Client connected to local module service from [{host}]:{port:d}")
[docs] def on_disconnect(self, conn): """code that runs when the connection is closing""" self._notifier_callbacks.pop(conn, None) host, port = conn._config["endpoints"][1] logger.info(f"Client [{host}]:{port:d} disconnected from local module service")
[docs] def notify_module_change(self): logger.debug( "Local module server has detected a module state change and sends async " "notifier signals to all clients" ) for callback in self._notifier_callbacks.values(): callback()
[docs] def exposed_get_namespace_dict(self): """Returns the instances of the currently active modules as well as a reference to the qudi application itself. Returns ------- dict Names (keys) and object references (values). """ if self._force_remote_calls_by_value: mods = { name: ModuleRpycProxy(mod.instance) for name, mod in self._module_manager.items() if mod.is_active } else: mods = { name: mod.instance for name, mod in self._module_manager.items() if mod.is_active } mods["qudi"] = self._qudi return mods
[docs] def exposed_get_logger(self, name: str) -> logging.Logger: """Returns a logger object for remote processes to log into the qudi logging facility""" return get_logger(name)
class ModuleRpycProxy: """Instances of this class serve as proxies for qudi modules accessed via RPyC. It currently wraps all API methods (none- and single-underscore methods) to only receive parameters "by value", i.e. using qudi.util.network.netobtain. This will only work if all method arguments are "pickle-able". In addition all values passed to __setattr__ are also received "by value". Proxy class concept heavily inspired by this python recipe under PSF License: https://code.activestate.com/recipes/496741-object-proxying/ """ __slots__ = ["_obj_ref", "__weakref__"] def __init__(self, obj): object.__setattr__(self, "_obj_ref", weakref.ref(obj)) # proxying (special cases) def __getattribute__(self, name): obj = object.__getattribute__(self, "_obj_ref")() attr = getattr(obj, name) if not name.startswith("__") and ismethod(attr) or isfunction(attr): sig = signature(attr) if len(sig.parameters) > 0: @wraps(attr) def wrapped(*args, **kwargs): sig.bind(*args, **kwargs) args = [netobtain(arg) for arg in args] kwargs = {name: netobtain(arg) for name, arg in kwargs.items()} return attr(*args, **kwargs) wrapped.__signature__ = sig return wrapped return attr def __delattr__(self, name): obj = object.__getattribute__(self, "_obj_ref")() return delattr(obj, name) def __setattr__(self, name, value): obj = object.__getattribute__(self, "_obj_ref")() return setattr(obj, name, netobtain(value)) # factories _special_names = ( "__abs__", "__add__", "__and__", "__call__", "__cmp__", "__coerce__", "__contains__", "__delitem__", "__delslice__", "__div__", "__divmod__", "__eq__", "__float__", "__floordiv__", "__ge__", "__getitem__", "__getslice__", "__gt__", "__hash__", "__hex__", "__iadd__", "__iand__", "__idiv__", "__idivmod__", "__ifloordiv__", "__ilshift__", "__imod__", "__imul__", "__int__", "__invert__", "__ior__", "__ipow__", "__irshift__", "__isub__", "__iter__", "__itruediv__", "__ixor__", "__le__", "__len__", "__long__", "__lshift__", "__lt__", "__mod__", "__mul__", "__ne__", "__neg__", "__oct__", "__or__", "__pos__", "__pow__", "__radd__", "__rand__", "__rdiv__", "__rdivmod__", "__reduce__", "__reduce_ex__", "__repr__", "__reversed__", "__rfloorfiv__", "__rlshift__", "__rmod__", "__rmul__", "__ror__", "__rpow__", "__rrshift__", "__rshift__", "__rsub__", "__rtruediv__", "__rxor__", "__setitem__", "__setslice__", "__sub__", "__truediv__", "__xor__", "next", "__str__", "__nonzero__", ) @classmethod def _create_class_proxy(cls, theclass): """creates a proxy for the given class""" def make_method(method_name): def method(self, *args, **kw): obj = object.__getattribute__(self, "_obj_ref")() args = [netobtain(arg) for arg in args] kw = {key: netobtain(val) for key, val in kw.items()} return getattr(obj, method_name)(*args, **kw) return method # Add all special names to this wrapper class if they are present in the original class namespace = dict() for name in cls._special_names: if hasattr(theclass, name): namespace[name] = make_method(name) return type(f"{cls.__name__}({theclass.__name__})", (cls,), namespace) def __new__(cls, obj, *args, **kwargs): """creates an proxy instance referencing `obj`. (obj, *args, **kwargs) are passed to this class' __init__, so deriving classes can define an __init__ method of their own. note: _class_proxy_cache is unique per class (each deriving class must hold its own cache) """ theclass = cls._create_class_proxy(obj.__class__) return object.__new__(theclass)