Module enrgdaq.daq.jobs.caen.n1081b
Classes
class DAQJobN1081B (config: DAQJobN1081BConfig,
**kwargs)-
Expand source code
class DAQJobN1081B(DAQJob): config_type = DAQJobN1081BConfig device: N1081B config: DAQJobN1081BConfig def __init__(self, config: DAQJobN1081BConfig, **kwargs): super().__init__(config, **kwargs) self.device = N1081BPatched(f"{config.host}:{config.port}?") for section in config.sections_to_store: if section not in N1081B.Section.__members__: raise Exception(f"Invalid section: {section}") def handle_message(self, message: DAQJobMessage): super().handle_message(message) # Do not handle the rest of the messages if the connection is not established if not self._is_connected(): return False def start(self): while True: self.consume() # Log in if not connected if not self._is_connected(): self._logger.info("Connecting to the device...") self._connect_to_device() self._logger.info("Connected!") # Poll sections self._poll_sections() time.sleep(N1081B_QUERY_INTERVAL_SECONDS) def _is_connected(self) -> bool: return isinstance(self.device.ws, WebSocket) and self.device.ws.connected def _connect_to_device(self): if not self.device.connect(): raise Exception("Connection failed") if not self.device.login(self.config.password): raise Exception("Login failed") if isinstance(self.device.ws, WebSocket): self.device.ws.settimeout(N1081B_WEBSOCKET_TIMEOUT_SECONDS) else: raise Exception("Websocket not found") def _poll_sections(self): for section in self.config.sections_to_store: section = N1081B.Section[section] res = self.device.get_function_results(section) if not res: raise Exception("No results") data = res["data"] if "counters" not in data: raise Exception(f"No counters in section {section}") self._send_store_message(data, section.name) def _send_store_message(self, data: dict, section): keys = ["timestamp", *[f"lemo_{x['lemo']}" for x in data["counters"]]] values = [ get_now_unix_timestamp_ms(), # unix timestamp in milliseconds *[x["value"] for x in data["counters"]], ] self._put_message_out( DAQJobMessageStoreTabular( store_config=self.config.store_config, tag=section, keys=keys, data=[values], ) )
DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.
Attributes
allowed_message_in_types
:list[type[DAQJobMessage]]
- List of allowed message types for input.
config_type
:Any
- Type of the configuration.
config
:Any
- Configuration object.
message_in
:Queue[DAQJobMessage]
- Queue for incoming messages.
message_out
:Queue[DAQJobMessage]
- Queue for outgoing messages.
instance_id
:int
- Unique instance identifier.
unique_id
:str
- Unique identifier for the job.
restart_offset
:timedelta
- Offset for restarting the job.
info
:DAQJobInfo
- Information about the job.
_has_been_freed
:bool
- Flag indicating if the job has been freed.
_logger
:logging.Logger
- Logger instance for the job.
Ancestors
Class variables
var config : DAQJobN1081BConfig
var config_type : Any
-
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
var device : N1081B.N1081B
Methods
def start(self)
-
Expand source code
def start(self): while True: self.consume() # Log in if not connected if not self._is_connected(): self._logger.info("Connecting to the device...") self._connect_to_device() self._logger.info("Connected!") # Poll sections self._poll_sections() time.sleep(N1081B_QUERY_INTERVAL_SECONDS)
Inherited members
class DAQJobN1081BConfig (store_config: DAQJobStoreConfig,
host: str,
port: str,
password: str,
sections_to_store: list[str],
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)-
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig): host: str port: str password: str sections_to_store: list[str]
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Ancestors
- StorableDAQJobConfig
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var host : str
-
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig): host: str port: str password: str sections_to_store: list[str]
var password : str
-
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig): host: str port: str password: str sections_to_store: list[str]
var port : str
-
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig): host: str port: str password: str sections_to_store: list[str]
var sections_to_store : list[str]
-
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig): host: str port: str password: str sections_to_store: list[str]
class N1081BPatched (ip)
-
Expand source code
class N1081BPatched(N1081B): def __init__(self, ip): super().__init__(ip) def connect(self): self.ws = create_connection( self.API_ENDPOINT, timeout=N1081B_WEBSOCKET_TIMEOUT_SECONDS ) return self.ws.connected
Ancestors
- N1081B.N1081B
Methods
def connect(self)
-
Expand source code
def connect(self): self.ws = create_connection( self.API_ENDPOINT, timeout=N1081B_WEBSOCKET_TIMEOUT_SECONDS ) return self.ws.connected