Module enrgdaq.daq.jobs.store.redis
Classes
class DAQJobStoreRedis (config: DAQJobStoreRedisConfig,
**kwargs)-
Expand source code
class DAQJobStoreRedis(DAQJobStore):
    """Store job that buffers tabular messages and writes them to Redis.

    ``handle_message`` converts each incoming tabular message into a
    column-oriented dict and queues it. ``store_loop`` drains the queue and
    writes every column either to a Redis TimeSeries key or to a plain Redis
    list, depending on the ``use_timeseries`` flag of the message's store
    config.
    """

    config_type = DAQJobStoreRedisConfig
    allowed_store_config_types = [DAQJobStoreConfigRedis]
    allowed_message_in_types = [DAQJobMessageStoreTabular]

    _write_queue: deque[RedisWriteQueueItem]
    _last_flush_date: datetime
    _connection: Optional[redis.Redis]
    _ts: Optional[TimeSeries]

    def __init__(self, config: DAQJobStoreRedisConfig, **kwargs):
        """Create the job with an empty write queue and no connection yet."""
        super().__init__(config, **kwargs)

        self._write_queue = deque()
        self._last_flush_date = datetime.now()
        self._connection = None
        # Initialised here so the attribute exists even before start() runs.
        self._ts = None

    def start(self):
        """Open the Redis connection, obtain the TimeSeries client, and start."""
        self._connection = redis.Redis(
            host=self.config.host,
            port=self.config.port,
            db=self.config.db,
            password=self.config.password,
        )
        # NOTE(review): confirm that ts() actually fails when the server lacks
        # the TimeSeries module; it may only build a client-side wrapper.
        try:
            self._ts = self._connection.ts()
        except Exception as ex:
            self._logger.error("Timeseries not supported by Redis server", exc_info=ex)
            self._ts = None

        super().start()

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        """Queue one tabular message for writing.

        Returns:
            False when the base class rejects the message, True otherwise.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigRedis, message.store_config.redis)

        # A message without rows has nothing to write; queueing it would only
        # produce empty Redis commands later.
        if len(message.data) == 0:
            return True

        # Transpose the rows into one list of values per column name.
        data = {
            column: [row[i] for row in message.data]
            for i, column in enumerate(message.keys)
        }

        # The dict already holds every row, so it is queued exactly once.
        # Queueing it once per row would write the whole message N times.
        self._write_queue.append(RedisWriteQueueItem(store_config, data, message.tag))

        return True

    def store_loop(self):
        """Drain the write queue, writing every queued message to Redis.

        Messages that cannot be written (TimeSeries requested but unavailable,
        or TimeSeries requested without a ``timestamp`` column) are logged and
        dropped; the remaining queue is still processed.
        """
        assert self._connection is not None

        while self._write_queue:
            msg = self._write_queue.popleft()
            store_config = msg.store_config
            use_timeseries = store_config.use_timeseries
            ts = self._ts

            if use_timeseries and ts is None:
                self._logger.warning(
                    "Trying to store data in timeseries, but timeseries is not supported by Redis server"
                )
                continue

            # Checked before touching Redis so no series is created for a
            # message that is going to be skipped anyway.
            if use_timeseries and "timestamp" not in msg.data:
                self._logger.warning(
                    "Message data does not contain a timestamp, skipping"
                )
                continue

            key_expiration = None
            retention_msecs = None
            if store_config.key_expiration_days is not None:
                key_expiration = timedelta(days=store_config.key_expiration_days)
                retention_msecs = int(key_expiration.total_seconds() * 1000)

            tag = "" if msg.tag is None else f".{msg.tag}"

            for key, values in msg.data.items():
                # Empty columns would produce argument-less Redis commands.
                if not values:
                    continue

                item_key = f"{store_config.key}{tag}.{key}"

                if use_timeseries:
                    # The timestamp column only supplies sample times; no
                    # series is created for it, so nothing is added to it.
                    if key == "timestamp":
                        continue

                    # Create the TimeSeries key if it doesn't exist
                    if not self._connection.exists(item_key):
                        ts.create(
                            item_key,
                            retention_msecs=retention_msecs,
                            labels={"key": store_config.key}
                            | ({"tag": msg.tag} if msg.tag else {}),
                        )

                    ts.madd(
                        [
                            (item_key, timestamp, value)
                            for timestamp, value in zip(msg.data["timestamp"], values)
                        ]
                    )
                else:
                    # Add date to key if expiration is set
                    if key_expiration is not None:
                        item_key += ":" + datetime.now().strftime("%Y-%m-%d")

                    item_exists = self._connection.exists(item_key)
                    self._connection.rpush(item_key, *values)

                    # Set expiration if it was newly created
                    if not item_exists and key_expiration is not None:
                        self._connection.expire(item_key, key_expiration)

    def __del__(self):
        """Close the Redis connection on a best-effort basis."""
        try:
            # getattr guards against instances whose __init__ did not finish.
            connection = getattr(self, "_connection", None)
            if connection is not None:
                connection.close()
        except redis.exceptions.ConnectionError:
            # The connection is already gone; nothing left to clean up.
            pass

        return super().__del__()
DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.
Attributes
allowed_store_config_types
:list
- A list of allowed store configuration types.
Ancestors
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any
-
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Methods
def store_loop(self)
-
Expand source code
def store_loop(self):
    """Drain the write queue, writing every queued message to Redis.

    Each queued item holds a column-oriented dict. Every column is written
    either to a Redis TimeSeries key or to a plain Redis list, depending on
    the ``use_timeseries`` flag of the item's store config.

    Messages that cannot be written (TimeSeries requested but unavailable,
    or TimeSeries requested without a ``timestamp`` column) are logged and
    dropped; the remaining queue is still processed.
    """
    assert self._connection is not None

    while self._write_queue:
        msg = self._write_queue.popleft()
        store_config = msg.store_config
        use_timeseries = store_config.use_timeseries
        ts = self._ts

        if use_timeseries and ts is None:
            self._logger.warning(
                "Trying to store data in timeseries, but timeseries is not supported by Redis server"
            )
            continue

        # Checked before touching Redis so no series is created for a
        # message that is going to be skipped anyway.
        if use_timeseries and "timestamp" not in msg.data:
            self._logger.warning(
                "Message data does not contain a timestamp, skipping"
            )
            continue

        key_expiration = None
        retention_msecs = None
        if store_config.key_expiration_days is not None:
            key_expiration = timedelta(days=store_config.key_expiration_days)
            retention_msecs = int(key_expiration.total_seconds() * 1000)

        tag = "" if msg.tag is None else f".{msg.tag}"

        for key, values in msg.data.items():
            # Empty columns would produce argument-less Redis commands.
            if not values:
                continue

            item_key = f"{store_config.key}{tag}.{key}"

            if use_timeseries:
                # The timestamp column only supplies sample times; no series
                # is created for it, so nothing is added to it.
                if key == "timestamp":
                    continue

                # Create the TimeSeries key if it doesn't exist
                if not self._connection.exists(item_key):
                    ts.create(
                        item_key,
                        retention_msecs=retention_msecs,
                        labels={"key": store_config.key}
                        | ({"tag": msg.tag} if msg.tag else {}),
                    )

                ts.madd(
                    [
                        (item_key, timestamp, value)
                        for timestamp, value in zip(msg.data["timestamp"], values)
                    ]
                )
            else:
                # Add date to key if expiration is set
                if key_expiration is not None:
                    item_key += ":" + datetime.now().strftime("%Y-%m-%d")

                item_exists = self._connection.exists(item_key)
                self._connection.rpush(item_key, *values)

                # Set expiration if it was newly created
                if not item_exists and key_expiration is not None:
                    self._connection.expire(item_key, key_expiration)
Inherited members
class DAQJobStoreRedisConfig (host: str,
port: int = 6379,
db: int = 0,
password: str | None = None,
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)-
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    """Connection settings for the Redis store job.

    The store job passes these four values to ``redis.Redis`` when it starts.
    """

    # Hostname or address of the Redis server (required).
    host: str
    # TCP port of the Redis server.
    port: int = 6379
    # Index of the Redis database to select.
    db: int = 0
    # Password for the connection; None sends no password.
    password: Optional[str] = None
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var db : int
-
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    """Connection settings for the Redis store job.

    The store job passes these four values to ``redis.Redis`` when it starts.
    """

    # Hostname or address of the Redis server (required).
    host: str
    # TCP port of the Redis server.
    port: int = 6379
    # Index of the Redis database to select.
    db: int = 0
    # Password for the connection; None sends no password.
    password: Optional[str] = None
var host : str
-
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    """Connection settings for the Redis store job.

    The store job passes these four values to ``redis.Redis`` when it starts.
    """

    # Hostname or address of the Redis server (required).
    host: str
    # TCP port of the Redis server.
    port: int = 6379
    # Index of the Redis database to select.
    db: int = 0
    # Password for the connection; None sends no password.
    password: Optional[str] = None
var password : str | None
-
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    """Connection settings for the Redis store job.

    The store job passes these four values to ``redis.Redis`` when it starts.
    """

    # Hostname or address of the Redis server (required).
    host: str
    # TCP port of the Redis server.
    port: int = 6379
    # Index of the Redis database to select.
    db: int = 0
    # Password for the connection; None sends no password.
    password: Optional[str] = None
var port : int
-
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    """Connection settings for the Redis store job.

    The store job passes these four values to ``redis.Redis`` when it starts.
    """

    # Hostname or address of the Redis server (required).
    host: str
    # TCP port of the Redis server.
    port: int = 6379
    # Index of the Redis database to select.
    db: int = 0
    # Password for the connection; None sends no password.
    password: Optional[str] = None
class RedisWriteQueueItem (store_config: DAQJobStoreConfigRedis,
data: dict[str, list[typing.Any]],
tag: str | None)-
Expand source code
@dataclass
class RedisWriteQueueItem:
    """One pending Redis write: a store config, column data, and a tag."""

    # Redis store configuration taken from the originating message.
    store_config: DAQJobStoreConfigRedis
    # Column name mapped to that column's list of values.
    data: dict[str, list[Any]]
    # Optional tag of the originating message.
    tag: Optional[str]
RedisWriteQueueItem(store_config: enrgdaq.daq.store.models.DAQJobStoreConfigRedis, data: dict[str, list[typing.Any]], tag: Optional[str])
Class variables
var data : dict[str, list[typing.Any]]
var store_config : DAQJobStoreConfigRedis
var tag : str | None