Module enrgdaq.daq.jobs.store.redis

Classes

class DAQJobStoreRedis (config: DAQJobStoreRedisConfig,
**kwargs)
Expand source code
class DAQJobStoreRedis(DAQJobStore):
    config_type = DAQJobStoreRedisConfig
    allowed_store_config_types = [DAQJobStoreConfigRedis]
    allowed_message_in_types = [
        DAQJobMessageStoreTabular,
        DAQJobMessageStoreRaw,
        DAQJobMessageStorePyArrow,
    ]

    _write_queue: deque[RedisWriteQueueItem]
    _last_flush_date: datetime
    _connection: Optional[redis.Redis]
    _ts: Optional[TimeSeries]

    def __init__(self, config: DAQJobStoreRedisConfig, **kwargs):
        super().__init__(config, **kwargs)

        self._write_queue = deque()
        self._last_flush_date = datetime.now()
        self._connection = None

    def start(self):
        self._connection = redis.Redis(
            host=self.config.host,
            port=self.config.port,
            db=self.config.db,
            password=self.config.password,
        )
        try:
            self._ts = self._connection.ts()
        except Exception as ex:
            self._logger.error("Timeseries not supported by Redis server", exc_info=ex)
            self._ts = None

        super().start()

    def handle_message(
        self,
        message: DAQJobMessageStoreTabular
        | DAQJobMessageStoreRaw
        | DAQJobMessageStorePyArrow,
    ) -> bool:
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigRedis, message.store_config.redis)

        data: dict[str, list[Any]] | bytes = {}

        # Handle PyArrow messages
        if isinstance(message, DAQJobMessageStorePyArrow):
            table = message.get_table()
            message.release()
            if table.num_rows == 0:
                return True
            # Convert PyArrow table to column-based dictionary
            data = {col: table.column(col).to_pylist() for col in table.column_names}
            self._write_queue.append(
                RedisWriteQueueItem(store_config, data, message.tag)
            )
        elif isinstance(message, DAQJobMessageStoreTabular):
            # Add data to data dict that we can add to Redis
            for i, row in enumerate(message.keys):
                data[row] = [x[i] for x in message.data]
            for row in message.data:
                self._write_queue.append(
                    RedisWriteQueueItem(store_config, data, message.tag)
                )
        else:
            data = message.data
            self._write_queue.append(
                RedisWriteQueueItem(store_config, data, message.tag)
            )

        return True

    def store_loop(self):
        assert self._connection is not None
        while self._write_queue:
            msg = self._write_queue.popleft()
            if msg.store_config.use_timeseries and self._ts is None:
                self._logger.warning(
                    "Trying to store data in timeseries, but timeseries is not supported by Redis server"
                )
                return

            key_expiration = None
            if msg.store_config.key_expiration_days is not None:
                key_expiration = timedelta(days=msg.store_config.key_expiration_days)

            base_item_key = f"{msg.store_config.key}{f'.{msg.tag}' if msg.tag else ''}"
            if isinstance(msg.data, bytes):
                item_key = msg.store_config.key
                data = base64.b64encode(msg.data).decode("utf-8")
                self._connection.set(item_key, data, ex=key_expiration)
                self._logger.debug(f"Added {len(data)} bytes to {item_key}")
                continue
            if not isinstance(msg.data, dict):
                self._logger.error(
                    "msg.data must be a dict or bytes, but got " f"{type(msg.data)}"
                )
                continue

            batched_adds = []
            # Append item to key in redis
            for item in msg.data.items():
                key, values = item
                item_key = f"{base_item_key}.{key}"

                if not msg.store_config.use_timeseries:
                    # Add date to key if expiration is set
                    if key_expiration is not None:
                        item_key += ":" + datetime.now().strftime("%Y-%m-%d")

                    item_exists = self._connection.exists(item_key)
                    self._connection.rpush(item_key, *values)

                    # Set expiration if it was newly created
                    if not item_exists and key_expiration is not None:
                        self._connection.expire(item_key, key_expiration)

                    self._logger.debug(f"Added {len(values)} values to {item_key}")
                    continue

                # Save it via Redis TimeSeries
                assert self._ts is not None

                # Create TimeSeries key if it doesn't exist
                if not self._connection.exists(item_key) and key != "timestamp":
                    retention_msecs = None
                    if msg.store_config.key_expiration_days is not None:
                        retention_msecs = int(
                            timedelta(
                                days=msg.store_config.key_expiration_days
                            ).total_seconds()
                            * 1000
                        )
                    self._ts.create(
                        item_key,
                        retention_msecs=retention_msecs,
                        labels={"key": msg.store_config.key}
                        | ({"tag": msg.tag} if msg.tag else {}),
                    )
                if "timestamp" not in msg.data:
                    self._logger.warning(
                        "Message data does not contain a timestamp, skipping"
                    )
                    return

                batched_adds.extend(
                    [
                        (item_key, msg.data["timestamp"][i], value)
                        for i, value in enumerate(values)
                    ]
                )

            if self._ts is not None and len(batched_adds) > 0:
                self._ts.madd(batched_adds)

    def __del__(self):
        try:
            if self._connection is not None:
                self._connection.close()
        except redis.exceptions.ConnectionError:
            pass

        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.

Ancestors

Class variables

var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution, making DAQJob subscribe to topics with messages it doesn't know how to handle will result in errors.

Methods

def store_loop(self)
Expand source code
def store_loop(self):
    assert self._connection is not None
    while self._write_queue:
        msg = self._write_queue.popleft()
        if msg.store_config.use_timeseries and self._ts is None:
            self._logger.warning(
                "Trying to store data in timeseries, but timeseries is not supported by Redis server"
            )
            return

        key_expiration = None
        if msg.store_config.key_expiration_days is not None:
            key_expiration = timedelta(days=msg.store_config.key_expiration_days)

        base_item_key = f"{msg.store_config.key}{f'.{msg.tag}' if msg.tag else ''}"
        if isinstance(msg.data, bytes):
            item_key = msg.store_config.key
            data = base64.b64encode(msg.data).decode("utf-8")
            self._connection.set(item_key, data, ex=key_expiration)
            self._logger.debug(f"Added {len(data)} bytes to {item_key}")
            continue
        if not isinstance(msg.data, dict):
            self._logger.error(
                "msg.data must be a dict or bytes, but got " f"{type(msg.data)}"
            )
            continue

        batched_adds = []
        # Append item to key in redis
        for item in msg.data.items():
            key, values = item
            item_key = f"{base_item_key}.{key}"

            if not msg.store_config.use_timeseries:
                # Add date to key if expiration is set
                if key_expiration is not None:
                    item_key += ":" + datetime.now().strftime("%Y-%m-%d")

                item_exists = self._connection.exists(item_key)
                self._connection.rpush(item_key, *values)

                # Set expiration if it was newly created
                if not item_exists and key_expiration is not None:
                    self._connection.expire(item_key, key_expiration)

                self._logger.debug(f"Added {len(values)} values to {item_key}")
                continue

            # Save it via Redis TimeSeries
            assert self._ts is not None

            # Create TimeSeries key if it doesn't exist
            if not self._connection.exists(item_key) and key != "timestamp":
                retention_msecs = None
                if msg.store_config.key_expiration_days is not None:
                    retention_msecs = int(
                        timedelta(
                            days=msg.store_config.key_expiration_days
                        ).total_seconds()
                        * 1000
                    )
                self._ts.create(
                    item_key,
                    retention_msecs=retention_msecs,
                    labels={"key": msg.store_config.key}
                    | ({"tag": msg.tag} if msg.tag else {}),
                )
            if "timestamp" not in msg.data:
                self._logger.warning(
                    "Message data does not contain a timestamp, skipping"
                )
                return

            batched_adds.extend(
                [
                    (item_key, msg.data["timestamp"][i], value)
                    for i, value in enumerate(values)
                ]
            )

        if self._ts is not None and len(batched_adds) > 0:
            self._ts.madd(batched_adds)

Inherited members

class DAQJobStoreRedisConfig (host: str,
port: int = 6379,
db: int = 0,
password: str | None = None,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    host: str
    port: int = 6379
    db: int = 0
    password: Optional[str] = None

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution, making DAQJob subscribe to topics with messages it doesn't know how to handle will result in errors.

Ancestors

Instance variables

var db : int
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    host: str
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
var host : str
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    host: str
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
var password : str | None
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    host: str
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
var port : int
Expand source code
class DAQJobStoreRedisConfig(DAQJobConfig):
    host: str
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
class RedisWriteQueueItem (store_config: DAQJobStoreConfigRedis,
data: dict[str, list[typing.Any]] | bytes,
tag: str | None)
Expand source code
@dataclass
class RedisWriteQueueItem:
    store_config: DAQJobStoreConfigRedis
    data: dict[str, list[Any]] | bytes
    tag: Optional[str]

RedisWriteQueueItem(store_config: enrgdaq.daq.store.models.DAQJobStoreConfigRedis, data: dict[str, list[typing.Any]] | bytes, tag: Optional[str])

Instance variables

var data : dict[str, list[typing.Any]] | bytes
var store_configDAQJobStoreConfigRedis
var tag : str | None