Module enrgdaq.daq.alert.alert_slack
Classes
class DAQJobAlertSlack(config: DAQJobAlertSlackConfig, **kwargs)
class DAQJobAlertSlack(DAQJobAlert):
    config_type = DAQJobAlertSlackConfig
    config: DAQJobAlertSlackConfig
    _slack: Slack

    def __init__(self, config: DAQJobAlertSlackConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._slack = Slack(url=config.slack_webhook_url)

    def handle_message(self, message: DAQJobMessageAlert) -> bool:
        if not super().handle_message(message):
            return False
        self.send_webhook(message)
        return True

    def send_webhook(self, alert: DAQJobMessageAlert):
        self._logger.info(
            f"Sending alert to Slack: [{alert.alert_info.severity}] {alert.alert_info.message}"
        )
        assert alert.daq_job_info is not None
        res = self._slack.post(
            attachments=[
                {
                    "fallback": alert.alert_info.message,
                    "color": ALERT_SEVERITY_TO_SLACK_COLOR[alert.alert_info.severity],
                    "author_name": alert.daq_job_info.daq_job_class_name,
                    "title": "Alert!",
                    "fields": [
                        {
                            "title": "Supervisor ID",
                            "value": alert.daq_job_info.supervisor_config.supervisor_id
                            if alert.daq_job_info.supervisor_config
                            else "-",
                            "short": True,
                        },
                        {
                            "title": "Originated Supervisor ID",
                            "value": alert.originated_supervisor_id,
                            "short": True,
                        },
                        {
                            "title": "Severity",
                            "value": alert.alert_info.severity,
                            "short": True,
                        },
                        {
                            "title": "Date",
                            "value": alert.date.strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True,
                        },
                        {
                            "title": "Message",
                            "value": alert.alert_info.message,
                            "short": False,
                        },
                    ],
                }
            ]
        )
        if res != "ok":
            raise Exception("Slack webhook returned error!")
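The attachment that send_webhook assembles can be tried in isolation. The following is a minimal sketch, not the enrgdaq implementation: AlertSketch, SEVERITY_TO_COLOR and build_attachment are hypothetical stand-ins, and the severity keys and colors are assumptions (the real ALERT_SEVERITY_TO_SLACK_COLOR table is defined elsewhere and may differ).

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Hypothetical severity-to-color table. "good", "warning" and "danger" are
# the named colors Slack accepts for attachments; the keys are assumptions.
SEVERITY_TO_COLOR = {"info": "good", "warning": "warning", "error": "danger"}


@dataclass
class AlertSketch:
    """Simplified stand-in for DAQJobMessageAlert (assumed, flattened shape)."""

    severity: str
    message: str
    job_class_name: str
    originated_supervisor_id: str
    date: datetime
    supervisor_id: Optional[str] = None  # None models a missing supervisor config


def build_attachment(alert: AlertSketch) -> dict:
    """Build one Slack attachment with the same five fields as send_webhook."""

    def field(title: str, value: str, short: bool = True) -> dict:
        return {"title": title, "value": value, "short": short}

    return {
        "fallback": alert.message,
        "color": SEVERITY_TO_COLOR[alert.severity],
        "author_name": alert.job_class_name,
        "title": "Alert!",
        "fields": [
            # Fall back to "-" when no supervisor ID is available.
            field("Supervisor ID", alert.supervisor_id or "-"),
            field("Originated Supervisor ID", alert.originated_supervisor_id),
            field("Severity", alert.severity),
            field("Date", alert.date.strftime("%Y-%m-%d %H:%M:%S")),
            # The message can be long, so it gets a full-width field.
            field("Message", alert.message, short=False),
        ],
    }
```

Keeping the payload construction in a pure function like this makes the field layout easy to check without contacting Slack.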
DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.
Attributes
allowed_message_in_types : list[type[DAQJobMessage]] - List of allowed message types for input.
config_type : Any - Type of the configuration.
config : Any - Configuration object.
message_in : Queue[DAQJobMessage] - Queue for incoming messages.
message_out : Queue[DAQJobMessage] - Queue for outgoing messages.
instance_id : int - Unique instance identifier.
unique_id : str - Unique identifier for the job.
restart_offset : timedelta - Offset for restarting the job.
info : DAQJobInfo - Information about the job.
_has_been_freed : bool - Flag indicating if the job has been freed.
_logger : logging.Logger - Logger instance for the job.
Ancestors
Class variables
var config : DAQJobAlertSlackConfig
var config_type : Any
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig] - The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str - The type of the DAQ job.
Methods
def send_webhook(self, alert: DAQJobMessageAlert)
def send_webhook(self, alert: DAQJobMessageAlert):
    self._logger.info(
        f"Sending alert to Slack: [{alert.alert_info.severity}] {alert.alert_info.message}"
    )
    assert alert.daq_job_info is not None
    res = self._slack.post(
        attachments=[
            {
                "fallback": alert.alert_info.message,
                "color": ALERT_SEVERITY_TO_SLACK_COLOR[alert.alert_info.severity],
                "author_name": alert.daq_job_info.daq_job_class_name,
                "title": "Alert!",
                "fields": [
                    {
                        "title": "Supervisor ID",
                        "value": alert.daq_job_info.supervisor_config.supervisor_id
                        if alert.daq_job_info.supervisor_config
                        else "-",
                        "short": True,
                    },
                    {
                        "title": "Originated Supervisor ID",
                        "value": alert.originated_supervisor_id,
                        "short": True,
                    },
                    {
                        "title": "Severity",
                        "value": alert.alert_info.severity,
                        "short": True,
                    },
                    {
                        "title": "Date",
                        "value": alert.date.strftime("%Y-%m-%d %H:%M:%S"),
                        "short": True,
                    },
                    {
                        "title": "Message",
                        "value": alert.alert_info.message,
                        "short": False,
                    },
                ],
            }
        ]
    )
    if res != "ok":
        raise Exception("Slack webhook returned error!")
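The final check in send_webhook relies on Slack incoming webhooks answering a successful request with the plain-text body "ok". The sketch below illustrates that post-and-verify step using only the standard library. It is an illustration under assumptions, not the Slack client that enrgdaq uses: http_post_json, send_attachments and SlackWebhookError are hypothetical names, and the transport is injectable so the check can be exercised without network access.

```python
import json
from typing import Callable
from urllib import request


class SlackWebhookError(RuntimeError):
    """Raised when the webhook response body is anything other than "ok"."""


def http_post_json(url: str, payload: dict) -> str:
    """POST the payload as JSON and return the response body as text."""
    req = request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for non-2xx status codes.
    with request.urlopen(req, timeout=10) as resp:
        return resp.read().decode("utf-8")


def send_attachments(
    url: str,
    attachments: list,
    post: Callable[[str, dict], str] = http_post_json,
) -> None:
    """Send attachments to a webhook URL and verify the "ok" reply."""
    body = post(url, {"attachments": attachments})
    if body.strip() != "ok":
        # Include the body so the failure reason is visible in logs.
        raise SlackWebhookError(f"Slack webhook returned {body!r}")
```

Raising a dedicated exception type that carries the response body, rather than a bare Exception, is a design choice of this sketch; it lets callers distinguish webhook failures from other errors.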
Inherited members
class DAQJobAlertSlackConfig(slack_webhook_url: str, *, verbosity: LogVerbosity = LogVerbosity.INFO, remote_config: DAQRemoteConfig | None = <factory>, daq_job_type: str)
class DAQJobAlertSlackConfig(DAQJobConfig):
    slack_webhook_url: str
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig] - The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str - The type of the DAQ job.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var slack_webhook_url : str