Module enrgdaq.daq.daq_job
Functions
def build_daq_job(toml_config: bytes, supervisor_info: SupervisorInfo) -> DAQJobProcess
Expand source code
def build_daq_job(toml_config: bytes, supervisor_info: SupervisorInfo) -> DAQJobProcess: generic_daq_job_config = msgspec.toml.decode(toml_config, type=DAQJobConfig) daq_job_class = get_daq_job_class( generic_daq_job_config.daq_job_type, warn_deprecated=True ) if daq_job_class is None: raise Exception(f"Invalid DAQ job type: {generic_daq_job_config.daq_job_type}") # Get DAQ config clase based on daq_job_type daq_job_config_class: type[DAQJobConfig] = daq_job_class.config_type # Load the config in config = msgspec.toml.decode(toml_config, type=daq_job_config_class) return _create_daq_job_process( daq_job_class, config, supervisor_info, toml_config.decode() ) def load_daq_jobs(job_config_dir: str,
def load_daq_jobs(job_config_dir: str, supervisor_info: SupervisorInfo) -> list[DAQJobProcess]
Expand source code
def load_daq_jobs( job_config_dir: str, supervisor_info: SupervisorInfo ) -> list[DAQJobProcess]: jobs = [] job_files = glob.glob(os.path.join(job_config_dir, "*.toml")) for job_file in job_files: # Skip the supervisor config file if os.path.basename(job_file) == SUPERVISOR_CONFIG_FILE_NAME: continue with open(job_file, "rb") as f: job_config_raw = f.read() jobs.append(build_daq_job(job_config_raw, supervisor_info)) return jobs def rebuild_daq_job(daq_job_process: DAQJobProcess,
def rebuild_daq_job(daq_job_process: DAQJobProcess, supervisor_info: SupervisorInfo) -> DAQJobProcess
Expand source code
def rebuild_daq_job( daq_job_process: DAQJobProcess, supervisor_info: SupervisorInfo ) -> DAQJobProcess: return _create_daq_job_process( daq_job_process.daq_job_cls, daq_job_process.config, supervisor_info, log_queue=daq_job_process.log_queue, ) def start_daq_job(daq_job_process: DAQJobProcess) ‑> DAQJobProcess-
Expand source code
def start_daq_job(daq_job_process: DAQJobProcess) -> DAQJobProcess: logging.info(f"Starting {daq_job_process.daq_job_cls.__name__}") job_multiprocessing_method = getattr( daq_job_process.daq_job_cls, "multiprocessing_method", "default" ) # Use 'fork' method on Unix systems (including macOS) by default to avoid semaphore lock issues # when pickling/unpickling Queue objects during process spawn, but allow individual jobs to override if platform.system() in ["Darwin", "Linux"]: if job_multiprocessing_method == "spawn": # Use default Process (which will use spawn on macOS) process = Process(target=daq_job_process.start, daemon=True) elif job_multiprocessing_method == "fork": # Explicitly use fork context ctx = get_context("fork") process = ctx.Process(target=daq_job_process.start, daemon=True) else: # default behavior # Use fork for better compatibility with most DAQ jobs ctx = get_context("fork") process = ctx.Process(target=daq_job_process.start, daemon=True) else: # Use default Process on Windows (which doesn't support fork) process = Process(target=daq_job_process.start, daemon=True) process.start() daq_job_process.process = process # type: ignore try: daq_job_info_message = daq_job_process.job_started_queue.get(timeout=5000) if isinstance(daq_job_info_message, DAQJobMessageJobStarted): daq_job_process.daq_job_info = daq_job_info_message.daq_job_info else: raise Exception("Initial message of DAQJob was not DAQJobMessageJobStarted") except Exception as e: logging.error( f"Could not get DAQ job info for {daq_job_process.daq_job_cls.__name__}: {e}", exc_info=True, ) return daq_job_process def start_daq_jobs(daq_job_processes: list[DAQJobProcess]) ‑> list[DAQJobProcess]-
Expand source code
def start_daq_jobs(daq_job_processes: list[DAQJobProcess]) -> list[DAQJobProcess]: processes = [] for daq_job in daq_job_processes: processes.append(start_daq_job(daq_job)) return processes