Module enrgdaq.daq.jobs.store.csv

Classes

class CSVFile (file: typing.TextIO,
last_flush_date: datetime.datetime,
write_queue: collections.deque[list[typing.Any]],
arrow_tables: list[pyarrow.lib.Table] = <factory>,
header_written: bool = False,
overwrite: bool | None = None)
Expand source code
@dataclass
class CSVFile:
    """Bookkeeping for one open CSV output file.

    Buffers tabular rows (``write_queue``) and PyArrow tables
    (``arrow_tables``) until the store loop drains them into ``file``.
    """

    # Open text-mode handle that CSV rows are written to.
    file: TextIO
    # When the handle was last flushed to disk.
    last_flush_date: datetime
    # Pending tabular rows; a header row, when needed, is queued first.
    write_queue: deque[list[Any]]
    # Pending PyArrow tables awaiting a native pyarrow.csv write.
    arrow_tables: list[pa.Table] = field(default_factory=list)
    # True once a header line exists in the file (or the file already had
    # content when opened in append mode).
    header_written: bool = False
    # None/False: append to the file; True: truncate and rewrite each time.
    overwrite: Optional[bool] = None

CSVFile(file: typing.TextIO, last_flush_date: datetime.datetime, write_queue: collections.deque[list[typing.Any]], arrow_tables: list[pyarrow.lib.Table] = <factory>, header_written: bool = False, overwrite: Optional[bool] = None)

Instance variables

var arrow_tables : list[pyarrow.lib.Table]
var file : typing.TextIO
var header_written : bool
var last_flush_date : datetime.datetime
var overwrite : bool | None
var write_queue : collections.deque[list[typing.Any]]
class DAQJobStoreCSV (config: DAQJobStoreCSVConfig,
**kwargs)
Expand source code
class DAQJobStoreCSV(DAQJobStore):
    """Store job that persists tabular and PyArrow store messages to CSV files.

    Incoming rows are buffered per target file in :class:`CSVFile` entries and
    drained to disk by :meth:`store_loop`. Files may optionally be written as
    zstd-compressed streams, which are flushed frame-by-frame so partially
    written files remain recoverable.
    """

    config_type = DAQJobStoreCSVConfig
    allowed_store_config_types = [DAQJobStoreConfigCSV]
    allowed_message_in_types = [DAQJobMessageStore]

    # Maps resolved output file path -> open-file bookkeeping entry.
    _open_csv_files: dict[str, CSVFile]

    def __init__(self, config: DAQJobStoreCSVConfig, **kwargs):
        super().__init__(config, **kwargs)

        self._open_csv_files = {}

    def handle_message(
        self, message: DAQJobMessageStoreTabular | DAQJobMessageStorePyArrow
    ) -> bool:
        """Queue a store message's rows for its target CSV file.

        Returns True when the message was accepted (including when it carried
        zero rows); False when the base class rejects it.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigCSV, message.store_config.csv)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)

        file, new_file = self._open_csv_file(
            file_path,
            store_config.overwrite,
            store_config.use_zstd,
        )
        # Overwrite mode keeps only the most recent message's contents.
        if file.overwrite:
            file.write_queue.clear()

        # PyArrow messages: collect tables; they are written natively in
        # store_loop via pyarrow.csv (much faster than csv.writer).
        if isinstance(message, DAQJobMessageStorePyArrow):
            table = message.get_table()
            message.release()
            if table.num_rows == 0:
                return True
            file.arrow_tables.append(table)
            return True

        # Tabular messages: queue the header first for new/overwritten files.
        if new_file or file.overwrite:
            file.write_queue.append(message.keys)

        # Append rows to write_queue, either row-wise or column-wise.
        if message.data is not None:
            for row in message.data:
                file.write_queue.append(row)
        elif message.data_columns is not None:
            cols = [message.data_columns[k] for k in message.keys]
            if cols and all(isinstance(c, ndarray) for c in cols):
                # All-numpy columns: transpose to rows in one vectorized call.
                data_rows = np.stack(cols, axis=-1)
                file.write_queue.extend(data_rows)
            else:
                for row in zip(*cols):
                    file.write_queue.append(list(row))

        return True

    def _open_csv_file(
        self,
        file_path: str,
        overwrite: bool | None,
        use_zstd: bool | None = False,
    ) -> tuple[CSVFile, bool]:
        """Open (or return the cached handle for) *file_path*.

        Returns ``(csv_file, new_file)`` where ``new_file`` is True only when
        the file did not exist on disk and was not already open.
        """
        if file_path not in self._open_csv_files:
            file_exists = os.path.exists(file_path)
            # Non-empty existing file implies its header was already written.
            file_has_content = file_exists and os.path.getsize(file_path) > 0
            if not file_exists:
                # Create parent directories and the file itself.
                Path(os.path.dirname(file_path)).mkdir(parents=True, exist_ok=True)
                Path(file_path).touch()

            if use_zstd:
                # Zstd streaming; frames are flushed in _flush so a crash
                # leaves a decodable prefix of the file.
                mode = "wb" if overwrite else "ab"
                raw_file = open(file_path, mode)
                compressor = zstd.ZstdCompressor(level=3)
                zstd_writer = compressor.stream_writer(
                    raw_file, closefd=True, write_return_read=False
                )
                # Wrap in a TextIOWrapper for csv.writer compatibility.
                file_handle = io.TextIOWrapper(
                    zstd_writer, encoding="utf-8", newline=""
                )
            else:
                file_handle = open(
                    file_path, "w" if overwrite else "a", newline=""
                )

            file = CSVFile(
                file_handle,
                datetime.now(),
                deque(),
                overwrite=overwrite,
                # Appending to a non-empty file: assume the header is present.
                header_written=file_has_content and not overwrite,
            )
            self._open_csv_files[file_path] = file
        else:
            file_exists = True
            file = self._open_csv_files[file_path]
        return file, not file_exists

    def _flush(self, file: CSVFile) -> bool:
        """Flush *file* to disk if the flush interval has elapsed.

        Returns True when a flush happened, False when it was skipped.
        """
        if (
            datetime.now() - file.last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS:
            return False

        file.file.flush()

        # For zstd writers wrapped in a TextIOWrapper, flush a complete zstd
        # frame so the on-disk file stays independently decodable.
        if hasattr(file.file, "buffer") and hasattr(file.file.buffer, "flush"):
            buffer = file.file.buffer
            try:
                buffer.flush(zstd.FLUSH_FRAME)
            except TypeError:
                # Regular buffered writer: flush() takes no arguments.
                pass

        file.last_flush_date = datetime.now()
        return True

    def store_loop(self):
        """Drain buffered tables/rows to disk and drop finished files."""
        files_to_delete = []
        for file_path, file in self._open_csv_files.copy().items():
            if file.file.closed:
                files_to_delete.append(file_path)
                continue

            # Write PyArrow tables efficiently using the native CSV writer.
            if file.arrow_tables:
                combined = pa.concat_tables(file.arrow_tables)
                file.arrow_tables.clear()

                write_options = pa_csv.WriteOptions(
                    include_header=not file.header_written
                )

                # pyarrow.csv.write_csv needs a binary sink but our handle is
                # text-mode, so go through an in-memory buffer.
                buffer = io.BytesIO()
                pa_csv.write_csv(combined, buffer, write_options=write_options)
                file.file.write(buffer.getvalue().decode("utf-8"))
                file.header_written = True

            # Write queued tabular rows (the header, if one was needed, is the
            # first queued row — see handle_message).
            if file.write_queue:
                writer = csv.writer(file.file)
                writer.writerows(file.write_queue)
                file.write_queue.clear()
                # The file now has content, so a later PyArrow write must not
                # emit a second header mid-file.
                file.header_written = True

            # Overwritten files are complete after one drain; close them.
            if file.overwrite:
                file.file.close()
                files_to_delete.append(file_path)
                continue

            # Flush if the flush interval is up.
            self._flush(file)

        for file_path in files_to_delete:
            del self._open_csv_files[file_path]

    def __del__(self):
        # Drain any remaining buffered rows before closing the handles.
        self.store_loop()

        for file in self._open_csv_files.values():
            if file.file.closed:
                continue
            file.file.close()

        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.

Ancestors

Class variables

var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution, making DAQJob subscribe to topics with messages it doesn't know how to handle will result in errors.

Methods

def store_loop(self)
Expand source code
def store_loop(self):
    """Drain buffered PyArrow tables and queued rows to each open CSV file.

    Closed handles and fully written overwrite-mode files are removed from
    the open-file map; remaining files are flushed on their interval.
    """
    files_to_delete = []
    for file_path, file in self._open_csv_files.copy().items():
        if file.file.closed:
            files_to_delete.append(file_path)
            continue

        # Write PyArrow tables efficiently using native CSV writer
        if file.arrow_tables:
            combined = pa.concat_tables(file.arrow_tables)
            file.arrow_tables.clear()

            # Use PyArrow's native CSV writer (much faster)
            write_options = pa_csv.WriteOptions(
                include_header=not file.header_written
            )

            # Write to a BytesIO buffer first, then decode and write to text file
            # PyArrow's write_csv requires a binary file, but we have a text file
            buffer = io.BytesIO()
            pa_csv.write_csv(combined, buffer, write_options=write_options)
            file.file.write(buffer.getvalue().decode("utf-8"))
            file.header_written = True

        # Write traditional queue rows
        writer = csv.writer(file.file)
        row_size = len(file.write_queue)
        if row_size > 0:
            if not file.header_written:
                # First row is header for tabular messages
                # NOTE(review): no-op branch — header_written is not updated
                # for the tabular path; a later PyArrow write to the same
                # file would emit a second header. Confirm intended.
                pass
            writer.writerows(list(file.write_queue))
        file.write_queue.clear()

        # Overwrite-mode files are complete after a single drain; close them.
        if file.overwrite:
            file.file.close()
            files_to_delete.append(file_path)
            continue

        # Flush if the flush time is up
        self._flush(file)

    for file_path in files_to_delete:
        del self._open_csv_files[file_path]

Inherited members

class DAQJobStoreCSVConfig (out_dir: str = 'out/',
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
Expand source code
class DAQJobStoreCSVConfig(DAQJobConfig):
    """Configuration for DAQJobStoreCSV; `out_dir` is the root directory that stored CSV file paths are joined onto."""

    out_dir: str = "out/"

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution, making DAQJob subscribe to topics with messages it doesn't know how to handle will result in errors.

Ancestors

Instance variables

var out_dir : str
Expand source code
class DAQJobStoreCSVConfig(DAQJobConfig):
    """Configuration for DAQJobStoreCSV; `out_dir` is the root directory that stored CSV file paths are joined onto."""

    out_dir: str = "out/"