Module enrgdaq.daq.jobs.store.csv

Classes

class CSVFile (file: TextIO,
last_flush_date: datetime.datetime,
write_queue: collections.deque[list[typing.Any]],
overwrite: bool | None = None)
Expand source code
@dataclass
class CSVFile:
    """Bookkeeping record for one open CSV output file.

    Pairs an open text handle with the rows queued for it, the time it
    was last flushed, and whether it is being overwritten or appended to.
    """

    # Open text-mode handle the queued rows are written to.
    file: TextIO
    # When the handle was last flushed; used to rate-limit flushes.
    last_flush_date: datetime
    # Rows waiting to be written; each row is a list of cell values.
    write_queue: deque[list[Any]]
    # True when the file is rewritten from scratch instead of appended to.
    overwrite: Optional[bool] = None

CSVFile(file: TextIO, last_flush_date: datetime.datetime, write_queue: collections.deque[list[typing.Any]], overwrite: Optional[bool] = None)

Class variables

var file : TextIO
var last_flush_date : datetime.datetime
var overwrite : bool | None
var write_queue : collections.deque[list[typing.Any]]
class DAQJobStoreCSV (config: DAQJobStoreCSVConfig,
**kwargs)
Expand source code
class DAQJobStoreCSV(DAQJobStore):
    """Store job that writes tabular store messages to CSV files.

    `handle_message` queues each message's rows against its target file;
    `store_loop` later drains the queues to disk and periodically flushes
    the open handles.
    """

    config_type = DAQJobStoreCSVConfig
    allowed_store_config_types = [DAQJobStoreConfigCSV]
    allowed_message_in_types = [DAQJobMessageStore]

    # Currently-open files, keyed by the resolved output file path.
    _open_csv_files: dict[str, CSVFile]

    def __init__(self, config: DAQJobStoreCSVConfig, **kwargs):
        """Initialize the store with no files open yet."""
        super().__init__(config, **kwargs)

        self._open_csv_files = {}

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        """Queue a tabular message's rows for its target CSV file.

        The output path comes from the message's CSV store config
        (optionally date-stamped via `modify_file_path`), joined under
        `config.out_dir`. Header keys are queued for new files and for
        overwrite-mode files; data rows are always queued. Actual disk
        writes happen later in `store_loop`.

        Returns False when the base class rejects the message, True
        otherwise.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigCSV, message.store_config.csv)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)

        file, new_file = self._open_csv_file(
            file_path, store_config.overwrite, store_config.use_gzip
        )
        # Overwrite mode: drop rows still queued from earlier messages so
        # only this message's content ends up in the file.
        if file.overwrite:
            file.write_queue.clear()

        # Write headers if the file is new
        if new_file or file.overwrite:
            file.write_queue.append(message.keys)

        # Append rows to write_queue
        for row in message.data:
            file.write_queue.append(row)

        return True

    def _open_csv_file(
        self, file_path: str, overwrite: Optional[bool], use_gzip: Optional[bool]
    ) -> tuple[CSVFile, bool]:
        """
        Opens a file and returns (CSVFile, new_file)

        The handle is cached in `_open_csv_files` and reused on later
        calls; `new_file` is True only when the file did not exist on
        disk before this call. `overwrite` selects write mode ("w"/"wt")
        instead of append; `use_gzip` opens the path with gzip in text
        mode. NOTE(review): the path is used as-is for gzip — no ".gz"
        suffix is added; confirm callers supply one when needed.
        """
        if file_path not in self._open_csv_files:
            file_exists = os.path.exists(file_path)
            # Create the file if it doesn't exist
            if not file_exists:
                # Create the directory if it doesn't exist
                Path(os.path.dirname(file_path)).mkdir(parents=True, exist_ok=True)
                Path(file_path).touch()

            if use_gzip:
                file_handle = gzip.open(
                    file_path, "at" if not overwrite else "wt", newline=""
                )
            else:
                file_handle = open(file_path, "a" if not overwrite else "w", newline="")

            # Open file
            file = CSVFile(
                file_handle,
                datetime.now(),
                deque(),
                overwrite,
            )
            self._open_csv_files[file_path] = file
        else:
            # Already open: reuse the cached entry; by construction the
            # file exists on disk, so new_file will be False.
            file_exists = True
            file = self._open_csv_files[file_path]
        return file, not file_exists

    def _flush(self, file: CSVFile) -> bool:
        """Flush the handle if the flush interval has elapsed.

        Returns True when a flush happened, False when it is too soon
        (less than DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS since the
        last flush).
        """
        if (
            datetime.now() - file.last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS:
            return False

        file.file.flush()
        file.last_flush_date = datetime.now()
        return True

    def store_loop(self):
        """Drain queued rows to disk and retire finished files.

        Closed handles are dropped from the table; overwrite-mode files
        are closed and dropped after their rows are written (one-shot);
        the rest are flushed when the flush interval is due.
        """
        files_to_delete = []
        for file_path, file in self._open_csv_files.items():
            if file.file.closed:
                files_to_delete.append(file_path)
                continue
            writer = csv.writer(file.file)

            row_size = len(file.write_queue)
            if row_size > 0:
                writer.writerows(list(file.write_queue))
            file.write_queue.clear()

            # Overwrite-mode files are written once, then closed and retired.
            if file.overwrite:
                file.file.close()
                files_to_delete.append(file_path)
                continue

            # Flush if the flush time is up
            self._flush(file)

        # Deleted outside the loop so the dict isn't mutated while iterating.
        for file_path in files_to_delete:
            del self._open_csv_files[file_path]

    def __del__(self):
        """Write out any queued rows, close all handles, then defer to base."""
        self.store_loop()

        # Close all open files
        for file in self._open_csv_files.values():
            if file.file.closed:
                continue
            file.file.close()

        return super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.

Attributes

allowed_store_config_types : list
A list of allowed store configuration types.

Ancestors

Class variables

var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Methods

def store_loop(self)
Expand source code
def store_loop(self):
    """Drain queued rows into their CSV files and retire finished files.

    Rows buffered in each file's write_queue are written out with
    csv.writer. Files whose handle is already closed, and files opened
    in overwrite mode (closed here after writing), are removed from the
    open-file table. Remaining files are flushed via _flush when due.
    """
    retired = []
    for path, csv_file in self._open_csv_files.items():
        # A handle closed elsewhere can only be dropped from the table.
        if csv_file.file.closed:
            retired.append(path)
            continue

        pending = list(csv_file.write_queue)
        if pending:
            csv.writer(csv_file.file).writerows(pending)
        csv_file.write_queue.clear()

        # Overwrite-mode files are one-shot: close and retire after writing.
        if csv_file.overwrite:
            csv_file.file.close()
            retired.append(path)
            continue

        # Flush if the flush interval has elapsed.
        self._flush(csv_file)

    # Retire outside the loop so the dict isn't mutated while iterating.
    for path in retired:
        del self._open_csv_files[path]

Inherited members

class DAQJobStoreCSVConfig (out_dir: str = 'out/',
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
Expand source code
class DAQJobStoreCSVConfig(DAQJobConfig):
    """Configuration for DAQJobStoreCSV; adds the output directory."""

    # Directory that resolved CSV file paths are joined under.
    out_dir: str = "out/"

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Ancestors

Instance variables

var out_dir : str
Expand source code
class DAQJobStoreCSVConfig(DAQJobConfig):
    """Configuration for DAQJobStoreCSV; adds the output directory."""

    # Directory that resolved CSV file paths are joined under.
    out_dir: str = "out/"