Module enrgdaq.daq.jobs.store.mysql
Classes
class DAQJobStoreMySQL (config: DAQJobStoreMySQLConfig,
**kwargs)-
Expand source code
class DAQJobStoreMySQL(DAQJobStore):
    """Store job that inserts queued rows into MySQL tables.

    ``handle_message`` queues one ``MySQLWriteQueueItem`` per incoming row,
    ``store_loop`` drains the queue with one ``INSERT`` per item, and
    ``_flush`` commits at most once every
    ``DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS`` seconds unless forced.
    """

    config_type = DAQJobStoreMySQLConfig
    allowed_store_config_types = [DAQJobStoreConfigMySQL]
    allowed_message_in_types = [DAQJobMessageStore]

    # Rows waiting to be inserted, in arrival order.
    _write_queue: deque[MySQLWriteQueueItem]
    # Time of the last commit (initialised to construction time).
    _last_flush_date: datetime
    # None until start() has opened the connection.
    _connection: Optional[pymysql.connections.Connection]

    def __init__(self, config: DAQJobStoreMySQLConfig, **kwargs):
        """Initialise an empty write queue; the connection is opened in start()."""
        super().__init__(config, **kwargs)

        self._write_queue = deque()
        self._last_flush_date = datetime.now()
        self._connection = None

    def start(self):
        """Open the MySQL connection from the job config, then start the base job."""
        self._connection = pymysql.connect(
            host=self.config.host,
            user=self.config.user,
            port=self.config.port,
            password=self.config.password,
            database=self.config.database,
        )

        super().start()

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        """Queue every row of ``message`` for insertion.

        Returns:
            False when the base class rejects the message, True once all rows
            of ``message.data`` have been appended to the write queue.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigMySQL, message.store_config.mysql)

        # Append rows to write_queue
        for row in message.data:
            self._write_queue.append(
                MySQLWriteQueueItem(store_config.table_name, message.keys, row)
            )

        return True

    def _flush(self, force=False):
        """Commit the connection if the flush interval has elapsed or ``force`` is set."""
        assert self._connection is not None
        if (
            datetime.now() - self._last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_MYSQL_FLUSH_INTERVAL_SECONDS and not force:
            return

        self._connection.commit()
        self._last_flush_date = datetime.now()

    def _sanitize_text(self, text: str) -> str:
        """Return ``text`` with every character outside ``[a-zA-Z0-9_]`` replaced by ``_``."""
        # replace anything but letters, numbers, and underscores with underscores
        return re.sub(r"[^a-zA-Z0-9_]", "_", text)

    def store_loop(self):
        """Insert every queued item, then commit if a flush is due."""
        assert self._connection is not None
        with self._connection.cursor() as cursor:
            while self._write_queue:
                item = self._write_queue.popleft()

                # Identifiers cannot be bound as query parameters, so the table
                # and column names are sanitized and interpolated; the values
                # go through %s placeholders.
                table_name = self._sanitize_text(item.table_name)
                keys = ",".join(self._sanitize_text(key) for key in item.keys)
                values = ",".join(["%s"] * len(item.keys))
                query = f"INSERT INTO {table_name} ({keys}) VALUES ({values})"
                cursor.execute(
                    query,
                    tuple(item.rows),
                )
        self._flush()

    def __del__(self):
        """Commit pending work and close the connection if it is still open."""
        # getattr: __del__ also runs when __init__ raised before _connection
        # was assigned.
        connection = getattr(self, "_connection", None)
        try:
            # Only commit on a live connection; committing on a closed one
            # raises and would skip both close() and the base finalizer.
            if connection is not None and connection.open:
                try:
                    self._flush(force=True)
                finally:
                    connection.close()
        finally:
            super().__del__()
DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.
Attributes
allowed_store_config_types
:list
- A list of allowed store configuration types.
Ancestors
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any
-
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Methods
def store_loop(self)
-
Expand source code
def store_loop(self):
    """Drain the write queue into MySQL, one INSERT per queued item.

    Table and column names are passed through ``self._sanitize_text`` and
    interpolated into the statement; the row values are sent as ``%s``
    parameters. ``self._flush()`` is called once the queue is empty.
    """
    assert self._connection is not None

    sanitize = self._sanitize_text
    with self._connection.cursor() as cursor:
        while self._write_queue:
            entry = self._write_queue.popleft()

            table = sanitize(entry.table_name)
            columns = ",".join([sanitize(column) for column in entry.keys])
            placeholders = ",".join(["%s"] * len(entry.keys))

            statement = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
            cursor.execute(statement, tuple(entry.rows))

    self._flush()
Inherited members
class DAQJobStoreMySQLConfig (host: str,
user: str,
password: str,
database: str,
port: int = 3306,
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var database : str
-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
var host : str
-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
var password : str
-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
var port : int
-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
var user : str
-
Expand source code
class DAQJobStoreMySQLConfig(DAQJobConfig):
    """Connection settings for the MySQL store job.

    ``DAQJobStoreMySQL.start`` passes each field to ``pymysql.connect`` as
    the keyword argument of the same name.
    """

    # Passed as host= to pymysql.connect.
    host: str
    # Passed as user= to pymysql.connect.
    user: str
    # Passed as password= to pymysql.connect.
    password: str
    # Passed as database= to pymysql.connect.
    database: str
    # Passed as port= to pymysql.connect; defaults to 3306.
    port: int = 3306
class MySQLWriteQueueItem (table_name: str, keys: list[str], rows: list[typing.Any])
-
Expand source code
@dataclass
class MySQLWriteQueueItem:
    """One pending INSERT queued by ``DAQJobStoreMySQL.handle_message``."""

    # Target table; sanitized by the store before being put into the query.
    table_name: str
    # Column names for the INSERT; one %s placeholder is emitted per key.
    keys: list[str]
    # Despite the plural name, this holds the values of a single row:
    # handle_message creates one item per row of message.data, and
    # store_loop passes tuple(rows) as the parameters of one INSERT.
    rows: list[Any]
MySQLWriteQueueItem(table_name: str, keys: list[str], rows: list[typing.Any])
Class variables
var keys : list[str]
var rows : list[typing.Any]
var table_name : str