Module enrgdaq.supervisor

Global variables

var DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS

Time in seconds to mark a DAQ job as alive after it has been running for that long.

var DAQ_JOB_QUEUE_ACTION_TIMEOUT

Time in seconds to wait for a DAQ job to process a message.

var DAQ_SUPERVISOR_DEFAULT_RESTART_TIME_SECONDS

Default time in seconds to wait before restarting a DAQ job.

var DAQ_SUPERVISOR_HEARTBEAT_MESSAGE_INTERVAL_SECONDS

Time in seconds between sending supervisor heartbeat messages.

var DAQ_SUPERVISOR_LOOP_INTERVAL_SECONDS

Time in seconds between supervisor loop iterations.

var DAQ_SUPERVISOR_ROUTES_MESSAGE_INTERVAL_SECONDS

Time in seconds between sending supervisor routes messages.

var DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS

Time in seconds between sending supervisor stats messages.

Classes

class RestartDAQJobSchedule (daq_job_process: DAQJobProcess,
restart_at: datetime.datetime)
Expand source code
@dataclass
class RestartDAQJobSchedule:
    daq_job_process: DAQJobProcess
    restart_at: datetime

RestartDAQJobSchedule(daq_job_process: enrgdaq.daq.base.DAQJobProcess, restart_at: datetime.datetime)

Instance variables

var daq_job_processDAQJobProcess
var restart_at : datetime.datetime
class Supervisor (config: SupervisorConfig | None = None,
daq_job_processes: list[DAQJobProcess] | None = None,
daq_job_config_path: str = 'configs/')
Expand source code
class Supervisor:
    """
    Supervisor class responsible for managing DAQ job threads, handling their lifecycle,
    and facilitating communication between them.
    Attributes:
        config (SupervisorConfig | None): Configuration for the supervisor.
        daq_job_processes (list[DAQJobProcess]): List of DAQ job processes managed by the supervisor.
        restart_schedules (list[RestartDAQJobSchedule]): List of schedules for restarting DAQ jobs.
        _logger (logging.Logger): Logger instance for logging supervisor activities.
        _last_stats_message_time (datetime): Last time a stats message was sent.
        _cnc_instance (SupervisorCNC | None): CNC instance for the supervisor.

        _last_stats_message_time (datetime): Last time a stats message was sent.
        _last_heartbeat_message_time (datetime): Last time a heartbeat message was sent.
        _log_queue (Queue[logging.LogRecord]): Queue for logging messages.
        _log_listener (QueueListener | None): Log listener for the supervisor.
        _is_stopped (bool): Whether the supervisor is stopped.
        _daq_jobs_to_load (list[DAQJobProcess] | None): List of DAQ jobs to load.
        _daq_job_config_path (str): Path to the DAQ job configuration file.
        _psutil_process_cache (dict[int, psutil.Process]): Cache for process information.
    """

    config: SupervisorConfig | None
    daq_job_processes: list[DAQJobProcess]
    restart_schedules: list[RestartDAQJobSchedule]
    _logger: logging.Logger
    _cnc_instance: SupervisorCNC | None = None

    _last_stats_message_time: datetime
    _last_heartbeat_message_time: datetime
    _last_routes_message_time: datetime
    _log_queue: Any
    _log_listener: QueueListener | None = None
    _is_stopped: bool
    _daq_jobs_to_load: list[DAQJobProcess] | None
    _daq_job_config_path: str
    _psutil_process_cache: dict[int, psutil.Process]

    def __init__(
        self,
        config: SupervisorConfig | None = None,
        daq_job_processes: list[DAQJobProcess] | None = None,
        daq_job_config_path: str = "configs/",
    ):
        self._daq_job_config_path = daq_job_config_path
        self._log_queue = _create_queue()
        self._log_listener = None
        self._logger = logging.getLogger()

        if config is None:
            self.config = self._load_supervisor_config()
        else:
            self.config = config

        self._daq_jobs_to_load = daq_job_processes
        self.daq_job_processes = []
        self.restart_schedules = []
        self.daq_job_remote_stats = {}
        self.daq_job_stats = {}
        self._psutil_process_cache = {}
        if not os.path.exists(self._daq_job_config_path):
            raise ValueError(
                f"DAQ job config path '{self._daq_job_config_path}' does not exist."
            )
        self._is_stopped = False

        self.message_broker = MessageBroker()
        random_id = str(uuid.uuid4())[:8]

        # IPC is not supported on Windows, use TCP with dynamic ports instead
        if sys.platform == "win32":
            xpub_port = get_available_port()
            xsub_port = get_available_port()
            self.supervisor_xpub_url = f"tcp://127.0.0.1:{xpub_port}"
            self.supervisor_xsub_url = f"tcp://127.0.0.1:{xsub_port}"
        else:
            self.supervisor_xpub_url = (
                f"ipc:///tmp/supervisor_{self.supervisor_id}_{random_id}_xpub.ipc"
            )
            self.supervisor_xsub_url = (
                f"ipc:///tmp/supervisor_{self.supervisor_id}_{random_id}_xsub.ipc"
            )

        self.message_broker.add_xpub_socket("supervisor_xpub", self.supervisor_xpub_url)
        self.message_broker.add_xsub_socket("supervisor_xsub", self.supervisor_xsub_url)
        self.message_broker.start_proxy(
            "supervisor_proxy", "supervisor_xpub", "supervisor_xsub"
        )

        self.message_handler = SupervisorMessageHandler(
            xpub_url=self.supervisor_xpub_url,
            supervisor_id=self.supervisor_id,
            on_stats_receive=lambda x: self._update_stats(x.stats),
            on_remote_stats_receive=lambda x: self._update_remote_stats(x.stats),
        )

        self._setup_federation()

        self._last_stats_message_time = datetime.min
        self._last_heartbeat_message_time = datetime.min
        self._last_routes_message_time = datetime.min

    def init(self):
        """
        Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.

        You should call this method after creating a new instance of the Supervisor class.
        """

        self._logger.setLevel(
            self.config.verbosity.to_logging_level() if self.config else logging.INFO
        )

        assert self.config is not None

        # Change logging name based on supervisor id
        self._logger.name = f"Supervisor({self.config.info.supervisor_id})"

        if self.config.cnc is not None:
            self._logger.debug("Starting CNC instance...")
            self._cnc_instance = start_supervisor_cnc(
                supervisor=self,
                config=self.config.cnc,
            )

        # Initialize ring buffer for zero-copy PyArrow message transfer
        if sys.platform != "win32":
            from enrgdaq.utils.shared_ring_buffer import get_global_ring_buffer

            try:
                # get hash of supervisor_id to fit name
                assert self.config is not None

                supervisor_id_hash = base64.b64encode(
                    hashlib.md5(self.config.info.supervisor_id.encode()).digest()
                ).decode()
                get_global_ring_buffer(
                    name=f"ring_{supervisor_id_hash}",
                    total_size=self.config.ring_buffer_size_mb * 1024 * 1024,
                    slot_size=self.config.ring_buffer_slot_size_kb * 1024,
                )
                self._logger.info(
                    f"Initialized ring buffer: {self.config.ring_buffer_size_mb}MB total, "
                    f"{self.config.ring_buffer_slot_size_kb}KB per slot"
                )
            except Exception as e:
                self._logger.warning(f"Failed to initialize ring buffer: {e}")

        # Set up log listener to capture logs from child processes
        handlers: list[logging.Handler] = []
        if self._cnc_instance:
            # Find the CNCLogHandler in the CNC instance logger
            cnc_handler = next(
                (
                    h
                    for h in self._cnc_instance._logger.handlers
                    if isinstance(h, CNCLogHandler)
                ),
                None,
            )
            if cnc_handler:
                handlers.append(cnc_handler)
            else:
                # Fallback to root handlers if CNC handler is missing
                handlers.extend(logging.getLogger().handlers)
        else:
            handlers.extend(logging.getLogger().handlers)

        if handlers:
            self._log_listener = QueueListener(self._log_queue, *handlers)
            self._log_listener.start()

        self.restart_schedules = []
        self.daq_job_processes = []
        self._logger.debug("Starting DAQ job processes...")
        self.start_daq_job_processes(self._daq_jobs_to_load or [])
        self._logger.debug(f"Started {len(self.daq_job_processes)} DAQ job processes")
        self.warn_for_lack_of_daq_jobs()

        self.message_handler.start()

        self._last_stats_message_time = datetime.min
        self._last_heartbeat_message_time = datetime.min

    def start_daq_job_processes(self, daq_jobs_to_load: list[DAQJobProcess]):
        assert self.config is not None

        # Start threads from user-provided daq jobs, or by
        # reading the config files like usual
        jobs_to_start = daq_jobs_to_load or load_daq_jobs(
            self._daq_job_config_path, self.config.info
        )

        for job in jobs_to_start:
            job.log_queue = self._log_queue
            job.zmq_xpub_url = self.supervisor_xpub_url
            job.zmq_xsub_url = self.supervisor_xsub_url

        started_jobs = start_daq_jobs(jobs_to_start)
        self.daq_job_processes.extend(started_jobs)

    def run(self):
        """
        Main loop that continuously runs the supervisor, handling job restarts and message passing.
        """
        while not self._is_stopped:
            try:
                self.loop()
                time.sleep(DAQ_SUPERVISOR_LOOP_INTERVAL_SECONDS)
            except KeyboardInterrupt:
                self._logger.warning("KeyboardInterrupt received, stopping")
                self.stop()
                break
        self.message_broker.send(
            DAQJobMessageStop(
                reason="Stopped by supervisor",
                topics={Topic.supervisor_broadcast(self.supervisor_id)},
            )
        )

    def stop(self):
        """
        Stops the supervisor and all its components.
        """
        if self._is_stopped:
            return
        if self._cnc_instance:
            self._cnc_instance.stop()
        if self._log_listener:
            self._log_listener.stop()

        # Clean up ring buffer
        if sys.platform != "win32":
            from enrgdaq.utils.shared_ring_buffer import cleanup_global_ring_buffer

            try:
                cleanup_global_ring_buffer()
                self._logger.debug("Ring buffer cleaned up")
            except Exception as e:
                self._logger.warning(f"Failed to cleanup ring buffer: {e}")

        self._is_stopped = True

    def loop(self):
        """
        A single iteration of the supervisor's main loop.
        """

        # Remove dead threads
        dead_processes = [
            t for t in self.daq_job_processes if t.process and not t.process.is_alive()
        ]
        # Clean up dead threads
        self.daq_job_processes = [
            t for t in self.daq_job_processes if t not in dead_processes
        ]

        # Get restart schedules for dead jobs
        self.restart_schedules.extend(self.get_restart_schedules(dead_processes))

        # Restart jobs that have stopped or are scheduled to restart
        self.restart_daq_jobs()

    def get_status(self) -> SupervisorStatus:
        """
        Gets the status of the supervisor and its DAQ jobs.

        Returns:
            SupervisorStatus: A struct containing the status.
        """
        assert self.config is not None
        return SupervisorStatus(
            daq_jobs=[
                daq_job_process.daq_job_info
                for daq_job_process in self.daq_job_processes
                if daq_job_process.daq_job_info is not None
            ],
            supervisor_info=self.config.info,
            daq_job_stats=self.daq_job_stats,
            daq_job_remote_stats=self.daq_job_remote_stats,
            restart_schedules=[
                RestartScheduleInfo(
                    job=sched.daq_job_process.daq_job_cls.__name__,
                    restart_at=sched.restart_at.isoformat(),
                )
                for sched in self.restart_schedules
            ],
        )

    def get_restart_schedules(
        self, dead_processes: list[DAQJobProcess]
    ) -> list[RestartDAQJobSchedule]:
        """
        Gets the restart schedules for the dead threads.

        Args:
            dead_processes (list[DAQJobProcess]): List of dead processes.

        Returns:
            list[RestartDAQJobSchedule]: List of restart schedules.
        """

        res: list[RestartDAQJobSchedule] = []
        for process in dead_processes:
            # Skip processes that should not be restarted on crash
            if not process.restart_on_crash:
                self._logger.info(
                    f"Removing {process.daq_job_cls.__name__} from process list"
                )
                if process in self.daq_job_processes:
                    self.daq_job_processes.remove(process)
                continue

            restart_offset = getattr(process.daq_job_cls, "restart_offset", None)
            if not isinstance(restart_offset, timedelta):
                restart_offset = timedelta(
                    seconds=DAQ_SUPERVISOR_DEFAULT_RESTART_TIME_SECONDS
                )
            self._logger.info(
                f"Scheduling restart of {process.daq_job_cls.__name__} in {restart_offset.total_seconds()} seconds"
            )
            res.append(
                RestartDAQJobSchedule(
                    daq_job_process=process,
                    restart_at=datetime.now() + restart_offset,
                )
            )
        return res

    def restart_daq_jobs(self):
        """
        Restarts the DAQ jobs that have been scheduled for restart.
        """
        assert self.config is not None

        schedules_to_remove: list[RestartDAQJobSchedule] = []
        for restart_schedule in self.restart_schedules:
            if datetime.now() < restart_schedule.restart_at:
                continue
            new_daq_job_process = rebuild_daq_job(
                restart_schedule.daq_job_process, self.config.info
            )
            # Set ZMQ URLs and log queue for the restarted process
            new_daq_job_process.log_queue = self._log_queue
            new_daq_job_process.zmq_xpub_url = self.supervisor_xpub_url
            new_daq_job_process.zmq_xsub_url = self.supervisor_xsub_url

            self.daq_job_processes.append(start_daq_job(new_daq_job_process))

            schedules_to_remove.append(restart_schedule)

        # Remove processed schedules
        self.restart_schedules = [
            x for x in self.restart_schedules if x not in schedules_to_remove
        ]

    def warn_for_lack_of_daq_jobs(self):
        DAQ_JOB_ABSENT_WARNINGS = {
            DAQJobStore: "No store job found, data will not be stored",
            DAQJobAlert: "No alert job found, alerts will not be sent",
        }

        for daq_job_type, warning_message in DAQ_JOB_ABSENT_WARNINGS.items():
            if not any(
                process
                for process in self.daq_job_processes
                if issubclass(process.daq_job_cls, daq_job_type)
            ):
                self._logger.warning(warning_message)

    def _load_supervisor_config(self):
        supervisor_config_file_path = os.path.join(
            self._daq_job_config_path, SUPERVISOR_CONFIG_FILE_NAME
        )
        if not os.path.exists(supervisor_config_file_path):
            self._logger.warning(
                f"No supervisor config file found at '{supervisor_config_file_path}', using default config"
            )
            return SupervisorConfig(info=SupervisorInfo(supervisor_id=platform.node()))

        with open(supervisor_config_file_path, "rb") as f:
            return msgspec.toml.decode(f.read(), type=SupervisorConfig)

    @cache
    def _get_supervisor_daq_job_info(self):
        assert self.config is not None
        return DAQJobInfo(
            daq_job_type="Supervisor",
            supervisor_info=self.config.info,
            unique_id=self.config.info.supervisor_id,
            instance_id=0,
            config="",
        )

    def _setup_federation(self) -> None:
        """
        Set up federation between supervisors in a star topology.

        If this supervisor is the server:
            - Exposes additional XPUB/XSUB endpoints for clients to connect to
            - Starts a proxy between these endpoints

        If this supervisor is a client:
            - Connects to the server's XPUB to receive messages
            - Connects to the server's XSUB to send messages
            - Starts forwarder threads for bidirectional communication
        """
        if self.config is None or self.config.federation is None:
            return

        fed = self.config.federation

        if fed.is_server:
            # Server mode: add federation binds to existing supervisor sockets
            # This allows remote clients to connect to the same proxy as local jobs
            if fed.server_xpub_url and fed.server_xsub_url:
                self._logger.info(
                    f"Adding federation endpoints: XPUB={fed.server_xpub_url}, XSUB={fed.server_xsub_url}"
                )
                # Bind additional addresses to existing sockets
                xpub_socket = self.message_broker.xpub_sockets["supervisor_xpub"]
                xsub_socket = self.message_broker.xsub_sockets["supervisor_xsub"]
                xpub_socket.bind(fed.server_xpub_url)
                xsub_socket.bind(fed.server_xsub_url)
                self._logger.info("Federation server ready")
            else:
                self._logger.warning(
                    "Federation server mode enabled but XPUB/XSUB URLs not configured"
                )
        else:
            # Client mode: connect to remote server
            if fed.remote_server_xpub_url and fed.remote_server_xsub_url:
                self._logger.info(
                    f"Connecting to federation server XSUB={fed.remote_server_xsub_url}"
                )

                # Create PUB socket to send messages to server
                self._fed_pub_socket = self.message_broker.connect_pub_to_xsub(
                    "federation_pub", fed.remote_server_xsub_url
                )

                # Start forwarder thread (local -> server only)
                self._start_federation_forwarders()

                self._logger.info("Connected to federation server")
            else:
                self._logger.warning(
                    "Federation client mode but remote server URLs not configured"
                )

    def _start_federation_forwarders(self) -> None:
        """
        Start forwarder to push messages from local proxy to federation server.

        Uses ZMQ Poller for efficient message forwarding while being stoppable.

        Note: We only forward LOCAL → SERVER, not the reverse.
        If we forwarded server messages back to local, it would create a loop:
        (local → server → back to local → server again → ...)
        """

        # Create SUB socket to receive from local XPUB
        local_sub = self.message_broker.context.socket(zmq.SUB)
        local_sub.connect(self.supervisor_xpub_url)
        local_sub.setsockopt_string(zmq.SUBSCRIBE, "")

        def forward_to_server():
            self._logger.debug("Federation forwarder (local -> server) started")
            poller = zmq.Poller()
            poller.register(local_sub, zmq.POLLIN)
            try:
                while not self._is_stopped:
                    events = dict(poller.poll(100))  # 100ms timeout
                    if local_sub in events:
                        msg = local_sub.recv_multipart(zmq.NOBLOCK)
                        self._fed_pub_socket.send_multipart(msg)
            except zmq.ContextTerminated:
                pass
            finally:
                local_sub.close()
                self._logger.debug("Federation forwarder (local -> server) stopped")

        self._fed_to_server_thread = threading.Thread(
            target=forward_to_server, daemon=True
        )
        self._fed_to_server_thread.start()

    def _update_stats(self, stats):
        self.daq_job_stats = stats

    def _update_remote_stats(self, stats):
        self.daq_job_remote_stats = stats

    @property
    def supervisor_id(self):
        if self.config is None or self.config.info is None:
            return "unknown"
        return self.config.info.supervisor_id

Supervisor class responsible for managing DAQ job threads, handling their lifecycle, and facilitating communication between them.

Attributes

config : SupervisorConfig | None
Configuration for the supervisor.
daq_job_processes : list[DAQJobProcess]
List of DAQ job processes managed by the supervisor.
restart_schedules : list[RestartDAQJobSchedule]
List of schedules for restarting DAQ jobs.
_logger : logging.Logger
Logger instance for logging supervisor activities.
_last_stats_message_time : datetime
Last time a stats message was sent.
_cnc_instance : SupervisorCNC | None
CNC instance for the supervisor.
_last_stats_message_time : datetime
Last time a stats message was sent.
_last_heartbeat_message_time : datetime
Last time a heartbeat message was sent.
_log_queue : Queue[logging.LogRecord]
Queue for logging messages.
_log_listener : QueueListener | None
Log listener for the supervisor.
_is_stopped : bool
Whether the supervisor is stopped.
_daq_jobs_to_load : list[DAQJobProcess] | None
List of DAQ jobs to load.
_daq_job_config_path : str
Path to the DAQ job configuration file.
_psutil_process_cache : dict[int, psutil.Process]
Cache for process information.

Class variables

var configSupervisorConfig | None
var daq_job_processes : list[DAQJobProcess]
var restart_schedules : list[RestartDAQJobSchedule]

Instance variables

prop supervisor_id
Expand source code
@property
def supervisor_id(self):
    if self.config is None or self.config.info is None:
        return "unknown"
    return self.config.info.supervisor_id

Methods

def get_restart_schedules(self,
dead_processes: list[DAQJobProcess]) ‑> list[RestartDAQJobSchedule]
Expand source code
def get_restart_schedules(
    self, dead_processes: list[DAQJobProcess]
) -> list[RestartDAQJobSchedule]:
    """
    Gets the restart schedules for the dead threads.

    Args:
        dead_processes (list[DAQJobProcess]): List of dead processes.

    Returns:
        list[RestartDAQJobSchedule]: List of restart schedules.
    """

    res: list[RestartDAQJobSchedule] = []
    for process in dead_processes:
        # Skip processes that should not be restarted on crash
        if not process.restart_on_crash:
            self._logger.info(
                f"Removing {process.daq_job_cls.__name__} from process list"
            )
            if process in self.daq_job_processes:
                self.daq_job_processes.remove(process)
            continue

        restart_offset = getattr(process.daq_job_cls, "restart_offset", None)
        if not isinstance(restart_offset, timedelta):
            restart_offset = timedelta(
                seconds=DAQ_SUPERVISOR_DEFAULT_RESTART_TIME_SECONDS
            )
        self._logger.info(
            f"Scheduling restart of {process.daq_job_cls.__name__} in {restart_offset.total_seconds()} seconds"
        )
        res.append(
            RestartDAQJobSchedule(
                daq_job_process=process,
                restart_at=datetime.now() + restart_offset,
            )
        )
    return res

Gets the restart schedules for the dead threads.

Args

dead_processes : list[DAQJobProcess]
List of dead processes.

Returns

list[RestartDAQJobSchedule]
List of restart schedules.
def get_status(self) ‑> SupervisorStatus
Expand source code
def get_status(self) -> SupervisorStatus:
    """
    Gets the status of the supervisor and its DAQ jobs.

    Returns:
        SupervisorStatus: A struct containing the status.
    """
    assert self.config is not None
    return SupervisorStatus(
        daq_jobs=[
            daq_job_process.daq_job_info
            for daq_job_process in self.daq_job_processes
            if daq_job_process.daq_job_info is not None
        ],
        supervisor_info=self.config.info,
        daq_job_stats=self.daq_job_stats,
        daq_job_remote_stats=self.daq_job_remote_stats,
        restart_schedules=[
            RestartScheduleInfo(
                job=sched.daq_job_process.daq_job_cls.__name__,
                restart_at=sched.restart_at.isoformat(),
            )
            for sched in self.restart_schedules
        ],
    )

Gets the status of the supervisor and its DAQ jobs.

Returns

SupervisorStatus
A struct containing the status.
def init(self)
Expand source code
def init(self):
    """
    Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.

    You should call this method after creating a new instance of the Supervisor class.
    """

    self._logger.setLevel(
        self.config.verbosity.to_logging_level() if self.config else logging.INFO
    )

    assert self.config is not None

    # Change logging name based on supervisor id
    self._logger.name = f"Supervisor({self.config.info.supervisor_id})"

    if self.config.cnc is not None:
        self._logger.debug("Starting CNC instance...")
        self._cnc_instance = start_supervisor_cnc(
            supervisor=self,
            config=self.config.cnc,
        )

    # Initialize ring buffer for zero-copy PyArrow message transfer
    if sys.platform != "win32":
        from enrgdaq.utils.shared_ring_buffer import get_global_ring_buffer

        try:
            # get hash of supervisor_id to fit name
            assert self.config is not None

            supervisor_id_hash = base64.b64encode(
                hashlib.md5(self.config.info.supervisor_id.encode()).digest()
            ).decode()
            get_global_ring_buffer(
                name=f"ring_{supervisor_id_hash}",
                total_size=self.config.ring_buffer_size_mb * 1024 * 1024,
                slot_size=self.config.ring_buffer_slot_size_kb * 1024,
            )
            self._logger.info(
                f"Initialized ring buffer: {self.config.ring_buffer_size_mb}MB total, "
                f"{self.config.ring_buffer_slot_size_kb}KB per slot"
            )
        except Exception as e:
            self._logger.warning(f"Failed to initialize ring buffer: {e}")

    # Set up log listener to capture logs from child processes
    handlers: list[logging.Handler] = []
    if self._cnc_instance:
        # Find the CNCLogHandler in the CNC instance logger
        cnc_handler = next(
            (
                h
                for h in self._cnc_instance._logger.handlers
                if isinstance(h, CNCLogHandler)
            ),
            None,
        )
        if cnc_handler:
            handlers.append(cnc_handler)
        else:
            # Fallback to root handlers if CNC handler is missing
            handlers.extend(logging.getLogger().handlers)
    else:
        handlers.extend(logging.getLogger().handlers)

    if handlers:
        self._log_listener = QueueListener(self._log_queue, *handlers)
        self._log_listener.start()

    self.restart_schedules = []
    self.daq_job_processes = []
    self._logger.debug("Starting DAQ job processes...")
    self.start_daq_job_processes(self._daq_jobs_to_load or [])
    self._logger.debug(f"Started {len(self.daq_job_processes)} DAQ job processes")
    self.warn_for_lack_of_daq_jobs()

    self.message_handler.start()

    self._last_stats_message_time = datetime.min
    self._last_heartbeat_message_time = datetime.min

Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.

You should call this method after creating a new instance of the Supervisor class.

def loop(self)
Expand source code
def loop(self):
    """
    A single iteration of the supervisor's main loop.
    """

    # Remove dead threads
    dead_processes = [
        t for t in self.daq_job_processes if t.process and not t.process.is_alive()
    ]
    # Clean up dead threads
    self.daq_job_processes = [
        t for t in self.daq_job_processes if t not in dead_processes
    ]

    # Get restart schedules for dead jobs
    self.restart_schedules.extend(self.get_restart_schedules(dead_processes))

    # Restart jobs that have stopped or are scheduled to restart
    self.restart_daq_jobs()

A single iteration of the supervisor's main loop.

def restart_daq_jobs(self)
Expand source code
def restart_daq_jobs(self):
    """
    Restarts the DAQ jobs that have been scheduled for restart.
    """
    assert self.config is not None

    schedules_to_remove: list[RestartDAQJobSchedule] = []
    for restart_schedule in self.restart_schedules:
        if datetime.now() < restart_schedule.restart_at:
            continue
        new_daq_job_process = rebuild_daq_job(
            restart_schedule.daq_job_process, self.config.info
        )
        # Set ZMQ URLs and log queue for the restarted process
        new_daq_job_process.log_queue = self._log_queue
        new_daq_job_process.zmq_xpub_url = self.supervisor_xpub_url
        new_daq_job_process.zmq_xsub_url = self.supervisor_xsub_url

        self.daq_job_processes.append(start_daq_job(new_daq_job_process))

        schedules_to_remove.append(restart_schedule)

    # Remove processed schedules
    self.restart_schedules = [
        x for x in self.restart_schedules if x not in schedules_to_remove
    ]

Restarts the DAQ jobs that have been scheduled for restart.

def run(self)
Expand source code
def run(self):
    """
    Main loop that continuously runs the supervisor, handling job restarts and message passing.
    """
    while not self._is_stopped:
        try:
            self.loop()
            time.sleep(DAQ_SUPERVISOR_LOOP_INTERVAL_SECONDS)
        except KeyboardInterrupt:
            self._logger.warning("KeyboardInterrupt received, stopping")
            self.stop()
            break
    self.message_broker.send(
        DAQJobMessageStop(
            reason="Stopped by supervisor",
            topics={Topic.supervisor_broadcast(self.supervisor_id)},
        )
    )

Main loop that continuously runs the supervisor, handling job restarts and message passing.

def start_daq_job_processes(self,
daq_jobs_to_load: list[DAQJobProcess])
Expand source code
def start_daq_job_processes(self, daq_jobs_to_load: list[DAQJobProcess]):
    assert self.config is not None

    # Start threads from user-provided daq jobs, or by
    # reading the config files like usual
    jobs_to_start = daq_jobs_to_load or load_daq_jobs(
        self._daq_job_config_path, self.config.info
    )

    for job in jobs_to_start:
        job.log_queue = self._log_queue
        job.zmq_xpub_url = self.supervisor_xpub_url
        job.zmq_xsub_url = self.supervisor_xsub_url

    started_jobs = start_daq_jobs(jobs_to_start)
    self.daq_job_processes.extend(started_jobs)
def stop(self)
Expand source code
def stop(self):
    """
    Stops the supervisor and all its components.
    """
    if self._is_stopped:
        return
    if self._cnc_instance:
        self._cnc_instance.stop()
    if self._log_listener:
        self._log_listener.stop()

    # Clean up ring buffer
    if sys.platform != "win32":
        from enrgdaq.utils.shared_ring_buffer import cleanup_global_ring_buffer

        try:
            cleanup_global_ring_buffer()
            self._logger.debug("Ring buffer cleaned up")
        except Exception as e:
            self._logger.warning(f"Failed to cleanup ring buffer: {e}")

    self._is_stopped = True

Stops the supervisor and all its components.

def warn_for_lack_of_daq_jobs(self)
Expand source code
def warn_for_lack_of_daq_jobs(self):
    DAQ_JOB_ABSENT_WARNINGS = {
        DAQJobStore: "No store job found, data will not be stored",
        DAQJobAlert: "No alert job found, alerts will not be sent",
    }

    for daq_job_type, warning_message in DAQ_JOB_ABSENT_WARNINGS.items():
        if not any(
            process
            for process in self.daq_job_processes
            if issubclass(process.daq_job_cls, daq_job_type)
        ):
            self._logger.warning(warning_message)