Source code for qudi.core.parentpoller

# -*- coding: utf-8 -*-
"""
Parent poller mechanism from IPython.

.. Copyright (c) 2021, the qudi developers. See the AUTHORS.md file at the top-level directory of this
.. distribution and on <https://github.com/Ulm-IQO/qudi-core/>
..
.. This file is part of qudi.
..
.. Qudi is free software: you can redistribute it and/or modify it under the terms of
.. the GNU Lesser General Public License as published by the Free Software Foundation,
.. either version 3 of the License, or (at your option) any later version.
..
.. Qudi is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
.. without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
.. See the GNU Lesser General Public License for more details.
..
.. You should have received a copy of the GNU Lesser General Public License along with qudi.
.. If not, see <https://www.gnu.org/licenses/>.
"""

__all__ = ["ParentPollerUnix", "ParentPollerWindows"]

import ctypes
import os
import platform
import time
from threading import Thread
import logging

logger = logging.getLogger(__name__)


[docs] class ParentPollerUnix(Thread): """A Unix-specific daemon thread that terminates the program immediately when the parent process no longer exists. """ def __init__(self, quit_function=None): """Create the parent poller. Parameters ---------- quitfunction : callable Function to run before exiting. """ if quit_function is None: pass elif not callable(quit_function): raise TypeError("argument quit_function must be a callable.") super().__init__() self.daemon = True self.quit_function = quit_function
[docs] def run(self): """Run the parentpoller.""" # We cannot use os.waitpid because it works only for child processes. from errno import EINTR while True: try: if os.getppid() == 1: if self.quit_function is None: logger.critical("Parent process died!") else: logger.critical("Parent process died! Qudi shutting down...") self.quit_function() return except OSError as e: if e.errno == EINTR: continue raise time.sleep(1)
[docs] class ParentPollerWindows(Thread): """A Windows-specific daemon thread that listens for a special event that signals an interrupt and, optionally, terminates the program immediately when the parent process no longer exists. """ def __init__(self, parent_handle, quit_function=None): """Create the parent poller. Parameters ---------- quit_function : callable Function to call for shutdown if the parent process is dead. parent_handle : int The program will terminate immediately when this handle is signaled. """ if quit_function is None: pass elif not callable(quit_function): raise TypeError("argument quit_function must be a callable.") super().__init__() self.daemon = True self.quit_function = quit_function self.parent_handle = parent_handle self._stop_requested = False
[docs] def run(self): """Run the poll loop. This method never returns.""" try: from _winapi import WAIT_OBJECT_0, INFINITE except ImportError: from _subprocess import WAIT_OBJECT_0 # Build the list of handle to listen on. handle_list = [self.parent_handle] arch = platform.architecture()[0] c_int = ctypes.c_int64 if arch.startswith("64") else ctypes.c_int # Listen forever. while True: # Return if stop has been requested if self._stop_requested: return result = ctypes.windll.kernel32.WaitForMultipleObjects( len(handle_list), # nCount (c_int * len(handle_list))(*handle_list), # lpHandles False, # bWaitAll 1000, ) # dwMilliseconds if result >= len(handle_list): # Nothing happened. Probably timed out. continue elif result < WAIT_OBJECT_0: # wait failed, just give up and stop polling. logger.critical("Parent poll failed!!!!!") return else: handle = handle_list[result - WAIT_OBJECT_0] if handle == self.parent_handle: if self.quit_function is None: logger.critical("Parent process died!") else: logger.critical("Parent process died! Qudi shutting down...") self.quit_function() return
[docs] def stop(self) -> None: self._stop_requested = True
[docs] def start(self) -> None: self._stop_requested = False return super().start()