Module enrgdaq.daq.jobs.store.memory

Classes

class DAQJobStoreMemory (config: DAQJobStoreMemoryConfig,
**kwargs)
Expand source code
class DAQJobStoreMemory(DAQJobStore):
    config_type = DAQJobStoreMemoryConfig
    allowed_store_config_types = [DAQJobStoreConfigMemory]
    allowed_message_in_types = [DAQJobMessageStore]

    def __init__(self, config: DAQJobStoreMemoryConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._memory = {}

    def handle_message(
        self, message: DAQJobMessageStoreTabular | DAQJobMessageStorePyArrow
    ) -> bool:
        if not super().handle_message(message):
            return False

        if self.config.void_data:
            return True

        timestamp = datetime.now()

        if isinstance(message, DAQJobMessageStorePyArrow):
            table = message.get_table()
            message.release()
            if table.num_rows == 0:
                return True
            self._memory[message.tag] = {
                "timestamp": timestamp,
                "keys": table.column_names,
                "table": table,
            }
        else:
            self._memory[message.tag] = {
                "timestamp": timestamp,
                "keys": message.keys,
                "rows": message.data,
            }
        return True

    def store_loop(self):
        pass

    def get_all(self):
        return self._memory

    def clear(self):
        self._memory.clear()

    def __del__(self):
        self.clear()
        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.

Ancestors

Class variables

var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]

Configuration options for storing DAQ jobs in memory.

Methods

def clear(self)
Expand source code
def clear(self):
    self._memory.clear()
def get_all(self)
Expand source code
def get_all(self):
    return self._memory
def store_loop(self)
Expand source code
def store_loop(self):
    pass

Inherited members

class DAQJobStoreMemoryConfig (dispose_after_n_entries: int | None = None,
void_data: bool | None = False,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
Expand source code
class DAQJobStoreMemoryConfig(DAQJobConfig):
    """
    Configuration options for storing DAQ jobs in memory.
    """

    dispose_after_n_entries: Optional[int] = None
    void_data: Optional[bool] = False

Configuration options for storing DAQ jobs in memory.

Ancestors

Instance variables

var dispose_after_n_entries : int | None
Expand source code
class DAQJobStoreMemoryConfig(DAQJobConfig):
    """
    Configuration options for storing DAQ jobs in memory.
    """

    dispose_after_n_entries: Optional[int] = None
    void_data: Optional[bool] = False
var void_data : bool | None
Expand source code
class DAQJobStoreMemoryConfig(DAQJobConfig):
    """
    Configuration options for storing DAQ jobs in memory.
    """

    dispose_after_n_entries: Optional[int] = None
    void_data: Optional[bool] = False