Module enrgdaq.daq.alert.alert_slack

Classes

class DAQJobAlertSlack (config: DAQJobAlertSlackConfig,
**kwargs)
class DAQJobAlertSlack(DAQJobAlert):
    config_type = DAQJobAlertSlackConfig
    config: DAQJobAlertSlackConfig
    _slack: Slack

    def __init__(self, config: DAQJobAlertSlackConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._slack = Slack(url=config.slack_webhook_url)

    def handle_message(self, message: DAQJobMessageAlert) -> bool:
        if not super().handle_message(message):
            return False
        self.send_webhook(message)
        return True

    def send_webhook(self, alert: DAQJobMessageAlert):
        self._logger.info(
            f"Sending alert to Slack: [{alert.alert_info.severity}] {alert.alert_info.message}"
        )
        assert alert.daq_job_info is not None
        res = self._slack.post(
            attachments=[
                {
                    "fallback": alert.alert_info.message,
                    "color": ALERT_SEVERITY_TO_SLACK_COLOR[alert.alert_info.severity],
                    "author_name": alert.daq_job_info.daq_job_type,
                    "title": "Alert!",
                    "fields": [
                        {
                            "title": "Supervisor ID",
                            "value": alert.daq_job_info.supervisor_info.supervisor_id
                            if alert.daq_job_info.supervisor_info
                            else "-",
                            "short": True,
                        },
                        {
                            "title": "Originated Supervisor ID",
                            "value": alert.originated_supervisor_id,
                            "short": True,
                        },
                        {
                            "title": "Severity",
                            "value": alert.alert_info.severity,
                            "short": True,
                        },
                        {
                            "title": "Date",
                            "value": alert.date.strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True,
                        },
                        {
                            "title": "Message",
                            "value": alert.alert_info.message,
                            "short": False,
                        },
                    ],
                }
            ]
        )
        if res != "ok":
            raise Exception("Slack webhook returned error!")

DAQJob is a base class for data acquisition jobs. It handles configuration and message queues, and provides methods for consuming and handling messages.

Attributes

allowed_message_in_types : list[type[DAQJobMessage]]
List of allowed message types for input.
topics_to_subscribe : list[str]
List of topics to subscribe to.
config_type : Any
Type of the configuration.
config : Any
Configuration object.
message_in : Queue[DAQJobMessage]
Queue for incoming messages.
message_out : Queue[DAQJobMessage]
Queue for outgoing messages.
instance_id : int
Unique instance identifier.
unique_id : str
Unique identifier for the job.
restart_offset : timedelta
Offset for restarting the job.
info : DAQJobInfo
Information about the job.
_has_been_freed : bool
Flag indicating if the job has been freed.
_logger : logging.Logger
Logger instance for the job.
multiprocessing_method : str
The multiprocessing method to use ('fork' or 'spawn').
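
The attributes above suggest a queue-driven dispatch pattern: messages arrive on message_in, and handle_message decides whether to act on them, with subclasses deferring to the base class first (as DAQJobAlertSlack.handle_message does). The following is a minimal sketch of that pattern using stand-in classes, not the enrgdaq implementation. Message, AlertMessage, BaseJob, RecordingAlertJob, and the consume helper are hypothetical names, and the assumption that the base class returns False for types outside allowed_message_in_types is purely illustrative; the real DAQJob may behave differently, for example by raising.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class Message:
    """Stand-in for DAQJobMessage (hypothetical)."""
    text: str


@dataclass
class AlertMessage(Message):
    """Stand-in for DAQJobMessageAlert (hypothetical)."""
    severity: str = "info"


class BaseJob:
    """Stand-in for DAQJob; the real base class does considerably more."""

    allowed_message_in_types: list = []

    def __init__(self) -> None:
        self.message_in: Queue = Queue()

    def handle_message(self, message: Message) -> bool:
        # Assumption: only declared message types are accepted.
        return isinstance(message, tuple(self.allowed_message_in_types))

    def consume(self) -> None:
        # Drain whatever is queued right now and dispatch each message.
        while not self.message_in.empty():
            self.handle_message(self.message_in.get())


class RecordingAlertJob(BaseJob):
    """Records alerts in a list instead of posting them anywhere."""

    allowed_message_in_types = [AlertMessage]

    def __init__(self) -> None:
        super().__init__()
        self.sent: list = []

    def handle_message(self, message: Message) -> bool:
        # Same shape as DAQJobAlertSlack.handle_message: defer to the base
        # class first, and act only if it accepted the message.
        if not super().handle_message(message):
            return False
        self.sent.append(f"[{message.severity}] {message.text}")
        return True


job = RecordingAlertJob()
job.message_in.put(AlertMessage("disk full", severity="error"))
job.message_in.put(Message("not an alert"))
job.consume()
```

In this sketch only the AlertMessage reaches job.sent; the plain Message is rejected by the base-class check before the subclass acts on it.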

Ancestors

DAQJobAlert

Class variables

var config : DAQJobAlertSlackConfig
var config_type : type[DAQJobConfig]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str | None
The unique identifier for the DAQ job. Defaults to None.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is never used when this is False; when True, it is used only where possible and is not guaranteed.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.

Methods

def send_webhook(self,
alert: DAQJobMessageAlert)
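
The attachment that send_webhook assembles can be reproduced without a Slack client, which is convenient for checking the layout offline. The sketch below mirrors the field order and the "-" fallback for a missing supervisor seen in the source above. AlertSketch, build_attachment, and _field are hypothetical names, and SEVERITY_TO_COLOR is an assumed stand-in: the real ALERT_SEVERITY_TO_SLACK_COLOR mapping and its severity values are defined elsewhere in enrgdaq and are not shown on this page.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Assumed severity names and colors, for illustration only.
SEVERITY_TO_COLOR = {"info": "good", "warning": "warning", "error": "danger"}


@dataclass
class AlertSketch:
    """Flattened stand-in for the DAQJobMessageAlert fields used by send_webhook."""
    job_type: str
    severity: str
    message: str
    date: datetime
    originated_supervisor_id: str
    supervisor_id: Optional[str] = None


def _field(title: str, value: str, short: bool = True) -> dict:
    # One entry of the attachment's "fields" list.
    return {"title": title, "value": value, "short": short}


def build_attachment(alert: AlertSketch) -> dict:
    return {
        "fallback": alert.message,
        "color": SEVERITY_TO_COLOR[alert.severity],
        "author_name": alert.job_type,
        "title": "Alert!",
        "fields": [
            # "-" is used when no supervisor information is available.
            _field("Supervisor ID", alert.supervisor_id or "-"),
            _field("Originated Supervisor ID", alert.originated_supervisor_id),
            _field("Severity", alert.severity),
            _field("Date", alert.date.strftime("%Y-%m-%d %H:%M:%S")),
            _field("Message", alert.message, short=False),
        ],
    }


alert = AlertSketch(
    job_type="ExampleJob",
    severity="error",
    message="disk full",
    date=datetime(2024, 1, 2, 3, 4, 5),
    originated_supervisor_id="supervisor-1",
)
attachment = build_attachment(alert)
# JSON body with a top-level "attachments" list, built here without sending it.
payload = json.dumps({"attachments": [attachment]})
```

Keeping the dictionary construction separate from the network call makes the formatting testable on its own; the method in the source builds the attachment inline and posts it in one step.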

class DAQJobAlertSlackConfig (slack_webhook_url: str,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
class DAQJobAlertSlackConfig(DAQJobConfig):
    slack_webhook_url: str

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str | None
The unique identifier for the DAQ job. Defaults to None.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is never used when this is False; when True, it is used only where possible and is not guaranteed.
topics_to_subscribe : list[str]
List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.

Ancestors

DAQJobConfig

Instance variables

var slack_webhook_url : str