Module enrgdaq.daq.jobs.sensor.xiaomi_mijia

Classes

class DAQJobXiaomiMijia (config: DAQXiaomiMijiaConfig,
**kwargs)
Expand source code
class DAQJobXiaomiMijia(DAQJob):
    """DAQ job that periodically reads a Xiaomi Mijia sensor and stores the reading.

    Each polling cycle connects to the sensor at ``config.mac_address`` through
    ``Lywsd03mmcClientSyncContext``, reads one sample and emits it as a
    ``DAQJobMessageStoreTabular`` row containing a timestamp, temperature,
    humidity and battery percentage.
    """

    config_type = DAQXiaomiMijiaConfig
    config: DAQXiaomiMijiaConfig

    def __init__(self, config: DAQXiaomiMijiaConfig, **kwargs):
        """Initialise the job and align third-party logger levels with the job verbosity.

        Args:
            config: Job configuration.
            **kwargs: Forwarded unchanged to ``DAQJob.__init__``.
        """
        super().__init__(config, **kwargs)
        # Compute the level once and apply it to every logger this job tunes.
        level = self.config.verbosity.to_logging_level()
        for logger_name in (
            "bleak.backends.winrt.scanner",
            "bleak.backends.winrt.client",
            "asyncio",
        ):
            logging.getLogger(logger_name).setLevel(level)

    def start(self):
        """Poll the sensor until the job is freed.

        Any exception raised while reading or sending is logged as a warning
        and the loop continues after the configured poll interval.
        """
        while not self._has_been_freed:
            try:
                # _get_data returns a single reading object (not a tuple).
                data = self._get_data()
                self._send_store_message(data)
            except Exception as e:
                self._logger.warning(
                    f"Failed to get data: {e}. Retrying after poll interval..."
                )
            time.sleep(self.config.poll_interval_seconds)

    def _get_data(self) -> "Lywsd03mmcData":
        """Connect to the sensor and return one reading, retrying on failure.

        Up to ``config.connect_retries`` attempts are made, sleeping
        ``config.connect_retry_delay`` seconds between failed attempts.

        Returns:
            The object returned by the client's ``get_data()``.

        Raises:
            RuntimeError: If the lywsd03mmc library is unavailable, or if
                ``config.connect_retries`` is less than 1 so that no attempt
                can be made.
            Exception: The error from the final failed attempt is re-raised.
        """
        # Explicit check instead of `assert`, which is removed under `python -O`.
        if Lywsd03mmcClientSyncContext is None:
            raise RuntimeError("lywsd03mmc library not installed")

        max_attempts = self.config.connect_retries
        for attempt in range(1, max_attempts + 1):
            try:
                self._logger.debug(
                    f"Connecting to {self.config.mac_address} (attempt {attempt})..."
                )
                # The context manager owns the connection for the duration of the read.
                with Lywsd03mmcClientSyncContext(
                    self.config.mac_address, timeout_sec=self.config.timeout_sec
                ) as client:
                    data: Lywsd03mmcData = client.get_data()
                    self._logger.debug("Connected and got data.")
                    return data

            except Exception as e:
                self._logger.warning(f"Connection attempt {attempt} failed: {e}")
                if attempt < max_attempts:
                    time.sleep(self.config.connect_retry_delay)
                else:
                    self._logger.error("All connection attempts failed.")
                    raise

        # Only reachable when the loop body never ran (connect_retries < 1).
        # Fail loudly here rather than returning None to the caller.
        raise RuntimeError(
            f"No connection attempt was made: connect_retries={max_attempts} "
            "must be at least 1"
        )

    def _send_store_message(self, data: Lywsd03mmcData):
        """Send one reading to the store as a single tabular row.

        Args:
            data: Reading providing ``temperature``, ``hum`` and
                ``battery_percentage`` attributes.
        """

        keys = ["timestamp", "temperature", "humidity", "battery"]
        values = [
            get_now_unix_timestamp_ms(),
            data.temperature,
            data.hum,
            data.battery_percentage,
        ]

        self._logger.debug(f"Sending data to store: {dict(zip(keys, values))}")
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=keys,
                data=[values],
            )
        )

DAQJob is a base class for data acquisition jobs. It handles configuration and message queues, and provides methods for consuming and handling messages.

Attributes

allowed_message_in_types : list[type[DAQJobMessage]]
List of allowed message types for input.
config_type : Any
Type of the configuration.
config : Any
Configuration object.
message_in : Queue[DAQJobMessage]
Queue for incoming messages.
message_out : Queue[DAQJobMessage]
Queue for outgoing messages.
instance_id : int
Unique instance identifier.
unique_id : str
Unique identifier for the job.
restart_offset : timedelta
Offset for restarting the job.
info : DAQJobInfo
Information about the job.
_has_been_freed : bool
Flag indicating if the job has been freed.
_logger : logging.Logger
Logger instance for the job.
multiprocessing_method : str
The multiprocessing method to use ('fork' or 'spawn').

Ancestors

Class variables

var config : DAQXiaomiMijiaConfig
var config_type : type[DAQJobConfig]

Configuration for DAQJobXiaomiMijia.

Attributes

mac_address
Bluetooth MAC address of the sensor.
poll_interval_seconds
How often to poll the sensor.
connect_retries
Number of connection attempts before failing.
connect_retry_delay
Delay (seconds) between connection attempts.
timeout_sec
Connection timeout in seconds.

Methods

def start(self)
Expand source code
def start(self):
    """Poll the sensor until the job is freed, forwarding each reading to the store.

    A failure while reading or sending is logged as a warning; the loop then
    waits ``config.poll_interval_seconds`` and tries again.
    """
    while True:
        if self._has_been_freed:
            break
        try:
            # _get_data returns one reading object, handed straight to the sender.
            self._send_store_message(self._get_data())
        except Exception as exc:
            warning_text = f"Failed to get data: {exc}. Retrying after poll interval..."
            self._logger.warning(warning_text)
        # The sleep runs after both successful and failed cycles.
        time.sleep(self.config.poll_interval_seconds)

Inherited members

class DAQXiaomiMijiaConfig (store_config: DAQJobStoreConfig,
mac_address: str,
poll_interval_seconds: int = 10,
connect_retries: int = 5,
connect_retry_delay: float = 2.0,
timeout_sec: float = 60.0,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0

Configuration for DAQJobXiaomiMijia.

Attributes

mac_address
Bluetooth MAC address of the sensor.
poll_interval_seconds
How often to poll the sensor.
connect_retries
Number of connection attempts before failing.
connect_retry_delay
Delay (seconds) between connection attempts.
timeout_sec
Connection timeout in seconds.

Ancestors

Instance variables

var connect_retries : int
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0
var connect_retry_delay : float
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0
var mac_address : str
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0
var poll_interval_seconds : int
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0
var timeout_sec : float
Expand source code
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor (required, no default).
        poll_interval_seconds: How often to poll the sensor, in seconds
            (default 10).
        connect_retries: Number of connection attempts before failing
            (default 5).
        connect_retry_delay: Delay (seconds) between connection attempts
            (default 2.0).
        timeout_sec: Connection timeout in seconds (default 60.0).
    """

    # Field order is significant: it defines the positional constructor signature.
    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0