Module enrgdaq.daq.jobs.caen.n1081b

Classes

class DAQJobN1081B (config: DAQJobN1081BConfig,
**kwargs)
Expand source code
class DAQJobN1081B(DAQJob):
    """DAQ job that periodically polls a CAEN N1081B and emits its counters.

    On every loop iteration the job consumes incoming messages, makes sure
    the websocket connection to the device is up (connecting and logging in
    if it is not), reads the function results of every configured section
    and sends one tabular store message per section.
    """

    config_type = DAQJobN1081BConfig
    device: N1081B
    config: DAQJobN1081BConfig

    def __init__(self, config: DAQJobN1081BConfig, **kwargs):
        """Validate the configured sections and create the device handle.

        Raises:
            Exception: if a name in ``config.sections_to_store`` is not a
                member of ``N1081B.Section``.
        """
        super().__init__(config, **kwargs)

        # Reject unknown section names up front, before anything else is set
        # up, so a bad configuration fails immediately.
        for section in config.sections_to_store:
            if section not in N1081B.Section.__members__:
                raise Exception(f"Invalid section: {section}")

        # NOTE(review): the trailing "?" is kept exactly as in the original;
        # presumably it interacts with how N1081B builds its endpoint URL
        # from this string - confirm against the N1081B SDK.
        self.device = N1081BPatched(f"{config.host}:{config.port}?")

    def handle_message(self, message: DAQJobMessage) -> bool:
        """Run the base-class handling, then report whether the device is usable.

        Returns:
            False if the base class explicitly returned False or the device
            is not connected, True otherwise.
        """
        # Only an explicit False from the base class is propagated; a base
        # implementation returning None is treated as "no objection".
        if super().handle_message(message) is False:
            return False

        # Do not handle the rest of the messages if the connection is not
        # established. Previously the connected path fell through and
        # returned None, which callers could not tell apart from a failure.
        return self._is_connected()

    def start(self):
        """Run the polling loop; does not return unless an exception escapes."""
        while True:
            self.consume()

            # Log in if not connected
            if not self._is_connected():
                self._logger.info("Connecting to the device...")
                self._connect_to_device()
                self._logger.info("Connected!")

            # Poll sections
            self._poll_sections()

            time.sleep(N1081B_QUERY_INTERVAL_SECONDS)

    def _is_connected(self) -> bool:
        """Return True when the device holds a WebSocket that reports connected."""
        return isinstance(self.device.ws, WebSocket) and self.device.ws.connected

    def _connect_to_device(self):
        """Connect, log in with the configured password and set the socket timeout.

        Raises:
            Exception: if connecting or logging in fails, or if no WebSocket
                is present on the device afterwards.
        """
        if not self.device.connect():
            raise Exception("Connection failed")

        if not self.device.login(self.config.password):
            raise Exception("Login failed")

        if isinstance(self.device.ws, WebSocket):
            self.device.ws.settimeout(N1081B_WEBSOCKET_TIMEOUT_SECONDS)
        else:
            raise Exception("Websocket not found")

    def _poll_sections(self):
        """Fetch the function results of each configured section and store them.

        Raises:
            Exception: if the device returns no results, or the results carry
                no ``counters`` entry for a section.
        """
        for section_name in self.config.sections_to_store:
            section = N1081B.Section[section_name]

            res = self.device.get_function_results(section)
            if not res:
                raise Exception("No results")

            # A missing or empty "data" entry is reported with the same
            # descriptive error as missing counters, rather than surfacing
            # as a bare KeyError/TypeError.
            data = res.get("data")
            if not data or "counters" not in data:
                raise Exception(f"No counters in section {section}")

            self._send_store_message(data, section.name)

    def _send_store_message(self, data: dict, section):
        """Emit one tabular row holding the current counter values of a section.

        Args:
            data: Section results; ``data["counters"]`` is iterated and each
                entry is read for its ``"lemo"`` and ``"value"`` keys.
            section: Used as the tag of the store message.
        """
        counters = data["counters"]
        keys = ["timestamp", *[f"lemo_{x['lemo']}" for x in counters]]
        values = [
            get_now_unix_timestamp_ms(),  # unix timestamp in milliseconds
            *[x["value"] for x in counters],
        ]
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                tag=section,
                keys=keys,
                data=[values],
            )
        )

DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.

Attributes

allowed_message_in_types : list[type[DAQJobMessage]]
List of allowed message types for input.
config_type : Any
Type of the configuration.
config : Any
Configuration object.
message_in : Queue[DAQJobMessage]
Queue for incoming messages.
message_out : Queue[DAQJobMessage]
Queue for outgoing messages.
instance_id : int
Unique instance identifier.
unique_id : str
Unique identifier for the job.
restart_offset : timedelta
Offset for restarting the job.
info : DAQJobInfo
Information about the job.
_has_been_freed : bool
Flag indicating if the job has been freed.
_logger : logging.Logger
Logger instance for the job.

Ancestors

Class variables

var config : DAQJobN1081BConfig
var config_type : Any

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.
var device : N1081B.N1081B

Methods

def start(self)
Expand source code
def start(self):
    """Run the polling loop: consume, (re)connect if needed, poll, sleep.

    Loops forever; it only ends if an exception raised by one of the
    steps propagates out.
    """
    while True:
        # Process incoming messages before touching the device.
        self.consume()

        # Connect and log in whenever the connection is not up.
        needs_login = not self._is_connected()
        if needs_login:
            log = self._logger
            log.info("Connecting to the device...")
            self._connect_to_device()
            log.info("Connected!")

        # Read every configured section, then wait for the next cycle.
        self._poll_sections()
        time.sleep(N1081B_QUERY_INTERVAL_SECONDS)

Inherited members

class DAQJobN1081BConfig (store_config: DAQJobStoreConfig,
host: str,
port: str,
password: str,
sections_to_store: list[str],
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig):
    """Configuration for DAQJobN1081B: device address, credentials and sections."""

    # Host of the device; DAQJobN1081B joins it with `port` as "host:port".
    host: str
    # Port of the device, kept as a string.
    port: str
    # Password passed to the device's login call.
    password: str
    # Section names to poll; each must be a member name of N1081B.Section.
    sections_to_store: list[str]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Ancestors

Instance variables

var host : str
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig):
    """Configuration for DAQJobN1081B: device address, credentials and sections."""

    # Host of the device; DAQJobN1081B joins it with `port` as "host:port".
    host: str
    # Port of the device, kept as a string.
    port: str
    # Password passed to the device's login call.
    password: str
    # Section names to poll; each must be a member name of N1081B.Section.
    sections_to_store: list[str]
var password : str
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig):
    """Configuration for DAQJobN1081B: device address, credentials and sections."""

    # Host of the device; DAQJobN1081B joins it with `port` as "host:port".
    host: str
    # Port of the device, kept as a string.
    port: str
    # Password passed to the device's login call.
    password: str
    # Section names to poll; each must be a member name of N1081B.Section.
    sections_to_store: list[str]
var port : str
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig):
    """Configuration for DAQJobN1081B: device address, credentials and sections."""

    # Host of the device; DAQJobN1081B joins it with `port` as "host:port".
    host: str
    # Port of the device, kept as a string.
    port: str
    # Password passed to the device's login call.
    password: str
    # Section names to poll; each must be a member name of N1081B.Section.
    sections_to_store: list[str]
var sections_to_store : list[str]
Expand source code
class DAQJobN1081BConfig(StorableDAQJobConfig):
    """Configuration for DAQJobN1081B: device address, credentials and sections."""

    # Host of the device; DAQJobN1081B joins it with `port` as "host:port".
    host: str
    # Port of the device, kept as a string.
    port: str
    # Password passed to the device's login call.
    password: str
    # Section names to poll; each must be a member name of N1081B.Section.
    sections_to_store: list[str]
class N1081BPatched (ip)
Expand source code
class N1081BPatched(N1081B):
    """N1081B subclass whose connect() opens the websocket with a timeout."""

    def __init__(self, ip):
        """Forward the address string unchanged to the N1081B constructor."""
        super().__init__(ip)

    def connect(self):
        """Open the websocket to API_ENDPOINT and return its connected flag.

        The connection is created with N1081B_WEBSOCKET_TIMEOUT_SECONDS as
        its timeout and stored on ``self.ws``.
        """
        endpoint = self.API_ENDPOINT
        self.ws = create_connection(
            endpoint,
            timeout=N1081B_WEBSOCKET_TIMEOUT_SECONDS,
        )
        is_open = self.ws.connected
        return is_open

Ancestors

  • N1081B.N1081B

Methods

def connect(self)
Expand source code
def connect(self):
    """Open the websocket to API_ENDPOINT and return its connected flag.

    The connection is created with N1081B_WEBSOCKET_TIMEOUT_SECONDS as its
    timeout and stored on ``self.ws``.
    """
    endpoint = self.API_ENDPOINT
    self.ws = create_connection(
        endpoint,
        timeout=N1081B_WEBSOCKET_TIMEOUT_SECONDS,
    )
    is_open = self.ws.connected
    return is_open
    return self.ws.connected