Module enrgdaq.daq.types

Functions

def get_all_daq_job_types()
Expand source code
def get_all_daq_job_types():
    return all_subclasses(DAQJob)
def get_daq_job_class(daq_job_type: str, warn_deprecated: bool = False) ‑> type[DAQJob] | None
Expand source code
def get_daq_job_class(
    daq_job_type: str, warn_deprecated: bool = False
) -> Optional[type[DAQJob]]:
    """
    Gets the DAQJob class for a given DAQ job type.

    Args:
        daq_job_type (str): The type of the DAQ job.
        warn_deprecated (bool): Whether to warn if the DAQ job type is deprecated.

    Returns:
        Optional[type[DAQJob]]: The DAQJob class for the given DAQ job type, or None if not found.
    """

    daq_job_class = None
    if daq_job_type in DAQ_JOB_TYPE_TO_CLASS:
        daq_job_class = DAQ_JOB_TYPE_TO_CLASS[daq_job_type]
        if warn_deprecated:
            logging.warning(
                f"DAQ job type '{daq_job_type}' is deprecated, please use '{daq_job_class.__name__}' instead"
            )
    else:
        for daq_job in get_all_daq_job_types():
            if daq_job.__name__ == daq_job_type:
                daq_job_class = daq_job
    return daq_job_class

Gets the DAQJob class for a given DAQ job type.

Args

daq_job_type : str
The type of the DAQ job.
warn_deprecated : bool
Whether to warn if the DAQ job type is deprecated.

Returns

Optional[type[DAQJob]]
The DAQJob class for the given DAQ job type, or None if not found.