Module enrgdaq.supervisor

Global variables

var DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS

Time in seconds a DAQ job must have been running before it is marked as alive.

var DAQ_JOB_QUEUE_ACTION_TIMEOUT

Time in seconds to wait for a DAQ job to process a message.

var DAQ_SUPERVISOR_SLEEP_TIME_SECONDS

Time in seconds to sleep between iterations of the supervisor's main loop.

var DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS

Time in seconds between sending supervisor stats messages.

Classes

class RestartDAQJobSchedule (daq_job_type: type[DAQJob],
daq_job_config: DAQJobConfig,
restart_at: datetime.datetime)
Expand source code
@dataclass
class RestartDAQJobSchedule:
    """
    A pending restart of a DAQ job.

    The supervisor creates one of these for every dead DAQ job thread and
    restarts the job once the current time reaches `restart_at`.
    """

    # DAQ job class that will be started again.
    daq_job_type: type[DAQJob]
    # Configuration of the job that died; reused for the restarted job.
    daq_job_config: DAQJobConfig
    # Earliest point in time at which the job is restarted.
    restart_at: datetime

RestartDAQJobSchedule(daq_job_type: type[enrgdaq.daq.base.DAQJob], daq_job_config: enrgdaq.daq.models.DAQJobConfig, restart_at: datetime.datetime)

Class variables

var daq_job_config : DAQJobConfig
var daq_job_type : type[DAQJob]
var restart_at : datetime.datetime
class Supervisor
Expand source code
class Supervisor:
    """
    Supervisor class responsible for managing DAQ job threads, handling their lifecycle,
    and facilitating communication between them.

    Attributes:
        daq_job_threads (list[DAQJobThread]): List of DAQ job threads managed by the supervisor.
        daq_job_stats (DAQJobStatsDict): Dictionary holding statistics for each DAQ job type.
        restart_schedules (list[RestartDAQJobSchedule]): List of schedules for restarting DAQ jobs.
        config: Supervisor configuration, loaded by `init()`.
        _logger (logging.Logger): Logger instance for logging supervisor activities.
        _last_stats_message_time (datetime): Last time a stats message was sent.
    """

    daq_job_threads: list[DAQJobThread]
    daq_job_stats: DAQJobStatsDict
    restart_schedules: list[RestartDAQJobSchedule]
    _logger: logging.Logger

    _last_stats_message_time: datetime

    def init(self):
        """
        Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.

        You should call this method after creating a new instance of the Supervisor class.
        """

        # The logger must exist before the config is loaded: the loader logs a
        # warning when the config file is missing.
        self._logger = logging.getLogger()
        self.config = self._load_supervisor_config()

        # Change logging name based on supervisor id
        self._logger.name = f"Supervisor({self.config.supervisor_id})"

        self.restart_schedules = []
        self.daq_job_threads = self.start_daq_job_threads()
        # One stats entry per DAQ job *type* (not per thread)
        self.daq_job_stats: DAQJobStatsDict = {
            type(thread.daq_job): DAQJobStats() for thread in self.daq_job_threads
        }
        self.warn_for_lack_of_daq_jobs()

        # Backdate the timestamp by one interval so the first stats message is
        # due right away instead of one full interval after start-up.
        self._last_stats_message_time = datetime.now() - timedelta(
            seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS
        )

    def start_daq_job_threads(self) -> list[DAQJobThread]:
        """
        Loads the DAQ jobs from the "configs/" directory and starts them.

        Returns:
            list[DAQJobThread]: The started DAQ job threads.
        """
        return start_daq_jobs(load_daq_jobs("configs/", self.config))

    def run(self):
        """
        Main loop that continuously runs the supervisor, handling job restarts and message passing.

        Runs until a KeyboardInterrupt is received, then cleans up every managed
        DAQ job and returns.
        """
        while True:
            try:
                self.loop()
                time.sleep(DAQ_SUPERVISOR_SLEEP_TIME_SECONDS)
            except KeyboardInterrupt:
                self._logger.warning("KeyboardInterrupt received, cleaning up")
                # The jobs' __del__ is invoked explicitly as their cleanup hook.
                for daq_job_thread in self.daq_job_threads:
                    daq_job_thread.daq_job.__del__()
                break

    def loop(self):
        """
        A single iteration of the supervisor's main loop.
        """

        # Split the managed threads into dead and alive ones in a single pass.
        # Each thread is classified by its own liveness check rather than by
        # list membership, which avoids quadratic work and any dependence on
        # how DAQJobThread implements equality.
        dead_threads = []
        alive_threads = []
        for daq_job_thread in self.daq_job_threads:
            if daq_job_thread.thread.is_alive():
                alive_threads.append(daq_job_thread)
            else:
                dead_threads.append(daq_job_thread)
        self.daq_job_threads = alive_threads

        # Get restart schedules for dead jobs
        self.restart_schedules.extend(self.get_restart_schedules(dead_threads))

        # Restart jobs that have stopped or are scheduled to restart
        self.restart_daq_jobs()

        # Handle thread alive stats for dead & alive threads
        self.handle_thread_alive_stats(dead_threads)

        # Get messages from DAQ Jobs
        daq_messages_out = self.get_messages_from_daq_jobs()

        # Add supervisor messages
        daq_messages_out.extend(self.get_supervisor_messages())

        # Send messages to appropriate DAQ Jobs
        self.send_messages_to_daq_jobs(daq_messages_out)

    def handle_thread_alive_stats(self, dead_threads: list[DAQJobThread]):
        """
        Updates the `is_alive` stat for running and dead threads.

        A job type is marked alive once one of its managed threads has been
        running for longer than DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS. The job
        types of the dead threads are then marked as not alive; because this
        happens last, a dead thread wins over a running one of the same type.

        Args:
            dead_threads (list[DAQJobThread]): List of dead threads.
        """

        alive_threshold = timedelta(seconds=DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS)
        for thread in self.daq_job_threads:
            if datetime.now() - thread.start_time > alive_threshold:
                self.get_daq_job_stats(
                    self.daq_job_stats, type(thread.daq_job)
                ).is_alive = True

        for thread in dead_threads:
            self.get_daq_job_stats(
                self.daq_job_stats, type(thread.daq_job)
            ).is_alive = False

    def get_restart_schedules(self, dead_threads: list[DAQJobThread]):
        """
        Gets the restart schedules for the dead threads.

        Each dead job is scheduled to restart after its `restart_offset` when
        that attribute is a timedelta, and immediately otherwise. The dead job's
        `free()` method is called once its schedule has been created.

        Args:
            dead_threads (list[DAQJobThread]): List of dead threads.

        Returns:
            list[RestartDAQJobSchedule]: List of restart schedules.
        """

        res = []
        for thread in dead_threads:
            restart_offset = getattr(thread.daq_job, "restart_offset", None)
            if not isinstance(restart_offset, timedelta):
                # No (usable) offset configured: restart as soon as possible
                restart_offset = timedelta(seconds=0)
            else:
                self._logger.info(
                    f"Scheduling restart of {type(thread.daq_job).__name__} in {restart_offset.total_seconds()} seconds"
                )
            res.append(
                RestartDAQJobSchedule(
                    daq_job_type=type(thread.daq_job),
                    daq_job_config=thread.daq_job.config,
                    restart_at=datetime.now() + restart_offset,
                )
            )
            thread.daq_job.free()
        return res

    def restart_daq_jobs(self):
        """
        Restarts the DAQ jobs that have been scheduled for restart.

        Schedules whose `restart_at` has been reached are restarted and dropped;
        all other schedules are kept for a later iteration.
        """

        # Use one clock reading for the whole pass and keep the not-yet-due
        # schedules explicitly. Removing processed schedules by equality could
        # also drop a pending schedule that merely compares equal to a
        # processed one.
        now = datetime.now()
        pending_schedules = []
        for restart_schedule in self.restart_schedules:
            if now < restart_schedule.restart_at:
                pending_schedules.append(restart_schedule)
                continue
            self.daq_job_threads.append(
                restart_daq_job(
                    restart_schedule.daq_job_type,
                    restart_schedule.daq_job_config,
                    self.config,
                )
            )

            # Update restart stats
            self.get_daq_job_stats(
                self.daq_job_stats, restart_schedule.daq_job_type
            ).restart_stats.increase()

        # Keep only the schedules that were not processed
        self.restart_schedules = pending_schedules

    def get_supervisor_messages(self) -> list[DAQJobMessage]:
        """
        Gets the supervisor messages to be sent to the DAQ jobs.

        Currently this is at most one stats message, emitted when more than
        DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS have passed since the
        previous one.

        Returns:
            list[DAQJobMessage]: List of supervisor messages.
        """

        messages = []

        # Send stats message
        now = datetime.now()
        if now > self._last_stats_message_time + timedelta(
            seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS
        ):
            self._last_stats_message_time = now
            messages.append(
                DAQJobMessageStats(
                    stats=self.daq_job_stats,
                    daq_job_info=self._get_supervisor_daq_job_info(),
                )
            )
        return messages

    def get_daq_job_stats(
        self, daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob]
    ) -> DAQJobStats:
        """
        Returns the stats entry for a DAQ job type, creating it if it is missing.

        Args:
            daq_job_stats (DAQJobStatsDict): Stats dictionary to read from and update.
            daq_job_type (type[DAQJob]): DAQ job type to look up.

        Returns:
            DAQJobStats: The stats entry stored under `daq_job_type`.
        """
        if daq_job_type not in daq_job_stats:
            daq_job_stats[daq_job_type] = DAQJobStats()
        return daq_job_stats[daq_job_type]

    def get_messages_from_daq_jobs(self) -> list[DAQJobMessage]:
        """
        Drains the outgoing message queue of every managed DAQ job without blocking.

        Messages that carry no `daq_job_info` get the info of the job they came
        from, and the job type's outgoing-message stats are increased per message.

        Returns:
            list[DAQJobMessage]: All messages collected in this pass.
        """
        res = []
        for thread in self.daq_job_threads:
            try:
                while True:
                    msg = thread.daq_job.message_out.get_nowait()
                    if msg.daq_job_info is None:
                        msg.daq_job_info = thread.daq_job.info
                    res.append(msg)
                    # Update stats
                    self.get_daq_job_stats(
                        self.daq_job_stats, type(thread.daq_job)
                    ).message_out_stats.increase()
            except Empty:
                # Queue drained, move on to the next job
                pass
        return res

    def send_messages_to_daq_jobs(self, daq_messages: list[DAQJobMessage]):
        """
        Sends messages to the DAQ jobs.

        A message is delivered to every job whose `allowed_message_in_types`
        contains a matching type; store jobs additionally have to accept it via
        `can_store()`.

        Args:
            daq_messages (list[DAQJobMessage]): List of messages to send.
        """

        for message in daq_messages:
            for daq_job_thread in self.daq_job_threads:
                daq_job = daq_job_thread.daq_job
                # Send if message is allowed for this DAQ Job
                if any(
                    isinstance(message, msg_type)
                    for msg_type in daq_job.allowed_message_in_types
                ):
                    # Drop message type that is not supported by DAQJobStore
                    if isinstance(daq_job, DAQJobStore) and not daq_job.can_store(
                        message
                    ):
                        continue
                    # NOTE(review): an exception raised by put() (e.g. when the
                    # timeout expires on a full, bounded queue) is not handled
                    # here - confirm which queue type message_in is.
                    daq_job.message_in.put(
                        message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT
                    )

                    # Update stats
                    self.get_daq_job_stats(
                        self.daq_job_stats, type(daq_job)
                    ).message_in_stats.increase()

    def warn_for_lack_of_daq_jobs(self):
        """
        Logs a warning for every essential kind of DAQ job (store, alert) that
        none of the managed threads runs.
        """
        DAQ_JOB_ABSENT_WARNINGS = {
            DAQJobStore: "No store job found, data will not be stored",
            DAQJobAlert: "No alert job found, alerts will not be sent",
        }

        for daq_job_type, warning_message in DAQ_JOB_ABSENT_WARNINGS.items():
            # Test the predicate itself, not the truthiness of the thread objects
            if not any(
                isinstance(x.daq_job, daq_job_type) for x in self.daq_job_threads
            ):
                self._logger.warning(warning_message)

    def _load_supervisor_config(self):
        """
        Loads the supervisor config from SUPERVISOR_CONFIG_FILE_PATH.

        When the file does not exist, a warning is logged and a default config
        using the machine's network name as supervisor id is returned.

        Returns:
            SupervisorConfig: The decoded (TOML) or default configuration.
        """
        if not os.path.exists(SUPERVISOR_CONFIG_FILE_PATH):
            self._logger.warning(
                f"No supervisor config file found at '{SUPERVISOR_CONFIG_FILE_PATH}', using default config"
            )
            return SupervisorConfig(supervisor_id=platform.node())

        with open(SUPERVISOR_CONFIG_FILE_PATH, "rb") as f:
            return msgspec.toml.decode(f.read(), type=SupervisorConfig)

    def _get_supervisor_daq_job_info(self):
        """
        Builds the DAQJobInfo that identifies the supervisor itself as the
        sender of a message.
        """
        return DAQJobInfo(
            daq_job_type="Supervisor",
            daq_job_class_name="Supervisor",
            supervisor_config=self.config,
            unique_id=self.config.supervisor_id,
            instance_id=0,
        )

Supervisor class responsible for managing DAQ job threads, handling their lifecycle, and facilitating communication between them.

Attributes

daq_job_threads : list[DAQJobThread]
List of DAQ job threads managed by the supervisor.
daq_job_stats : DAQJobStatsDict
Dictionary holding statistics for each DAQ job type.
restart_schedules : list[RestartDAQJobSchedule]
List of schedules for restarting DAQ jobs.
_logger : logging.Logger
Logger instance for logging supervisor activities.
_last_stats_message_time : datetime
Last time a stats message was sent.

Class variables

var daq_job_stats : Dict[type[DAQJob], DAQJobStats]
var daq_job_threads : list[DAQJobThread]
var restart_schedules : list[RestartDAQJobSchedule]

Methods

def get_daq_job_stats(self,
daq_job_stats: Dict[type[DAQJob], DAQJobStats],
daq_job_type: type[DAQJob]) ‑> DAQJobStats
Expand source code
def get_daq_job_stats(
    self, daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob]
) -> DAQJobStats:
    """
    Looks up the stats entry of a DAQ job type, creating an empty one on first use.

    Args:
        daq_job_stats (DAQJobStatsDict): Stats dictionary to read from and update.
        daq_job_type (type[DAQJob]): DAQ job type to look up.

    Returns:
        DAQJobStats: The stats entry stored under `daq_job_type`.
    """
    if daq_job_type in daq_job_stats:
        return daq_job_stats[daq_job_type]

    # First time this job type is seen: register a fresh entry
    daq_job_stats[daq_job_type] = DAQJobStats()
    return daq_job_stats[daq_job_type]
def get_messages_from_daq_jobs(self) ‑> list[DAQJobMessage]
Expand source code
def get_messages_from_daq_jobs(self) -> list[DAQJobMessage]:
    """
    Drains the outgoing message queue of every managed DAQ job without blocking.

    Messages that carry no `daq_job_info` get the info of the job they came
    from, and the job type's outgoing-message stats are increased per message.

    Returns:
        list[DAQJobMessage]: All messages collected in this pass.
    """
    collected = []
    for job_thread in self.daq_job_threads:
        source_job = job_thread.daq_job
        while True:
            try:
                message = source_job.message_out.get_nowait()
            except Empty:
                # Nothing left in this job's queue
                break

            if message.daq_job_info is None:
                message.daq_job_info = source_job.info
            collected.append(message)

            # Count the message for the job type it came from
            job_stats = self.get_daq_job_stats(self.daq_job_stats, type(source_job))
            job_stats.message_out_stats.increase()
    return collected
def get_restart_schedules(self,
dead_threads: list[DAQJobThread])
Expand source code
def get_restart_schedules(self, dead_threads: list[DAQJobThread]):
    """
    Builds one restart schedule per dead thread.

    A job is scheduled after its `restart_offset` when that attribute is a
    timedelta (this is logged), and immediately otherwise. The dead job's
    `free()` method is called once its schedule has been created.

    Args:
        dead_threads (list[DAQJobThread]): List of dead threads.

    Returns:
        list[RestartDAQJobSchedule]: List of restart schedules.
    """

    schedules = []
    for dead_thread in dead_threads:
        dead_job = dead_thread.daq_job
        job_type = type(dead_job)

        offset = getattr(dead_job, "restart_offset", None)
        if isinstance(offset, timedelta):
            self._logger.info(
                f"Scheduling restart of {job_type.__name__} in {offset.total_seconds()} seconds"
            )
        else:
            # No usable offset on the job: restart without delay
            offset = timedelta(seconds=0)

        schedule = RestartDAQJobSchedule(
            daq_job_type=job_type,
            daq_job_config=dead_job.config,
            restart_at=datetime.now() + offset,
        )
        schedules.append(schedule)
        dead_job.free()
    return schedules

Gets the restart schedules for the dead threads.

Args

dead_threads : list[DAQJobThread]
List of dead threads.

Returns

list[RestartDAQJobSchedule]
List of restart schedules.
def get_supervisor_messages(self) ‑> list[DAQJobMessage]
Expand source code
def get_supervisor_messages(self) -> list[DAQJobMessage]:
    """
    Collects the messages the supervisor itself wants to send to the DAQ jobs.

    This is at most one stats message, produced when more than
    DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS have passed since the
    previous one.

    Returns:
        list[DAQJobMessage]: List of supervisor messages.
    """

    stats_interval = timedelta(seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS)
    stats_due = datetime.now() > self._last_stats_message_time + stats_interval
    if not stats_due:
        return []

    # Remember when the stats were last sent, then build the message
    self._last_stats_message_time = datetime.now()
    stats_message = DAQJobMessageStats(
        stats=self.daq_job_stats,
        daq_job_info=self._get_supervisor_daq_job_info(),
    )
    return [stats_message]

Gets the supervisor messages to be sent to the DAQ jobs.

Returns

list[DAQJobMessage]
List of supervisor messages.
def handle_thread_alive_stats(self,
dead_threads: list[DAQJobThread])
Expand source code
def handle_thread_alive_stats(self, dead_threads: list[DAQJobThread]):
    """
    Updates the `is_alive` stat for running and dead threads.

    A job type is marked alive once one of its managed threads has been running
    for longer than DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS. The job types of the
    dead threads are marked as not alive afterwards.

    Args:
        dead_threads (list[DAQJobThread]): List of dead threads.
    """

    alive_after = timedelta(seconds=DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS)
    for running_thread in self.daq_job_threads:
        uptime = datetime.now() - running_thread.start_time
        if uptime > alive_after:
            running_stats = self.get_daq_job_stats(
                self.daq_job_stats, type(running_thread.daq_job)
            )
            running_stats.is_alive = True

    # Dead threads are handled last, so they override the alive marking above
    for dead_thread in dead_threads:
        dead_stats = self.get_daq_job_stats(
            self.daq_job_stats, type(dead_thread.daq_job)
        )
        dead_stats.is_alive = False

Handles the alive stats for the dead threads.

Args

dead_threads : list[DAQJobThread]
List of dead threads.
def init(self)
Expand source code
def init(self):
    """
    Sets up a freshly created supervisor: loads the configuration, starts the
    DAQ job threads, prepares the per-job-type stats and warns about missing
    DAQ jobs.

    Call this method right after creating a new instance of the Supervisor class.
    """

    # The logger has to be in place first, loading the config may log a warning
    root_logger = logging.getLogger()
    self._logger = root_logger
    self.config = self._load_supervisor_config()

    # Make the supervisor id visible in the log output
    root_logger.name = f"Supervisor({self.config.supervisor_id})"

    self.restart_schedules = []

    started_threads = self.start_daq_job_threads()
    self.daq_job_threads = started_threads

    # One stats entry per DAQ job type
    stats_by_type = {}
    for started_thread in started_threads:
        stats_by_type[type(started_thread.daq_job)] = DAQJobStats()
    self.daq_job_stats = stats_by_type

    self.warn_for_lack_of_daq_jobs()

    # Backdate by one interval so the first stats message is due right away
    stats_interval = timedelta(seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS)
    self._last_stats_message_time = datetime.now() - stats_interval

Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.

You should call this method after creating a new instance of the Supervisor class.

def loop(self)
Expand source code
def loop(self):
    """
    A single iteration of the supervisor's main loop.

    Dead threads are dropped from the managed list and scheduled for restart,
    due restarts are performed, the alive stats are refreshed, and finally the
    messages produced by the DAQ jobs and by the supervisor are routed to the
    DAQ jobs that accept them.
    """

    # Split the managed threads into dead and alive ones in a single pass.
    # Each thread is classified by its own liveness check rather than by list
    # membership, which avoids quadratic work and any dependence on how
    # DAQJobThread implements equality.
    dead_threads = []
    alive_threads = []
    for daq_job_thread in self.daq_job_threads:
        if daq_job_thread.thread.is_alive():
            alive_threads.append(daq_job_thread)
        else:
            dead_threads.append(daq_job_thread)
    self.daq_job_threads = alive_threads

    # Get restart schedules for dead jobs
    self.restart_schedules.extend(self.get_restart_schedules(dead_threads))

    # Restart jobs that have stopped or are scheduled to restart
    self.restart_daq_jobs()

    # Handle thread alive stats for dead & alive threads
    self.handle_thread_alive_stats(dead_threads)

    # Get messages from DAQ Jobs
    daq_messages_out = self.get_messages_from_daq_jobs()

    # Add supervisor messages
    daq_messages_out.extend(self.get_supervisor_messages())

    # Send messages to appropriate DAQ Jobs
    self.send_messages_to_daq_jobs(daq_messages_out)

A single iteration of the supervisor's main loop.

def restart_daq_jobs(self)
Expand source code
def restart_daq_jobs(self):
    """
    Restarts the DAQ jobs that have been scheduled for restart.

    Schedules whose `restart_at` has been reached are restarted (the new thread
    is added to `daq_job_threads` and the job type's restart stats are
    increased) and dropped; all other schedules are kept for a later iteration.
    """

    # Use one clock reading for the whole pass and keep the not-yet-due
    # schedules explicitly. Removing processed schedules by equality could also
    # drop a pending schedule that merely compares equal to a processed one.
    now = datetime.now()
    pending_schedules = []
    for restart_schedule in self.restart_schedules:
        if now < restart_schedule.restart_at:
            pending_schedules.append(restart_schedule)
            continue
        self.daq_job_threads.append(
            restart_daq_job(
                restart_schedule.daq_job_type,
                restart_schedule.daq_job_config,
                self.config,
            )
        )

        # Update restart stats
        self.get_daq_job_stats(
            self.daq_job_stats, restart_schedule.daq_job_type
        ).restart_stats.increase()

    # Keep only the schedules that were not processed
    self.restart_schedules = pending_schedules

Restarts the DAQ jobs that have been scheduled for restart.

def run(self)
Expand source code
def run(self):
    """
    Runs the supervisor until it is interrupted.

    Repeatedly executes `loop()` and sleeps DAQ_SUPERVISOR_SLEEP_TIME_SECONDS
    between iterations. On KeyboardInterrupt a warning is logged, every managed
    DAQ job is cleaned up and the method returns.
    """
    try:
        while True:
            self.loop()
            time.sleep(DAQ_SUPERVISOR_SLEEP_TIME_SECONDS)
    except KeyboardInterrupt:
        self._logger.warning("KeyboardInterrupt received, cleaning up")
        # The jobs' __del__ is invoked explicitly as their cleanup hook
        for managed_thread in self.daq_job_threads:
            managed_thread.daq_job.__del__()

Main loop that continuously runs the supervisor, handling job restarts and message passing.

def send_messages_to_daq_jobs(self,
daq_messages: list[DAQJobMessage])
Expand source code
def send_messages_to_daq_jobs(self, daq_messages: list[DAQJobMessage]):
    """
    Routes messages to the DAQ jobs that accept them.

    A message is delivered to every job whose `allowed_message_in_types`
    contains a matching type; store jobs additionally have to accept it via
    `can_store()`. Each delivery increases the receiving job type's
    incoming-message stats.

    Args:
        daq_messages (list[DAQJobMessage]): List of messages to send.
    """

    for message in daq_messages:
        for job_thread in self.daq_job_threads:
            receiver = job_thread.daq_job

            # Skip jobs that do not accept this message type
            type_allowed = any(
                isinstance(message, allowed_type)
                for allowed_type in receiver.allowed_message_in_types
            )
            if not type_allowed:
                continue

            # Skip store jobs that cannot store this particular message
            if isinstance(receiver, DAQJobStore) and not receiver.can_store(message):
                continue

            receiver.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT)

            # Count the delivery for the receiving job type
            receiver_stats = self.get_daq_job_stats(self.daq_job_stats, type(receiver))
            receiver_stats.message_in_stats.increase()

Sends messages to the DAQ jobs.

Args

daq_messages : list[DAQJobMessage]
List of messages to send.
def start_daq_job_threads(self) ‑> list[DAQJobThread]
Expand source code
def start_daq_job_threads(self) -> list[DAQJobThread]:
    """
    Loads the DAQ jobs from the "configs/" directory and starts them.

    Returns:
        list[DAQJobThread]: The started DAQ job threads.
    """
    loaded_jobs = load_daq_jobs("configs/", self.config)
    return start_daq_jobs(loaded_jobs)
def warn_for_lack_of_daq_jobs(self)
Expand source code
def warn_for_lack_of_daq_jobs(self):
    """
    Logs a warning for every essential kind of DAQ job (store, alert) that none
    of the managed threads runs.
    """
    DAQ_JOB_ABSENT_WARNINGS = {
        DAQJobStore: "No store job found, data will not be stored",
        DAQJobAlert: "No alert job found, alerts will not be sent",
    }

    for daq_job_type, warning_message in DAQ_JOB_ABSENT_WARNINGS.items():
        # Feed the predicate itself to any(); yielding the thread objects would
        # make the result depend on their truthiness instead of on the check.
        if not any(
            isinstance(x.daq_job, daq_job_type) for x in self.daq_job_threads
        ):
            self._logger.warning(warning_message)