Module enrgdaq.cnc.models

Classes

class CNCClientInfo (identity: bytes | None = None,
last_seen: str = '',
info: SupervisorInfo | None = None)
Expand source code
class CNCClientInfo(Struct):
    """Information about a connected client.

    Every field has a default, so an instance can be constructed with no
    arguments.
    """

    # Client identity as raw bytes; None when no identity has been recorded.
    # NOTE(review): presumably the transport-level identity of the peer — confirm.
    identity: Optional[bytes] = None
    # Last-seen marker stored as a string (empty by default).
    # NOTE(review): the timestamp format is not visible here — confirm with the writer.
    last_seen: str = ""
    # Supervisor information associated with the client, or None if not known.
    info: Optional[SupervisorInfo] = None

Information about a connected client.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var identity : bytes | None
Expand source code
class CNCClientInfo(Struct):
    """Information about a connected client."""

    identity: Optional[bytes] = None
    last_seen: str = ""
    info: Optional[SupervisorInfo] = None
var info : SupervisorInfo | None
Expand source code
class CNCClientInfo(Struct):
    """Information about a connected client."""

    identity: Optional[bytes] = None
    last_seen: str = ""
    info: Optional[SupervisorInfo] = None
var last_seen : str
Expand source code
class CNCClientInfo(Struct):
    """Information about a connected client."""

    identity: Optional[bytes] = None
    last_seen: str = ""
    info: Optional[SupervisorInfo] = None
class CNCMessage (*, req_id: str | None = None)
Expand source code
class CNCMessage(Struct, tag=True, kw_only=True):
    """Base class for C&C messages.

    ``tag=True`` declares this as a msgspec tagged struct, so encoded
    messages carry a type tag that lets a decoder pick the right class out
    of a union of message types. ``kw_only=True`` makes ``req_id``
    keyword-only, which is why subclasses can declare required positional
    fields even though ``req_id`` has a default.
    """

    # Optional request identifier; None when the message carries none.
    # NOTE(review): presumably used to pair a response with its request — confirm.
    req_id: Optional[str] = None

Base class for C&C messages.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Subclasses

Instance variables

var req_id : str | None
Expand source code
class CNCMessage(Struct, tag=True, kw_only=True):
    """Base class for C&C messages."""

    req_id: Optional[str] = None
class CNCMessageHeartbeat (supervisor_info: SupervisorInfo,
*,
req_id: str | None = None)
Expand source code
class CNCMessageHeartbeat(CNCMessage):
    """Heartbeat message sent by clients to the server.

    Carries a ``SupervisorInfo``; ``req_id`` is inherited from
    ``CNCMessage``.
    """

    # Required: supervisor information included with the heartbeat.
    supervisor_info: SupervisorInfo

Heartbeat message sent by clients to the server.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var supervisor_info : SupervisorInfo
Expand source code
class CNCMessageHeartbeat(CNCMessage):
    """Heartbeat message sent by clients to the server."""

    supervisor_info: SupervisorInfo
class CNCMessageLog (level: str,
message: str,
timestamp: str,
module: str,
client_id: str,
*,
req_id: str | None = None)
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server.

    All five fields are required strings; ``req_id`` is inherited from
    ``CNCMessage``.
    """

    level: str  # log level name, e.g. 'INFO', 'WARNING', 'ERROR'
    message: str  # the log text
    timestamp: str  # record time as a string; format not shown here — TODO confirm
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log

Log message from client to server.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var client_id : str
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server."""

    level: str  # e.g. 'INFO', 'WARNING', 'ERROR'
    message: str
    timestamp: str
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log
var level : str
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server."""

    level: str  # e.g. 'INFO', 'WARNING', 'ERROR'
    message: str
    timestamp: str
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log
var message : str
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server."""

    level: str  # e.g. 'INFO', 'WARNING', 'ERROR'
    message: str
    timestamp: str
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log
var module : str
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server."""

    level: str  # e.g. 'INFO', 'WARNING', 'ERROR'
    message: str
    timestamp: str
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log
var timestamp : str
Expand source code
class CNCMessageLog(CNCMessage):
    """Log message from client to server."""

    level: str  # e.g. 'INFO', 'WARNING', 'ERROR'
    message: str
    timestamp: str
    module: str  # module or component that generated the log
    client_id: str  # ID of the client sending the log
class CNCMessageReqListClients (*, req_id: str | None = None)
Expand source code
class CNCMessageReqListClients(CNCMessage):
    """Ask for the list of connected clients.

    Declares no fields of its own; only the inherited ``req_id`` is carried.
    """

Request to list connected clients.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin
class CNCMessageReqPing (*, req_id: str | None = None)
Expand source code
class CNCMessageReqPing(CNCMessage):
    """Ping request.

    Declares no fields of its own; only the inherited ``req_id`` is carried.
    """

Ping message.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin
class CNCMessageReqRestartDAQ (update: bool = False, *, req_id: str | None = None)
Expand source code
class CNCMessageReqRestartDAQ(CNCMessage):
    """Request to restart ENRGDAQ.

    ``update`` is optional and defaults to False; ``req_id`` is inherited
    from ``CNCMessage``.
    """

    # NOTE(review): presumably requests an update before restarting when True —
    # confirm against the handler of this message.
    update: bool = False

Request to restart ENRGDAQ.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var update : bool
Expand source code
class CNCMessageReqRestartDAQ(CNCMessage):
    """Request to restart ENRGDAQ."""

    update: bool = False

    pass
class CNCMessageReqRunCustomDAQJob (config: str, restart_on_crash: bool = True, *, req_id: str | None = None)
Expand source code
class CNCMessageReqRunCustomDAQJob(CNCMessage):
    """Request to run a custom DAQJob with config.

    ``config`` is required; ``restart_on_crash`` is optional and defaults
    to True. ``req_id`` is inherited from ``CNCMessage``.
    """

    config: str  # TOML config as string
    restart_on_crash: bool = True  # If False, job will not be restarted on crash

Request to run a custom DAQJob with config.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var config : str
Expand source code
class CNCMessageReqRunCustomDAQJob(CNCMessage):
    """Request to run a custom DAQJob with config."""

    config: str  # TOML config as string
    restart_on_crash: bool = True  # If False, job will not be restarted on crash
var restart_on_crash : bool
Expand source code
class CNCMessageReqRunCustomDAQJob(CNCMessage):
    """Request to run a custom DAQJob with config."""

    config: str  # TOML config as string
    restart_on_crash: bool = True  # If False, job will not be restarted on crash
class CNCMessageReqSendMessage (message_type: str,
payload: str,
target_daq_job_unique_id: str | None = None,
*,
req_id: str | None = None)
Expand source code
class CNCMessageReqSendMessage(CNCMessage):
    """Ask for a custom message to be delivered to DAQ job(s).

    ``message_type`` and ``payload`` are required; the target is optional.
    """

    # Type name of the DAQJobMessage to send.
    message_type: str
    # Message payload, JSON-encoded.
    payload: str
    # When specified, only this job is sent the message.
    target_daq_job_unique_id: Optional[str] = None

Request to send a custom message to DAQ job(s).

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var message_type : str
Expand source code
class CNCMessageReqSendMessage(CNCMessage):
    """Request to send a custom message to DAQ job(s)."""

    message_type: str  # The type name of the DAQJobMessage to send
    payload: str  # JSON-encoded message payload
    target_daq_job_unique_id: Optional[str] = (
        None  # If specified, only send to this job
    )
var payload : str
Expand source code
class CNCMessageReqSendMessage(CNCMessage):
    """Request to send a custom message to DAQ job(s)."""

    message_type: str  # The type name of the DAQJobMessage to send
    payload: str  # JSON-encoded message payload
    target_daq_job_unique_id: Optional[str] = (
        None  # If specified, only send to this job
    )
var target_daq_job_unique_id : str | None
Expand source code
class CNCMessageReqSendMessage(CNCMessage):
    """Request to send a custom message to DAQ job(s)."""

    message_type: str  # The type name of the DAQJobMessage to send
    payload: str  # JSON-encoded message payload
    target_daq_job_unique_id: Optional[str] = (
        None  # If specified, only send to this job
    )
class CNCMessageReqStatus (*, req_id: str | None = None)
Expand source code
class CNCMessageReqStatus(CNCMessage):
    """Ask for the supervisor status.

    Declares no fields of its own; only the inherited ``req_id`` is carried.
    """

Request for supervisor status.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin
class CNCMessageReqStopDAQJob (daq_job_name: str | None = None,
daq_job_unique_id: str | None = None,
remove: bool = False,
*,
req_id: str | None = None)
Expand source code
class CNCMessageReqStopDAQJob(CNCMessage):
    """Request to stop a specific DAQJob, optionally removing it.

    The job can be identified by ``daq_job_name`` and/or
    ``daq_job_unique_id``; both are optional and default to None.
    """

    # NOTE(review): how name and unique id are combined when both (or neither)
    # are given is decided by the handler, not visible here — confirm.
    daq_job_name: Optional[str] = None
    daq_job_unique_id: Optional[str] = None
    # Defaults to False.
    # NOTE(review): presumably True also removes the job after stopping — confirm.
    remove: bool = False

Request to stop and remove a specific DAQJob by name.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var daq_job_name : str | None
Expand source code
class CNCMessageReqStopDAQJob(CNCMessage):
    """Request to stop and remove a specific DAQJob by name."""

    daq_job_name: Optional[str] = None
    daq_job_unique_id: Optional[str] = None
    remove: bool = False
var daq_job_unique_id : str | None
Expand source code
class CNCMessageReqStopDAQJob(CNCMessage):
    """Request to stop and remove a specific DAQJob by name."""

    daq_job_name: Optional[str] = None
    daq_job_unique_id: Optional[str] = None
    remove: bool = False
var remove : bool
Expand source code
class CNCMessageReqStopDAQJob(CNCMessage):
    """Request to stop and remove a specific DAQJob by name."""

    daq_job_name: Optional[str] = None
    daq_job_unique_id: Optional[str] = None
    remove: bool = False
class CNCMessageReqStopDAQJobs (*, req_id: str | None = None)
Expand source code
class CNCMessageReqStopDAQJobs(CNCMessage):
    """Ask for the DAQJobs to be restarted.

    Declares no fields of its own; only the inherited ``req_id`` is carried.
    """

    # NOTE(review): the class name says "Stop" while the description says
    # "restart" — confirm which wording matches the handler's behavior.

Request to restart DAQJobs.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin
class CNCMessageResListClients (clients: dict[str, SupervisorInfo | None],
*,
req_id: str | None = None)
Expand source code
class CNCMessageResListClients(CNCMessage):
    """List of connected clients.

    ``clients`` is required; ``req_id`` is inherited from ``CNCMessage``.
    """

    # Maps a string key to that client's SupervisorInfo, or to None.
    # NOTE(review): the key is presumably the client id — confirm with the producer.
    clients: dict[str, Optional[SupervisorInfo]]

List of connected clients.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var clients : dict[str, SupervisorInfo | None]
Expand source code
class CNCMessageResListClients(CNCMessage):
    """List of connected clients."""

    clients: dict[str, Optional[SupervisorInfo]]
class CNCMessageResPing (*, req_id: str | None = None)
Expand source code
class CNCMessageResPing(CNCMessage):
    """Pong reply.

    Declares no fields of its own; only the inherited ``req_id`` is carried.
    """

Pong message.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin
class CNCMessageResRestartDAQ (success: bool, message: str, *, req_id: str | None = None)
Expand source code
class CNCMessageResRestartDAQ(CNCMessage):
    """Response to update and restart request.

    Both fields are required; ``req_id`` is inherited from ``CNCMessage``.
    """

    success: bool  # outcome flag; presumably True when the request succeeded — confirm
    message: str  # accompanying text for the outcome

Response to update and restart request.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var message : str
Expand source code
class CNCMessageResRestartDAQ(CNCMessage):
    """Response to update and restart request."""

    success: bool
    message: str
var success : bool
Expand source code
class CNCMessageResRestartDAQ(CNCMessage):
    """Response to update and restart request."""

    success: bool
    message: str
class CNCMessageResRunCustomDAQJob (success: bool, message: str, *, req_id: str | None = None)
Expand source code
class CNCMessageResRunCustomDAQJob(CNCMessage):
    """Response to run custom DAQJob request.

    Both fields are required; ``req_id`` is inherited from ``CNCMessage``.
    """

    success: bool  # outcome flag; presumably True when the job was started — confirm
    message: str  # accompanying text for the outcome

Response to run custom DAQJob request.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var message : str
Expand source code
class CNCMessageResRunCustomDAQJob(CNCMessage):
    """Response to run custom DAQJob request."""

    success: bool
    message: str
var success : bool
Expand source code
class CNCMessageResRunCustomDAQJob(CNCMessage):
    """Response to run custom DAQJob request."""

    success: bool
    message: str
class CNCMessageResSendMessage (success: bool, message: str, jobs_notified: int = 0, *, req_id: str | None = None)
Expand source code
class CNCMessageResSendMessage(CNCMessage):
    """Response to send custom message request.

    ``success`` and ``message`` are required; ``jobs_notified`` defaults
    to 0. ``req_id`` is inherited from ``CNCMessage``.
    """

    success: bool  # outcome flag; presumably True when the message was sent — confirm
    message: str  # accompanying text for the outcome
    # Count of jobs; presumably how many DAQ jobs received the message — confirm.
    jobs_notified: int = 0

Response to send custom message request.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var jobs_notified : int
Expand source code
class CNCMessageResSendMessage(CNCMessage):
    """Response to send custom message request."""

    success: bool
    message: str
    jobs_notified: int = 0
var message : str
Expand source code
class CNCMessageResSendMessage(CNCMessage):
    """Response to send custom message request."""

    success: bool
    message: str
    jobs_notified: int = 0
var success : bool
Expand source code
class CNCMessageResSendMessage(CNCMessage):
    """Response to send custom message request."""

    success: bool
    message: str
    jobs_notified: int = 0
class CNCMessageResStatus (status: SupervisorStatus,
*,
req_id: str | None = None)
Expand source code
class CNCMessageResStatus(CNCMessage):
    """Supervisor status.

    Wraps a ``SupervisorStatus`` in a C&C message; ``req_id`` is inherited
    from ``CNCMessage``.
    """

    # Required: the supervisor status being reported.
    status: SupervisorStatus

Supervisor status.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var status : SupervisorStatus
Expand source code
class CNCMessageResStatus(CNCMessage):
    """Supervisor status."""

    status: SupervisorStatus
class CNCMessageResStopDAQJob (success: bool, message: str, *, req_id: str | None = None)
Expand source code
class CNCMessageResStopDAQJob(CNCMessage):
    """Response to stop and remove DAQJob request.

    Both fields are required; ``req_id`` is inherited from ``CNCMessage``.
    """

    success: bool  # outcome flag; presumably True when the job was stopped — confirm
    message: str  # accompanying text for the outcome

Response to stop and remove DAQJob request.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var message : str
Expand source code
class CNCMessageResStopDAQJob(CNCMessage):
    """Response to stop and remove DAQJob request."""

    success: bool
    message: str
var success : bool
Expand source code
class CNCMessageResStopDAQJob(CNCMessage):
    """Response to stop and remove DAQJob request."""

    success: bool
    message: str
class CNCMessageResStopDAQJobs (success: bool, message: str, *, req_id: str | None = None)
Expand source code
class CNCMessageResStopDAQJobs(CNCMessage):
    """Response to restart DAQJobs request.

    Both fields are required; ``req_id`` is inherited from ``CNCMessage``.
    """

    # NOTE(review): the class name says "Stop" while the description says
    # "restart" — confirm which wording matches the handler's behavior.
    success: bool  # outcome flag; presumably True when the request succeeded — confirm
    message: str  # accompanying text for the outcome

Response to restart DAQJobs request.

Ancestors

  • CNCMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var message : str
Expand source code
class CNCMessageResStopDAQJobs(CNCMessage):
    """Response to restart DAQJobs request."""

    success: bool
    message: str
var success : bool
Expand source code
class CNCMessageResStopDAQJobs(CNCMessage):
    """Response to restart DAQJobs request."""

    success: bool
    message: str
class SupervisorStatus (supervisor_info: SupervisorInfo,
daq_jobs: list[DAQJobInfo],
daq_job_stats: Dict[str, DAQJobStats],
daq_job_remote_stats: dict[str, SupervisorRemoteStats],
restart_schedules: list[RestartScheduleInfo])
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.

    All five fields are required and are accepted positionally in the
    order declared below.
    """

    # Information about the supervisor itself.
    supervisor_info: SupervisorInfo
    # One DAQJobInfo entry per DAQ job.
    daq_jobs: list[DAQJobInfo]
    # DAQJobStatsDict is rendered as Dict[str, DAQJobStats] in the generated signature.
    # NOTE(review): the meaning of the string key is not visible here — confirm.
    daq_job_stats: DAQJobStatsDict
    # DAQJobRemoteStatsDict is rendered as dict[str, SupervisorRemoteStats] in the
    # generated signature.
    # NOTE(review): the meaning of the string key is not visible here — confirm.
    daq_job_remote_stats: DAQJobRemoteStatsDict
    # One RestartScheduleInfo entry per restart schedule.
    restart_schedules: list[RestartScheduleInfo]

A class to represent the status of a supervisor.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var daq_job_remote_stats : dict[str, SupervisorRemoteStats]
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.
    """

    supervisor_info: SupervisorInfo
    daq_jobs: list[DAQJobInfo]
    daq_job_stats: DAQJobStatsDict
    daq_job_remote_stats: DAQJobRemoteStatsDict
    restart_schedules: list[RestartScheduleInfo]
var daq_job_stats : Dict[str, DAQJobStats]
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.
    """

    supervisor_info: SupervisorInfo
    daq_jobs: list[DAQJobInfo]
    daq_job_stats: DAQJobStatsDict
    daq_job_remote_stats: DAQJobRemoteStatsDict
    restart_schedules: list[RestartScheduleInfo]
var daq_jobs : list[DAQJobInfo]
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.
    """

    supervisor_info: SupervisorInfo
    daq_jobs: list[DAQJobInfo]
    daq_job_stats: DAQJobStatsDict
    daq_job_remote_stats: DAQJobRemoteStatsDict
    restart_schedules: list[RestartScheduleInfo]
var restart_schedules : list[RestartScheduleInfo]
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.
    """

    supervisor_info: SupervisorInfo
    daq_jobs: list[DAQJobInfo]
    daq_job_stats: DAQJobStatsDict
    daq_job_remote_stats: DAQJobRemoteStatsDict
    restart_schedules: list[RestartScheduleInfo]
var supervisor_info : SupervisorInfo
Expand source code
class SupervisorStatus(Struct):
    """
    A class to represent the status of a supervisor.
    """

    supervisor_info: SupervisorInfo
    daq_jobs: list[DAQJobInfo]
    daq_job_stats: DAQJobStatsDict
    daq_job_remote_stats: DAQJobRemoteStatsDict
    restart_schedules: list[RestartScheduleInfo]