Module enrgdaq.daq.jobs.sensor.xiaomi_mijia
Classes
class DAQJobXiaomiMijia (config: DAQXiaomiMijiaConfig, **kwargs)-
class DAQJobXiaomiMijia(DAQJob):
    config_type = DAQXiaomiMijiaConfig
    config: DAQXiaomiMijiaConfig

    def __init__(self, config: DAQXiaomiMijiaConfig, **kwargs):
        super().__init__(config, **kwargs)
        # Align the Bluetooth and asyncio loggers with the job's verbosity.
        logging.getLogger("bleak.backends.winrt.scanner").setLevel(
            self.config.verbosity.to_logging_level()
        )
        logging.getLogger("bleak.backends.winrt.client").setLevel(
            self.config.verbosity.to_logging_level()
        )
        logging.getLogger("asyncio").setLevel(self.config.verbosity.to_logging_level())

    def start(self):
        while not self._has_been_freed:
            try:
                # _get_data returns a single Lywsd03mmcData reading
                data = self._get_data()
                self._send_store_message(data)
            except Exception as e:
                self._logger.warning(
                    f"Failed to get data: {e}. Retrying after poll interval..."
                )
            time.sleep(self.config.poll_interval_seconds)

    def _get_data(self):
        """Attempts to connect and retrieve data using the context manager."""
        assert (
            Lywsd03mmcClientSyncContext is not None
        ), "lywsd03mmc library not installed"
        for attempt in range(1, self.config.connect_retries + 1):
            try:
                self._logger.debug(
                    f"Connecting to {self.config.mac_address} (attempt {attempt})..."
                )
                # Use the Lywsd03mmcClientSyncContext context manager for the connection
                with Lywsd03mmcClientSyncContext(
                    self.config.mac_address, timeout_sec=self.config.timeout_sec
                ) as client:
                    data: Lywsd03mmcData = client.get_data()
                    self._logger.debug("Connected and got data.")
                    return data
            except Exception as e:
                self._logger.warning(f"Connection attempt {attempt} failed: {e}")
                if attempt < self.config.connect_retries:
                    time.sleep(self.config.connect_retry_delay)
                else:
                    self._logger.error("All connection attempts failed.")
                    raise

    def _send_store_message(self, data: Lywsd03mmcData):
        """Sends the formatted data to the store."""
        keys = ["timestamp", "temperature", "humidity", "battery"]
        values = [
            get_now_unix_timestamp_ms(),
            data.temperature,
            data.hum,
            data.battery_percentage,
        ]
        self._logger.debug(f"Sending data to store: {dict(zip(keys, values))}")
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=keys,
                data=[values],
            )
        )

DAQJob is a base class for data acquisition jobs. It handles the configuration and message queues, and provides methods for consuming and handling messages.
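The retry logic in `_get_data` can be read in isolation. The following is a minimal sketch of the same pattern, not the library's code: `call_with_retries`, its `read` callback, and the injectable `sleep` parameter are hypothetical names introduced here so the example runs without a sensor. One deliberate difference is labeled in the code: the sketch rejects `retries < 1`, whereas the loop in the source would simply not run.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    read: Callable[[], T],
    retries: int = 5,
    delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call read() up to `retries` times, sleeping `delay` seconds between failures."""
    for attempt in range(1, retries + 1):
        try:
            return read()
        except Exception:
            if attempt < retries:
                sleep(delay)  # wait before the next attempt
            else:
                raise  # last attempt: propagate the error to the caller
    # Assumption of this sketch only: treat a non-positive retry count as an error.
    raise ValueError("retries must be at least 1")


# Usage with a fake reader that fails twice, then succeeds.
attempts = []
slept = []


def flaky_read():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("sensor not reachable")
    return {"temperature": 21.5}


result = call_with_retries(flaky_read, retries=5, delay=2.0, sleep=slept.append)
```

Passing `sleep` as a parameter keeps the sketch testable without real delays; the job itself calls `time.sleep` directly.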
Attributes
allowed_message_in_types:list[type[DAQJobMessage]]- List of allowed message types for input.
config_type:Any- Type of the configuration.
config:Any- Configuration object.
message_in:Queue[DAQJobMessage]- Queue for incoming messages.
message_out:Queue[DAQJobMessage]- Queue for outgoing messages.
instance_id:int- Unique instance identifier.
unique_id:str- Unique identifier for the job.
restart_offset:timedelta- Offset for restarting the job.
info:DAQJobInfo- Information about the job.
_has_been_freed:bool- Flag indicating if the job has been freed.
_logger:logging.Logger- Logger instance for the job.
multiprocessing_method:str- The multiprocessing method to use ('fork' or 'spawn').
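To make the role of `message_out` concrete, here is a rough sketch of the row that `_send_store_message` builds before handing it to `_put_message_out`. `TabularMessage` is a hypothetical stand-in for `DAQJobMessageStoreTabular` (its `store_config` field is omitted), `build_row` is an illustrative helper, and the timestamp is assumed to be milliseconds since the Unix epoch, as the name `get_now_unix_timestamp_ms` suggests. Whether `_put_message_out` writes directly to this queue is also an assumption.

```python
import time
from dataclasses import dataclass
from queue import Queue
from typing import Any


@dataclass
class TabularMessage:
    """Hypothetical stand-in for DAQJobMessageStoreTabular (store_config omitted)."""

    keys: list
    data: list


def build_row(temperature: float, humidity: float, battery: int) -> TabularMessage:
    """Build a one-row tabular message with the same column order as the job."""
    keys = ["timestamp", "temperature", "humidity", "battery"]
    values: list[Any] = [int(time.time() * 1000), temperature, humidity, battery]
    # `data` is a list of rows; the job sends exactly one row per reading.
    return TabularMessage(keys=keys, data=[values])


# A plain Queue stands in for the job's outgoing message queue.
message_out: Queue = Queue()
message_out.put(build_row(21.5, 40.0, 87))
msg = message_out.get()
```

Each reading becomes a single row whose values line up positionally with `keys`.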
Ancestors
- DAQJob
Class variables
var config : DAQXiaomiMijiaConfig
var config_type : type[DAQJobConfig]-
Configuration for DAQJobXiaomiMijia.
Attributes
mac_address- Bluetooth MAC address of the sensor.
poll_interval_seconds- How often to poll the sensor.
connect_retries- Number of connection attempts before failing.
connect_retry_delay- Delay (seconds) between connection attempts.
timeout_sec- Connection timeout in seconds.
Methods
def start(self)-
def start(self):
    while not self._has_been_freed:
        try:
            # _get_data returns a single Lywsd03mmcData reading
            data = self._get_data()
            self._send_store_message(data)
        except Exception as e:
            self._logger.warning(
                f"Failed to get data: {e}. Retrying after poll interval..."
            )
        time.sleep(self.config.poll_interval_seconds)
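The loop in `start` keeps polling after a failed read: the exception is logged and the job sleeps for the poll interval either way. A small self-contained sketch of that behavior follows. The function `poll` and its `read`, `emit`, `should_stop`, and `sleep` parameters are hypothetical stand-ins for `_get_data`, `_send_store_message`, `_has_been_freed`, and `time.sleep`.

```python
import logging
import time
from typing import Any, Callable

logger = logging.getLogger("poll_sketch")


def poll(
    read: Callable[[], Any],
    emit: Callable[[Any], None],
    should_stop: Callable[[], bool],
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Read and emit until should_stop() is true; a failed read is logged, not fatal."""
    while not should_stop():
        try:
            emit(read())
        except Exception as e:
            logger.warning("Failed to get data: %s. Retrying after poll interval...", e)
        sleep(interval)  # runs after both successful and failed cycles


# Usage: the second of three scripted reads fails.
script = iter([21.5, ConnectionError("timeout"), 21.7])
results = []
sleeps = []


def scripted_read():
    item = next(script)
    if isinstance(item, Exception):
        raise item
    return item


poll(
    scripted_read,
    results.append,
    should_stop=lambda: len(sleeps) >= 3,
    interval=10.0,
    sleep=sleeps.append,
)
```

The failed cycle produces no output row but still consumes one poll interval, so gaps in the stored data correspond to logged warnings.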
Inherited members
class DAQXiaomiMijiaConfig (store_config: DAQJobStoreConfig,
mac_address: str,
poll_interval_seconds: int = 10,
connect_retries: int = 5,
connect_retry_delay: float = 2.0,
timeout_sec: float = 60.0,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)-
class DAQXiaomiMijiaConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobXiaomiMijia.

    Attributes:
        mac_address: Bluetooth MAC address of the sensor.
        poll_interval_seconds: How often to poll the sensor.
        connect_retries: Number of connection attempts before failing.
        connect_retry_delay: Delay (seconds) between connection attempts.
        timeout_sec: Connection timeout in seconds.
    """

    mac_address: str
    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0

Configuration for DAQJobXiaomiMijia.
Attributes
mac_address- Bluetooth MAC address of the sensor.
poll_interval_seconds- How often to poll the sensor.
connect_retries- Number of connection attempts before failing.
connect_retry_delay- Delay (seconds) between connection attempts.
timeout_sec- Connection timeout in seconds.
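The timing fields above interact: a cycle in which every connection attempt fails can last far longer than `poll_interval_seconds`. The sketch below works through that arithmetic with the default values. `MijiaTiming` is a hypothetical dataclass stand-in for the timing fields of `DAQXiaomiMijiaConfig`, and the calculation assumes each attempt blocks for at most `timeout_sec`, which the source does not guarantee.

```python
from dataclasses import dataclass


@dataclass
class MijiaTiming:
    """Stand-in holding only the timing fields of DAQXiaomiMijiaConfig."""

    poll_interval_seconds: int = 10
    connect_retries: int = 5
    connect_retry_delay: float = 2.0
    timeout_sec: float = 60.0


def worst_case_failed_cycle_seconds(cfg: MijiaTiming) -> float:
    """Upper bound on one polling cycle in which every attempt times out."""
    # Assumption: each attempt takes no longer than timeout_sec.
    connecting = cfg.connect_retries * cfg.timeout_sec
    # The job sleeps between attempts, but not after the last one.
    waiting = max(cfg.connect_retries - 1, 0) * cfg.connect_retry_delay
    # The poll interval is slept after the cycle regardless of outcome.
    return connecting + waiting + cfg.poll_interval_seconds


defaults = MijiaTiming()
total = worst_case_failed_cycle_seconds(defaults)  # 5*60 + 4*2 + 10
```

With the defaults this bound is 318 seconds, so an unreachable sensor yields roughly one warning every five minutes rather than one every ten seconds.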
Ancestors
- StorableDAQJobConfig
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var connect_retries : int
var connect_retry_delay : float
var mac_address : str
var poll_interval_seconds : int
var timeout_sec : float