Module enrgdaq.daq.daq_job

Functions

def build_daq_job(toml_config: bytes,
supervisor_config: SupervisorConfig) ‑> DAQJob
Expand source code
def build_daq_job(toml_config: bytes, supervisor_config: SupervisorConfig) -> DAQJob:
    generic_daq_job_config = msgspec.toml.decode(toml_config, type=DAQJobConfig)
    daq_job_class = get_daq_job_class(
        generic_daq_job_config.daq_job_type, warn_deprecated=True
    )

    if daq_job_class is None:
        raise Exception(f"Invalid DAQ job type: {generic_daq_job_config.daq_job_type}")

    # Get DAQ config clase based on daq_job_type
    daq_job_config_class: DAQJobConfig = daq_job_class.config_type

    # Load the config in
    config = msgspec.toml.decode(toml_config, type=daq_job_config_class)

    return daq_job_class(config, supervisor_config=supervisor_config.clone())
def load_daq_jobs(job_config_dir: str,
supervisor_config: SupervisorConfig) ‑> list[DAQJob]
Expand source code
def load_daq_jobs(
    job_config_dir: str, supervisor_config: SupervisorConfig
) -> list[DAQJob]:
    jobs = []
    job_files = glob.glob(os.path.join(job_config_dir, "*.toml"))
    for job_file in job_files:
        # Skip the supervisor config file
        if Path(job_file) == Path(SUPERVISOR_CONFIG_FILE_PATH):
            continue

        with open(job_file, "rb") as f:
            job_config_raw = f.read()

        jobs.append(build_daq_job(job_config_raw, supervisor_config))

    return jobs
def restart_daq_job(daq_job_type: type[DAQJob],
daq_job_config: DAQJobConfig,
supervisor_config: SupervisorConfig) ‑> DAQJobThread
Expand source code
def restart_daq_job(
    daq_job_type: type[DAQJob],
    daq_job_config: DAQJobConfig,
    supervisor_config: SupervisorConfig,
) -> DAQJobThread:
    logging.info(f"Restarting {daq_job_type.__name__}")
    new_daq_job = daq_job_type(
        daq_job_config, supervisor_config=supervisor_config.clone()
    )
    thread = threading.Thread(target=new_daq_job.start, daemon=True)
    thread.start()
    return DAQJobThread(new_daq_job, thread)
def start_daq_job(daq_job: DAQJob) ‑> DAQJobThread
Expand source code
def start_daq_job(daq_job: DAQJob) -> DAQJobThread:
    logging.info(f"Starting {type(daq_job).__name__}")
    thread = threading.Thread(target=daq_job.start, daemon=True)
    thread.start()

    return DAQJobThread(daq_job, thread)
def start_daq_jobs(daq_jobs: list[DAQJob]) ‑> list[DAQJobThread]
Expand source code
def start_daq_jobs(daq_jobs: list[DAQJob]) -> list[DAQJobThread]:
    threads = []
    for daq_job in daq_jobs:
        threads.append(start_daq_job(daq_job))

    return threads