Source code for qudi.core.parentpoller
# -*- coding: utf-8 -*-
"""
Parent poller mechanism from IPython.
Copyright (c) 2015, IPython Development Team
Copyright (c) 2021, the qudi developers. See the AUTHORS.md file at the top-level directory of this
distribution and on <https://github.com/Ulm-IQO/qudi-core/>
This file is part of qudi.
Qudi is free software: you can redistribute it and/or modify it under the terms of
the GNU Lesser General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.
Qudi is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along with qudi.
If not, see <https://www.gnu.org/licenses/>.
"""
__all__ = ['ParentPollerUnix', 'ParentPollerWindows']
import ctypes
import os
import platform
import time
from threading import Thread
import logging
logger = logging.getLogger(__name__)
[docs]
class ParentPollerUnix(Thread):
"""A Unix-specific daemon thread that terminates the program immediately
when the parent process no longer exists.
"""
[docs]
def __init__(self, quit_function=None):
"""Create the parent poller.
Parameters
----------
quitfunction : callable
Function to run before exiting.
"""
if quit_function is None:
pass
elif not callable(quit_function):
raise TypeError('argument quit_function must be a callable.')
super().__init__()
self.daemon = True
self.quit_function = quit_function
def run(self):
"""Run the parentpoller.
"""
# We cannot use os.waitpid because it works only for child processes.
from errno import EINTR
while True:
try:
if os.getppid() == 1:
if self.quit_function is None:
logger.critical('Parent process died!')
else:
logger.critical('Parent process died! Qudi shutting down...')
self.quit_function()
return
except OSError as e:
if e.errno == EINTR:
continue
raise
time.sleep(1)
[docs]
class ParentPollerWindows(Thread):
"""A Windows-specific daemon thread that listens for a special event that signals an interrupt
and, optionally, terminates the program immediately when the parent process no longer exists.
"""
[docs]
def __init__(self, parent_handle, quit_function=None):
""" Create the parent poller.
Parameters
----------
quit_function : callable
Function to call for shutdown if the parent process is dead.
parent_handle : int
The program will terminate immediately when this handle is signaled.
"""
if quit_function is None:
pass
elif not callable(quit_function):
raise TypeError('argument quit_function must be a callable.')
super().__init__()
self.daemon = True
self.quit_function = quit_function
self.parent_handle = parent_handle
self._stop_requested = False
def run(self):
"""Run the poll loop. This method never returns.
"""
try:
from _winapi import WAIT_OBJECT_0, INFINITE
except ImportError:
from _subprocess import WAIT_OBJECT_0, INFINITE
# Build the list of handle to listen on.
handle_list = [self.parent_handle]
arch = platform.architecture()[0]
c_int = ctypes.c_int64 if arch.startswith('64') else ctypes.c_int
# Listen forever.
while True:
# Return if stop has been requested
if self._stop_requested:
return
result = ctypes.windll.kernel32.WaitForMultipleObjects(
len(handle_list), # nCount
(c_int * len(handle_list))(*handle_list), # lpHandles
False, # bWaitAll
1000) # dwMilliseconds
if result >= len(handle_list):
# Nothing happened. Probably timed out.
continue
elif result < WAIT_OBJECT_0:
# wait failed, just give up and stop polling.
logger.critical("Parent poll failed!!!!!")
return
else:
handle = handle_list[result - WAIT_OBJECT_0]
if handle == self.parent_handle:
if self.quit_function is None:
logger.critical('Parent process died!')
else:
logger.critical('Parent process died! Qudi shutting down...')
self.quit_function()
return
def stop(self) -> None:
self._stop_requested = True
def start(self) -> None:
self._stop_requested = False
return super().start()