Module enrgdaq.daq.jobs.store.mysql

Classes

class DAQJobStoreMySQL (config: DAQJobStoreMySQLConfig,
**kwargs)
Expand source code
class DAQJobStoreMySQL(DAQJobStore):
    """Store job that writes incoming rows into MySQL tables via pymysql.

    Rows received through ``handle_message`` are buffered in an in-memory
    queue; ``store_loop`` drains the queue with one ``INSERT`` per row, and
    the transaction is committed at most once per
    ``DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS`` (or immediately when a
    flush is forced).
    """

    config_type = DAQJobStoreMySQLConfig
    allowed_store_config_types = [DAQJobStoreConfigMySQL]
    allowed_message_in_types = [DAQJobMessageStore]

    # Pending rows, oldest first; filled by handle_message, drained by store_loop.
    _write_queue: deque[MySQLWriteQueueItem]
    # Time of the last commit (initialised to construction time).
    _last_flush_date: datetime
    # Live connection; None until start() has been called.
    _connection: Optional[pymysql.connections.Connection]

    def __init__(self, config: DAQJobStoreMySQLConfig, **kwargs):
        """Initialise an empty write queue; the connection is opened in ``start``."""
        super().__init__(config, **kwargs)

        self._write_queue = deque()
        self._last_flush_date = datetime.now()
        self._connection = None

    def start(self):
        """Connect to the configured MySQL server, then run the base ``start``."""
        self._connection = pymysql.connect(
            host=self.config.host,
            user=self.config.user,
            port=self.config.port,
            password=self.config.password,
            database=self.config.database,
        )
        super().start()

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        """Queue every row of ``message`` for insertion.

        Returns:
            False when the base class rejects the message, True once all rows
            have been appended to the write queue.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigMySQL, message.store_config.mysql)

        # One queue item per row; all rows share the message's keys and table.
        for row in message.data:
            self._write_queue.append(
                MySQLWriteQueueItem(store_config.table_name, message.keys, row)
            )

        return True

    def _flush(self, force=False):
        """Commit the current transaction if the flush interval has elapsed.

        Args:
            force: Commit regardless of how recently the last commit happened.
        """
        assert self._connection is not None
        if (
            datetime.now() - self._last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS and not force:
            return

        self._connection.commit()
        self._last_flush_date = datetime.now()

    def _sanitize_text(self, text: str) -> str:
        """Return ``text`` with every character outside ``[a-zA-Z0-9_]`` replaced by ``_``.

        Used for table and column names, which are interpolated into the SQL
        text and therefore cannot be passed as query parameters.
        """
        # replace anything but letters, numbers, and underscores with underscores
        return re.sub(r"[^a-zA-Z0-9_]", "_", text)

    def store_loop(self):
        """Insert every queued row, then commit if the flush interval allows."""
        assert self._connection is not None
        with self._connection.cursor() as cursor:
            while self._write_queue:
                # NOTE: the item is removed before it is executed, so a row
                # whose INSERT raises is not retried.
                item = self._write_queue.popleft()

                # Identifiers are sanitised and inlined; values go through
                # driver-side parameter binding (%s placeholders).
                table_name = self._sanitize_text(item.table_name)
                keys = ",".join(self._sanitize_text(key) for key in item.keys)
                values = ",".join(["%s"] * len(item.keys))
                query = f"INSERT INTO {table_name} ({keys}) VALUES ({values})"
                cursor.execute(
                    query,
                    tuple(item.rows),
                )
            self._flush()

    def __del__(self):
        """Commit outstanding work and close the connection on finalisation."""
        # getattr: __del__ also runs when __init__ raised before the attribute
        # was assigned.
        connection = getattr(self, "_connection", None)
        # Only commit on a connection that is still open; committing on a
        # closed one raises and would abort the rest of the finaliser.
        if connection is not None and connection.open:
            try:
                self._flush(force=True)
            finally:
                # Close even when the final commit fails, so the socket is
                # not leaked.
                connection.close()

        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.

Attributes

allowed_store_config_types : list
A list of allowed store configuration types.

Ancestors

Class variables

var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Methods

def store_loop(self)
Expand source code
def store_loop(self):
    """Drain the write queue into MySQL, one INSERT per queued item.

    Table and column names are passed through ``_sanitize_text`` and inlined
    into the statement; the row values are bound as ``%s`` parameters. After
    the queue is empty, ``_flush`` is called to commit if it decides to.
    """
    assert self._connection is not None
    with self._connection.cursor() as cur:
        while self._write_queue:
            entry = self._write_queue.popleft()

            # One placeholder per column.
            placeholders = ",".join("%s" for _ in entry.keys)
            columns = ",".join(map(self._sanitize_text, entry.keys))
            table = self._sanitize_text(entry.table_name)

            statement = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
            params = tuple(entry.rows)
            cur.execute(statement, params)
        self._flush()

Inherited members

class DAQJobStoreMySQLConfig (host: str,
user: str,
password: str,
database: str,
port: int = 3306,
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Ancestors

Instance variables

var database : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306
var host : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306
var password : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306
var port : int
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306
var user : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    Each field is passed unchanged to ``pymysql.connect`` when the job starts.
    """

    # Host name or address of the MySQL server.
    host: str
    # Account used to log in.
    user: str
    # Password for ``user``.
    password: str
    # Database (schema) selected on connect.
    database: str
    # TCP port of the server; 3306 when not specified.
    port: int = 3306
class MySQLWriteQueueItem (table_name: str, keys: list[str], rows: list[typing.Any])
Expand source code
@dataclass
class MySQLWriteQueueItem:
    """One pending INSERT: the target table, its columns, and one row of values."""

    # Destination table; sanitised before being placed in the SQL text.
    table_name: str
    # Column names, in the same order as the values in ``rows``.
    keys: list[str]
    # Values of a single row (despite the plural name); bound as the
    # parameters of one INSERT statement.
    rows: list[Any]

MySQLWriteQueueItem(table_name: str, keys: list[str], rows: list[typing.Any])

Class variables

var keys : list[str]
var rows : list[typing.Any]
var table_name : str