Module enrgdaq.daq.jobs.store.mysql

Classes

class DAQJobStoreMySQL (config: DAQJobStoreMySQLConfig,
**kwargs)
Expand source code
class DAQJobStoreMySQL(DAQJobStore):
    """Store job that buffers incoming rows and inserts them into MySQL.

    ``handle_message`` queues one item per row in memory. ``store_loop``
    drains that queue with one ``INSERT`` per item and then calls ``_flush``,
    which commits unless the last commit happened less than
    ``DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS`` ago.
    """

    config_type = DAQJobStoreMySQLConfig
    allowed_store_config_types = [DAQJobStoreConfigMySQL]
    allowed_message_in_types = [DAQJobMessageStore]

    _write_queue: deque[MySQLWriteQueueItem]
    _last_flush_date: datetime
    _connection: Optional[pymysql.connections.Connection]

    def __init__(self, config: DAQJobStoreMySQLConfig, **kwargs):
        """Create an empty write queue; the connection is opened by ``start``."""
        super().__init__(config, **kwargs)

        self._write_queue = deque()
        self._last_flush_date = datetime.now()
        self._connection = None

    def start(self):
        """Connect to MySQL using the job config, then run the base ``start``."""
        self._connection = pymysql.connect(
            host=self.config.host,
            user=self.config.user,
            port=self.config.port,
            password=self.config.password,
            database=self.config.database,
        )
        super().start()

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        """Queue every row of ``message`` for a later insert.

        Returns:
            ``False`` when the base class rejects the message (nothing is
            queued), ``True`` once all rows have been queued.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigMySQL, message.store_config.mysql)

        # One queue item per row; every item carries the table name and the
        # column keys so store_loop can build its INSERT independently.
        self._write_queue.extend(
            MySQLWriteQueueItem(store_config.table_name, message.keys, row)
            for row in message.data
        )

        return True

    def _flush(self, force=False):
        """Commit the open transaction if the flush interval has elapsed.

        Args:
            force: Commit regardless of how recently the last commit ran.
        """
        assert self._connection is not None
        if (
            datetime.now() - self._last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS and not force:
            return

        self._connection.commit()
        self._last_flush_date = datetime.now()

    def _sanitize_text(self, text: str) -> str:
        """Return ``text`` with every character outside ``[a-zA-Z0-9_]`` replaced by ``_``."""
        return re.sub(r"[^a-zA-Z0-9_]", "_", text)

    def store_loop(self):
        """Insert all queued rows, then attempt an interval-limited commit."""
        assert self._connection is not None
        with self._connection.cursor() as cursor:
            while self._write_queue:
                item = self._write_queue.popleft()

                # Identifiers cannot be bound as query parameters, so they are
                # sanitized and interpolated; the values go through %s binding.
                table_name = self._sanitize_text(item.table_name)
                keys = ",".join(self._sanitize_text(key) for key in item.keys)
                values = ",".join(["%s"] * len(item.keys))
                query = f"INSERT INTO {table_name} ({keys}) VALUES ({values})"
                cursor.execute(
                    query,
                    tuple(item.rows),
                )
            self._flush()

    def __del__(self):
        """Commit pending inserts and close the connection, if it is still open."""
        # getattr: __del__ also runs for an instance whose __init__ raised
        # before _connection was assigned.
        connection = getattr(self, "_connection", None)
        # Only commit on a live connection; committing on a closed one raises.
        if connection is not None and connection.open:
            try:
                self._flush(force=True)
            finally:
                # Release the socket even when the final commit fails.
                connection.close()

        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.

Ancestors

Class variables

var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is never used when this is set to False; when set to True, it is used only where possible and its use is not guaranteed.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.

Methods

def store_loop(self)
Expand source code
def store_loop(self):
    """Drain the write queue into MySQL, one INSERT per queued item.

    Table and column names are passed through ``_sanitize_text`` and
    interpolated into the statement; the row values are bound through
    ``%s`` placeholders. ``_flush()`` is called once after the queue is empty.
    """
    assert self._connection is not None
    with self._connection.cursor() as db_cursor:
        while self._write_queue:
            pending = self._write_queue.popleft()

            target = self._sanitize_text(pending.table_name)
            columns = ",".join([self._sanitize_text(name) for name in pending.keys])
            placeholders = ",".join("%s" for _ in pending.keys)
            statement = f"INSERT INTO {target} ({columns}) VALUES ({placeholders})"
            params = tuple(pending.rows)
            db_cursor.execute(statement, params)
        self._flush()

Inherited members

class DAQJobStoreMySQLConfig (host: str,
user: str,
password: str,
database: str,
port: int = 3306,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is never used when this is set to False; when set to True, it is used only where possible and its use is not guaranteed.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.

Ancestors

Instance variables

var database : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306
var host : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306
var password : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306
var port : int
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306
var user : str
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    # Connection settings for the MySQL store job. DAQJobStoreMySQL.start()
    # passes each of these fields to pymysql.connect().
    host: str  # MySQL server host
    user: str  # user name to authenticate as
    password: str  # password for `user`
    database: str  # database to connect to
    port: int = 3306  # server port; defaults to 3306
class MySQLWriteQueueItem (table_name: str, keys: list[str], rows: list[typing.Any])
Expand source code
@dataclass
class MySQLWriteQueueItem:
    # One pending INSERT. DAQJobStoreMySQL.handle_message creates one item per
    # data row, and store_loop executes a single INSERT statement per item.
    table_name: str  # destination table; sanitized by store_loop before use
    keys: list[str]  # column names; each is sanitized by store_loop before use
    rows: list[Any]  # values of ONE row (despite the plural name), bound to the %s placeholders

MySQLWriteQueueItem(table_name: str, keys: list[str], rows: list[typing.Any])

Instance variables

var keys : list[str]
var rows : list[typing.Any]
var table_name : str