From 978cecf6864111c99656e7edc2aa03432da86dd5 Mon Sep 17 00:00:00 2001 From: majuss Date: Tue, 5 May 2026 13:12:51 +0200 Subject: [PATCH] BACnet: Setup wo discovery; force Int config params; removed waits#2130 --- docker/Dockerfile | 2 +- .../bacnet/test_application_read_fallback.py | 65 ++++ .../bacnet/test_configured_i_am_fallback.py | 166 ++++++++++ .../connectors/bacnet/application.py | 160 +++++++++- .../connectors/bacnet/bacnet_connector.py | 288 ++++++++++++++++-- .../bacnet/bacnet_uplink_converter.py | 64 +++- 6 files changed, 706 insertions(+), 39 deletions(-) create mode 100644 tests/unit/connectors/bacnet/test_application_read_fallback.py create mode 100644 tests/unit/connectors/bacnet/test_configured_i_am_fallback.py diff --git a/docker/Dockerfile b/docker/Dockerfile index 3afdf4a714..269f4a6599 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -47,7 +47,7 @@ python /thingsboard_gateway/tb_gateway.py' > /start-gateway.sh && chmod +x /star python3 -m pip install --no-cache-dir --upgrade pip setuptools wheel && \ python3 -m pip install --no-cache-dir cryptography && \ python3 -m pip install --no-cache-dir -r requirements.txt && \ - RUN rustup self uninstall -y || { \ + rustup self uninstall -y || { \ echo "rustup uninstall failed, removing manually..."; \ rm -rf /root/.rustup /root/.cargo; \ } && \ diff --git a/tests/unit/connectors/bacnet/test_application_read_fallback.py b/tests/unit/connectors/bacnet/test_application_read_fallback.py new file mode 100644 index 0000000000..4160fca9c7 --- /dev/null +++ b/tests/unit/connectors/bacnet/test_application_read_fallback.py @@ -0,0 +1,65 @@ +# Copyright 2026. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from types import SimpleNamespace +from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock + +from thingsboard_gateway.connectors.bacnet.application import Application + + +class TestApplicationReadFallback(IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self.app = Application.__new__(Application) + self.app._Application__log = logging.getLogger("bacnet application fallback tests") + + async def test_read_multiple_objects_falls_back_to_read_property(self): + device = SimpleNamespace(details=SimpleNamespace(vendor_id=0, address="192.168.1.199:47808", object_id=101)) + object_list = [{ + "objectType": "analogInput", + "objectId": "5", + "propertyId": "presentValue" + }] + + async def fake_send_wrapper(func, err_msg=None, *args, **kwargs): + if func.__name__ == "request": + return None + if func.__name__ == "read_property": + return 42.5 + return None + + self.app._Application__send_request_wrapper = AsyncMock(side_effect=fake_send_wrapper) + + result = await self.app.read_multiple_objects(device, object_list) + + self.assertEqual(len(result), 1) + self.assertEqual(str(result[0][0]), "analog-input,5") + self.assertEqual(str(result[0][1]), "present-value") + self.assertEqual(result[0][3], 42.5) + + async def test_read_multiple_objects_returns_empty_when_fallback_fails(self): + device = SimpleNamespace(details=SimpleNamespace(vendor_id=0, address="192.168.1.199:47808", object_id=101)) + object_list = [{ + "objectType": "analogInput", + "objectId": "5", + "propertyId": "presentValue" + }] + + self.app._Application__send_request_wrapper = AsyncMock(return_value=None) + + result = await self.app.read_multiple_objects(device, object_list) + + self.assertEqual(result, []) diff --git a/tests/unit/connectors/bacnet/test_configured_i_am_fallback.py b/tests/unit/connectors/bacnet/test_configured_i_am_fallback.py new file mode 100644 index 0000000000..8f47b589ac --- /dev/null +++ b/tests/unit/connectors/bacnet/test_configured_i_am_fallback.py @@ -0,0 +1,166 @@ +# Copyright 2026. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from types import SimpleNamespace +from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock + +from bacpypes3.basetypes import Segmentation +from bacpypes3.pdu import Address + +from thingsboard_gateway.connectors.bacnet.bacnet_connector import AsyncBACnetConnector + + +class TestConfiguredIAmFallback(IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self.connector = AsyncBACnetConnector.__new__(AsyncBACnetConnector) + self.connector._AsyncBACnetConnector__log = logging.getLogger("bacnet configured i-am tests") + self.connector._AsyncBACnetConnector__application = AsyncMock() + self.connector._AsyncBACnetConnector__stopped = False + + async def test_build_i_am_from_device_config_without_probe(self): + device_config = { + "objectName": "manual-device", + "vendorIdentifier": "221", + "maxApduLengthAccepted": "2048", + "segmentationSupported": "segmentedBoth" + } + + apdu = await self.connector._AsyncBACnetConnector__build_i_am_like_apdu( + Address("192.168.1.199:47808"), + 101, + device_config + ) + + self.assertIsNotNone(apdu) + self.assertEqual(apdu.iAmDeviceIdentifier, ("device", 101)) + self.assertEqual(apdu.deviceName, "manual-device") + self.assertEqual(apdu.vendorID, 221) + self.assertEqual(apdu.maxAPDULengthAccepted, 2048) + self.assertEqual(apdu.segmentationSupported, Segmentation.segmentedBoth) + self.connector._AsyncBACnetConnector__application.read_property.assert_not_called() + + async def test_build_i_am_returns_defaults_without_probe(self): + self.connector._AsyncBACnetConnector__application.read_property = AsyncMock(side_effect=RuntimeError("No response")) + + apdu = await self.connector._AsyncBACnetConnector__build_i_am_like_apdu( + Address("192.168.1.200:47808"), + 102, + {} + ) + + self.assertIsNotNone(apdu) + self.assertEqual(apdu.deviceName, "102") + self.assertEqual(apdu.vendorID, 0) + self.assertEqual(apdu.maxAPDULengthAccepted, 50) + self.assertEqual(apdu.segmentationSupported, Segmentation.noSegmentation) + self.assertEqual(self.connector._AsyncBACnetConnector__application.read_property.await_count, 4) + + async def test_build_i_am_with_partial_config_and_probe_fill(self): + self.connector._AsyncBACnetConnector__application.read_property = AsyncMock(side_effect=[ + "probed-device-name", + 1476, + Segmentation.segmentedTransmit + ]) + + device_config = { + "vendorIdentifier": 99 + } + + apdu = await self.connector._AsyncBACnetConnector__build_i_am_like_apdu( + Address("192.168.1.201:47808"), + 103, + device_config + ) + + self.assertIsNotNone(apdu) + self.assertEqual(apdu.deviceName, "probed-device-name") + self.assertEqual(apdu.vendorID, 99) + self.assertEqual(apdu.maxAPDULengthAccepted, 1476) + self.assertEqual(apdu.segmentationSupported, Segmentation.segmentedTransmit) + self.assertEqual( + [call.args[2] for call in self.connector._AsyncBACnetConnector__application.read_property.await_args_list], + ["objectName", "maxApduLengthAccepted", "segmentationSupported"] + ) + + async def test_discover_devices_defaults_to_setup_without_discovery(self): + self.connector._AsyncBACnetConnector__config = { + "devices": [ + { + "address": "192.168.1.202:47808", + "deviceId": 104 + } + ] + } + self.connector._AsyncBACnetConnector__add_configured_device_without_iam = AsyncMock() + + await self.connector._AsyncBACnetConnector__discover_devices() + + self.connector._AsyncBACnetConnector__add_configured_device_without_iam.assert_awaited_once() + self.connector._AsyncBACnetConnector__application.do_who_is.assert_not_awaited() + + async def test_discover_devices_runs_who_is_when_setup_without_discovery_disabled(self): + self.connector._AsyncBACnetConnector__config = { + "devices": [ + { + "address": "192.168.1.202:47808", + "deviceId": 104, + "setupWithoutDiscovery": False + } + ] + } + self.connector._AsyncBACnetConnector__add_configured_device_without_iam = AsyncMock() + + await self.connector._AsyncBACnetConnector__discover_devices() + + self.connector._AsyncBACnetConnector__application.do_who_is.assert_awaited_once_with( + device_address="192.168.1.202:47808" + ) + self.connector._AsyncBACnetConnector__add_configured_device_without_iam.assert_not_awaited() + + async def test_discover_devices_uses_who_is_for_pattern_address(self): + self.connector._AsyncBACnetConnector__config = { + "devices": [ + { + "address": "192.168.1.X:47808", + "deviceId": 104 + } + ] + } + self.connector._AsyncBACnetConnector__add_configured_device_without_iam = AsyncMock() + + await self.connector._AsyncBACnetConnector__discover_devices() + + self.connector._AsyncBACnetConnector__application.do_who_is.assert_awaited_once() + self.connector._AsyncBACnetConnector__add_configured_device_without_iam.assert_not_awaited() + + async def test_set_additional_device_info_keeps_manual_name_when_probe_fails(self): + self.connector._AsyncBACnetConnector__application.get_device_name = AsyncMock(return_value=None) + apdu = SimpleNamespace( + pduSource=Address("192.168.1.203:47808"), + iAmDeviceIdentifier=("device", 105), + deviceName="manual-name" + ) + device_config = { + "deviceInfo": { + "deviceNameExpression": "BACnet Device ${objectName}", + "deviceProfileExpression": "default" + } + } + + await self.connector._AsyncBACnetConnector__set_additional_device_info_to_apdu(apdu, device_config) + + self.assertEqual(apdu.deviceName, "manual-name") diff --git a/thingsboard_gateway/connectors/bacnet/application.py b/thingsboard_gateway/connectors/bacnet/application.py index ee463885bb..612b6418af 100644 --- a/thingsboard_gateway/connectors/bacnet/application.py +++ b/thingsboard_gateway/connectors/bacnet/application.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from asyncio import Queue, QueueEmpty, sleep +from asyncio import Queue, wait_for from typing import List from bacpypes3.ipv4.app import NormalApplication, ForeignApplication from bacpypes3.local.device import DeviceObject @@ -92,22 +92,22 @@ async def confirmation(self, apdu: APDU) -> None: async def confirmation_handler(self): while not self.__stopped: try: - apdu = self.__confirmation_queue.get_nowait() + apdu = await wait_for(self.__confirmation_queue.get(), timeout=1.0) if apdu.pduSource is None: self.__log.warning("Received APDU without source address: %s", apdu) - return + continue pdu_source = apdu.pduSource if pdu_source not in self._requests: - return + continue requests = self._requests[pdu_source] for indx, (request, future) in enumerate(requests): if request.apduInvokeID == apdu.apduInvokeID: break else: - return + continue if isinstance(apdu, (SimpleAckPDU, ComplexAckPDU)): future.set_result(apdu) @@ -115,8 +115,8 @@ async def confirmation_handler(self): future.set_exception(apdu) else: raise TypeError("apdu") - except QueueEmpty: - await sleep(0.1) + except TimeoutError: + pass except Exception as e: self.__log.error("APDU confirmation error: %s", e) @@ -209,12 +209,53 @@ async def read_multiple_objects(self, device, object_list): if not isinstance(result, ReadPropertyMultipleACK): self.__log.error("Invalid response type: %s", type(result)) + fallback_result = await self.__read_single_object_with_fallback(device, object_list) + if fallback_result: + return fallback_result return [] decoded_result = self.decode_tag_list(result, device.details.vendor_id) return decoded_result + async def __read_single_object_with_fallback(self, device, object_list): + if len(object_list) != 1: + return [] + + object_config = object_list[0] + try: + vendor_info = get_vendor_info(device.details.vendor_id) + object_id = object_config['objectId'] + if not isinstance(object_id, ObjectIdentifier): + object_id = vendor_info.object_identifier(f"{object_config['objectType']},{object_id}") + + properties = object_config['propertyId'] + if not isinstance(properties, (set, list, tuple)): + properties = {properties} + + fallback_result = [] + for prop in properties: + property_identifier = vendor_info.property_identifier(prop) + property_value = await self.__send_request_wrapper( + self.read_property, + err_msg=f"Failed to read {object_id}:{property_identifier} in readProperty fallback", + address=Address(device.details.address), + objid=object_id, + prop=property_identifier + ) + if property_value is None: + continue + + fallback_result.append((object_id, property_identifier, None, property_value)) + + if fallback_result: + self.__log.debug("readProperty fallback succeeded for %s", object_id) + + return fallback_result + except Exception as e: + self.__log.warning("readProperty fallback failed for object config %s: %s", object_config, e) + return [] + async def get_device_objects(self, device, with_all_properties=False, index_to_read=None): if device.details.is_segmentation_supported(): object_list = await self.get_object_identifiers_with_segmentation(device) @@ -283,6 +324,111 @@ async def get_device_name(self, address: Address, object_id: ObjectIdentifier): return device_name + async def probe_device_properties(self, address: Address, object_id, property_names: list, + timeout: float = 10.0) -> dict: + """ + Read multiple device-object properties in a single ReadPropertyMultiple request. + Returns a dict mapping property name -> value for properties that were successfully read. + The returned keys match the original property_names (camelCase) passed by the caller. + Falls back to individual ReadProperty calls if RPM fails. + The entire probe is guarded by a timeout (default 10s) to avoid hanging + if the device does not respond. + """ + result = {} + + # Build a lookup from kebab-case (as returned by bacpypes3) back to the original + # camelCase names the caller used, so the returned dict keys match the caller's expectations. + kebab_to_original = {str(PropertyIdentifier(p)): p for p in property_names} + + try: + result = await wait_for( + self.__probe_device_properties_impl(address, object_id, property_names, kebab_to_original), + timeout=timeout + ) + except TimeoutError: + self.__log.warning("Probe timed out after %.1fs for %s — device did not respond", timeout, address) + except Exception as e: + self.__log.warning("Probe failed for %s: %s", address, e) + + # Log which properties were successfully read and which are missing + probed = set(result.keys()) + missing = set(property_names) - probed + if probed: + self.__log.info("Probed device %s — read: %s", address, ', '.join(sorted(probed))) + if missing: + self.__log.info("Probed device %s — missing (using defaults): %s", address, ', '.join(sorted(missing))) + if not probed: + self.__log.warning("Probed device %s — could not read any properties, using all defaults", address) + + return result + + async def __probe_device_properties_impl(self, address, object_id, property_names, kebab_to_original): + result = {} + + try: + property_references = [PropertyIdentifier(p) for p in property_names] + ras = ReadAccessSpecification( + objectIdentifier=ObjectIdentifier(f"device,{object_id[1]}" if isinstance(object_id, tuple) else object_id), + listOfPropertyReferences=property_references, + ) + request = ReadPropertyMultipleRequest( + listOfReadAccessSpecs=SequenceOf(ReadAccessSpecification)([ras]), + destination=Address(str(address)), + ) + + rpm_result = await self.__send_request_wrapper( + self.request, + err_msg=f"Failed to probe device properties via RPM for {address}", + apdu=request + ) + + if isinstance(rpm_result, ReadPropertyMultipleACK): + for read_access_result in rpm_result.listOfReadAccessResults: + for element in read_access_result.listOfResults: + try: + kebab_name = str(element.propertyIdentifier) + original_name = kebab_to_original.get(kebab_name, kebab_name) + read_result = element.readResult + if read_result.propertyAccessError: + self.__log.debug("Property %s returned access error for %s", original_name, address) + continue + vendor_info = get_vendor_info(0) + object_class = vendor_info.get_object_class(ObjectIdentifier( + f"device,{object_id[1]}" if isinstance(object_id, tuple) else object_id + )[0]) + property_type = object_class.get_property_type(element.propertyIdentifier) + if property_type is not None: + value = read_result.propertyValue.cast_out(property_type) + result[original_name] = value + except Exception as e: + self.__log.debug("Failed to decode probed property %s: %s", + element.propertyIdentifier, e) + return result + else: + self.__log.debug("RPM probe returned non-ACK (%s) for %s, falling back to individual reads", + type(rpm_result).__name__, address) + except Exception as e: + self.__log.debug("RPM probe failed for %s, falling back to individual reads: %s", address, e) + + # Fallback: read properties individually + for prop_name in property_names: + if prop_name in result: + continue + try: + value = await self.__send_request_wrapper( + self.read_property, + err_msg=f"Failed to probe {prop_name} for {address}", + address=address, + objid=object_id, + prop=prop_name + ) + if value is not None: + result[prop_name] = value + except Exception: + pass + + return result + async def get_router_info(self, device_address: Address): router_info = {} diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py index 57b107e7fd..97901a4eab 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py @@ -22,6 +22,7 @@ from string import ascii_lowercase from random import choice from time import monotonic, sleep +from types import SimpleNamespace from typing import TYPE_CHECKING, Tuple, Any from thingsboard_gateway.connectors.bacnet.constants import ( @@ -51,7 +52,7 @@ from bacpypes3.pdu import Address, IPv4Address from bacpypes3.primitivedata import Null, Real -from bacpypes3.basetypes import DailySchedule, TimeValue, DeviceObjectPropertyReference, ObjectPropertyReference, PropertyIdentifier, ObjectIdentifier +from bacpypes3.basetypes import DailySchedule, TimeValue, DeviceObjectPropertyReference, ObjectPropertyReference, PropertyIdentifier, ObjectIdentifier, Segmentation from thingsboard_gateway.connectors.bacnet.device import Device, Devices from thingsboard_gateway.connectors.bacnet.entities.device_object_config import DeviceObjectConfig from thingsboard_gateway.connectors.bacnet.application import Application @@ -120,6 +121,8 @@ def __init__(self, gateway, config, connector_type): self.__routers_cache = Routers() self.__devices_discover_period = self.__config.get('devicesDiscoverPeriodSeconds', 30) self.__previous_discover_time = 0 + self.__active_reads = 0 + self.__poll_cycle_start = 0 self.__devices_rescan_objects_period = self.__config['application'].get('devicesRescanObjectsPeriodSeconds', 60) def __parse_ede_config(self): @@ -209,10 +212,12 @@ def run(self): async def __rescan_devices(self): while not self.__stopped: try: - device = self.__process_device_rescan_queue.get_nowait() + device = await asyncio.wait_for( + self.__process_device_rescan_queue.get(), timeout=1.0 + ) self.loop.create_task(self.__rescan(device)) - except QueueEmpty: - await asyncio.sleep(.1) + except TimeoutError: + pass async def __start(self): if not self.__is_valid_application_device_section(): @@ -231,14 +236,23 @@ async def __start(self): self.__application = Application(DeviceObjectConfig( self.__config['application']), self.__handle_indication, self.__log) - await self.__discover_devices() - await asyncio.gather(self.__main_loop(), + await asyncio.gather(self.__initial_discovery_then_main_loop(), self.__rescan_devices(), self.__convert_data(), self.__save_data(), self.indication_callback(), self.__application.confirmation_handler()) + async def __initial_discovery_then_main_loop(self): + """Run initial device discovery first, then enter the main polling loop. + This runs inside asyncio.gather so confirmation_handler is active + and can resolve BACnet responses during property probing.""" + try: + await self.__discover_devices() + except Exception as e: + self.__log.error('Error during initial device discovery: %s', e) + await self.__main_loop() + def __is_valid_application_device_section(self) -> bool: app = self.__config.get('application') if not isinstance(app, dict): @@ -364,11 +378,14 @@ async def __add_device(self, apdu, device_config): async def __set_additional_device_info_to_apdu(self, apdu, device_config): if Device.need_to_retrieve_device_name(device_config): + fallback_device_name = apdu.deviceName if hasattr(apdu, 'deviceName') else None apdu.deviceName = None device_name = await self.__application.get_device_name(apdu.pduSource, apdu.iAmDeviceIdentifier) if device_name is not None: apdu.deviceName = device_name + elif fallback_device_name is not None: + apdu.deviceName = fallback_device_name if Device.need_to_retrieve_router_info(device_config): try: @@ -527,6 +544,14 @@ async def __local_objects_discovery(self, device, discover_for, index_to_read=No device.config = new_config + async def __run_discovery(self): + try: + await self.__discover_devices() + except Exception as e: + self.__log.error('Error during device discovery: %s', e) + finally: + self.__discovery_in_progress = False + async def __discover_devices(self): self.__previous_discover_time = monotonic() self.__log.info('Discovering devices...') @@ -542,33 +567,228 @@ async def __discover_devices(self): self.__log.error('Invalid address %s', device_config['address']) continue + is_pattern_address = '*' in device_config['address'] or 'X' in device_config['address'] + setup_without_discovery = self.__is_setup_without_discovery_enabled(device_config) + + if setup_without_discovery and not is_pattern_address: + self.__log.debug('setupWithoutDiscovery is enabled for device %s', device_config['address']) + await self.__add_configured_device_without_iam(device_config) + continue + + if setup_without_discovery and is_pattern_address: + self.__log.debug('setupWithoutDiscovery for pattern address %s is ignored. Falling back to WhoIs', + device_config['address']) + await self.__application.do_who_is(device_address=who_is_address) self.__log.debug('WhoIs request sent to device %s', device_config['address']) except Exception as e: self.__log.error('Error discovering device %s: %s', device_config['address'], e) + async def __add_configured_device_without_iam(self, device_config): + device_id = self.__get_explicit_device_id(device_config.get('deviceId')) + if device_id is None: + return + + configured_address = device_config.get('address') + if configured_address is None: + return + + if '*' in configured_address or 'X' in configured_address: + self.__log.debug('Skipping config-driven add for pattern address %s', configured_address) + return + + try: + address = Address(configured_address) + except Exception as e: + self.__log.error('Invalid BACnet address in config %s: %s', configured_address, e) + return + + device_unique_id = Devices.get_device_unique_id(str(address), device_id) + if len(await self.__devices.get_devices_by_id(device_unique_id)) > 0: + return + + apdu = await self.__build_i_am_like_apdu(address, device_id, device_config) + if apdu is None: + return + + self.__log.debug('Adding configured BACnet device by synthetic I-Am: %s (deviceId=%s)', + configured_address, + device_id) + await self.__add_device(apdu, device_config) + + async def __build_i_am_like_apdu(self, address: Address, device_id: int, device_config: dict): + object_id = ('device', device_id) + + device_name_from_config = self.__get_config_value(device_config, 'objectName') + vendor_id_from_config = self.__get_config_value(device_config, 'vendorID', 'vendorId', 'vendorIdentifier') + max_apdu_from_config = self.__get_config_value(device_config, + 'maxAPDULengthAccepted', + 'maxApduLengthAccepted') + segmentation_from_config = self.__get_config_value(device_config, 'segmentationSupported') + + # Safe defaults used only when property probing and manual i-am values are unavailable. + # Use 300 as default — enough for typical BACnet/MSTP and sufficient to batch + # multiple objects per ReadPropertyMultiple request. + vendor_id = self.__parse_int_or_default(vendor_id_from_config, 0) + max_apdu_length = self.__parse_int_or_default(max_apdu_from_config, 300) + segmentation_supported = self.__parse_segmentation_or_default(segmentation_from_config, + Segmentation.noSegmentation) + device_name = str(device_name_from_config if device_name_from_config is not None else device_id) + + # Collect which properties need to be probed from the device + properties_to_probe = [] + if device_name_from_config is None: + properties_to_probe.append('objectName') + if vendor_id_from_config is None: + properties_to_probe.append('vendorIdentifier') + if max_apdu_from_config is None: + properties_to_probe.append('maxApduLengthAccepted') + if segmentation_from_config is None: + properties_to_probe.append('segmentationSupported') + + probe_succeeded = False + + if properties_to_probe: + probed_values = await self.__application.probe_device_properties( + address, object_id, properties_to_probe + ) + + if probed_values: + probe_succeeded = True + + if 'objectName' in probed_values: + device_name = str(probed_values['objectName']) + if 'vendorIdentifier' in probed_values: + vendor_id = int(probed_values['vendorIdentifier']) + if 'maxApduLengthAccepted' in probed_values: + max_apdu_length = int(probed_values['maxApduLengthAccepted']) + if 'segmentationSupported' in probed_values: + segmentation_supported = self.__parse_segmentation_or_default( + probed_values['segmentationSupported'], segmentation_supported + ) + + if not probe_succeeded: + self.__log.debug('Device %s (deviceId=%s) is being added from config defaults without discovery response', + address, device_id) + + return SimpleNamespace( + pduSource=address, + iAmDeviceIdentifier=object_id, + vendorID=vendor_id, + maxAPDULengthAccepted=max_apdu_length, + segmentationSupported=segmentation_supported, + deviceName=device_name + ) + + @staticmethod + def __get_explicit_device_id(device_id_config): + if isinstance(device_id_config, int): + return device_id_config + + if isinstance(device_id_config, str): + stripped_device_id = device_id_config.strip() + if stripped_device_id.isdigit(): + return int(stripped_device_id) + + if isinstance(device_id_config, list) and len(device_id_config) == 1: + single_value = device_id_config[0] + if isinstance(single_value, int): + return single_value + if isinstance(single_value, str): + stripped_device_id = single_value.strip() + if stripped_device_id.isdigit(): + return int(stripped_device_id) + + return None + + @staticmethod + def __is_setup_without_discovery_enabled(device_config): + value = device_config.get('setupWithoutDiscovery', True) + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in ('1', 'true', 'yes', 'y', 'on') + + return bool(value) + + @staticmethod + def __get_config_value(config, *keys): + for key in keys: + if key in config: + return config.get(key) + + return None + + @staticmethod + def __parse_int_or_default(value, default): + if value is None: + return default + + try: + return int(value) + except Exception: + return default + + @staticmethod + def __parse_segmentation_or_default(value, default): + if value is None: + return default + + if isinstance(value, Segmentation): + return value + + try: + return Segmentation(value) + except Exception: + pass + + value_str = str(value).strip() + if value_str: + try: + return Segmentation[value_str] + except Exception: + value_lower = value_str.lower() + for segmentation in Segmentation: + if segmentation.name.lower() == value_lower: + return segmentation + + return default + async def __main_loop(self): + self.__discovery_in_progress = False + while not self.__stopped: try: - if monotonic() - self.__previous_discover_time >= self.__devices_discover_period: - await self.__discover_devices() + if (not self.__discovery_in_progress + and monotonic() - self.__previous_discover_time >= self.__devices_discover_period): + self.__discovery_in_progress = True + self.loop.create_task(self.__run_discovery()) except Exception as e: self.__log.error('Error in main loop during discovering devices: %s', e) - await asyncio.sleep(1) - try: - device: Device = self.__process_device_queue.get_nowait() - if device.stopped: - self.__log.trace('Device %s stopped', device) - continue - - self.__log.info('Reading data from device %s...', device.details.address) - self.loop.create_task(self.__read_multiple_properties(device)) - # TODO: Add handling for device activity/inactivity - except QueueEmpty: + drained = False + while True: + try: + device: Device = self.__process_device_queue.get_nowait() + if device.stopped: + self.__log.trace('Device %s stopped', device) + continue + + if self.__active_reads == 0: + self.__poll_cycle_start = monotonic() + + self.__active_reads += 1 + self.__log.info('Reading data from device %s...', device.details.address) + self.loop.create_task(self.__read_multiple_properties(device)) + drained = True + except QueueEmpty: + break + except Exception as e: + self.__log.error('Error processing device requests: %s', e) + break + + if not drained: await asyncio.sleep(.1) - except Exception as e: - self.__log.error('Error processing device requests: %s', e) async def __read_multiple_properties(self, device): reading_started = monotonic() @@ -585,6 +805,16 @@ async def __read_multiple_properties(self, device): reading_ended = monotonic() current_reading_time = reading_ended - reading_started + self.__log.debug('Reading cycle complete for device %s (%s) — %d objects read in %.3fs', + device.name, device.details.address, + len(device.uplink_converter_config.objects_to_read), + current_reading_time) + + self.__active_reads -= 1 + if self.__active_reads == 0: + total_cycle_time = reading_ended - self.__poll_cycle_start + self.__log.debug('=== Poll cycle complete — all devices read in %.3fs ===', total_cycle_time) + if current_reading_time > device.poll_period: device.poll_period = current_reading_time elif current_reading_time < device.poll_period: @@ -678,26 +908,30 @@ async def __prepare_list_of_object_property_references_value(self, value, proper async def __convert_data(self): while not self.__stopped: try: - device, config, values = self.__data_to_convert_queue.get_nowait() + device, config, values = await asyncio.wait_for( + self.__data_to_convert_queue.get(), timeout=1.0 + ) self.__log.trace('%s data to convert: %s', device, values) converted_data = device.uplink_converter.convert(config, values) self.__data_to_save_queue.put_nowait((device, converted_data)) - except QueueEmpty: - await asyncio.sleep(.1) + except TimeoutError: + pass except Exception as e: self.__log.error('Error converting data: %s', e) async def __save_data(self): while not self.__stopped: try: - device, data_to_save = self.__data_to_save_queue.get_nowait() + device, data_to_save = await asyncio.wait_for( + self.__data_to_save_queue.get(), timeout=1.0 + ) self.__log.trace('%s data to save: %s', device, data_to_save) StatisticsService.count_connector_message(self.get_name(), stat_parameter_name='storageMsgPushed') self.__gateway.send_to_storage(self.get_name(), self.get_id(), data_to_save) self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 - except QueueEmpty: - await asyncio.sleep(.1) + except TimeoutError: + pass except Exception as e: self.__log.error('Error saving data: %s', e) diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py b/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py index 46834b272d..d53e120ce7 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py @@ -51,12 +51,23 @@ def convert(self, config, data): self.__config.device_name) received_data_ts = int(time() * 1000) + # Pre-index data by (object_id, normalized_object_type) for O(1) lookup per config item. + # Each value in data is (object_identifier_tuple, property_identifier, array_index, value). + data_index = {} + for value in data: + obj_id_num = value[0][-1] + obj_type_normalized = TBUtility.kebab_case_to_camel_case(str(value[0][0])) + key = (obj_id_num, obj_type_normalized) + if key not in data_index: + data_index[key] = [] + data_index[key].append(value) + for item_config in config: try: - values_group = self.__find_values(data, - item_config['objectId'], - item_config['objectType'], - item_config['propertyId']) + values_group = self.__find_values_indexed(data_index, + item_config['objectId'], + item_config['objectType'], + item_config['propertyId']) converted_values = self.__convert_data(values_group) if len(converted_values) > 0: @@ -101,10 +112,19 @@ def __convert_data(self, data): try: object_id, value_prop_id, _, value = value_obj if isinstance(value, Exception) or isinstance(value, ErrorType): + error_class = getattr(value, 'errorClass', None) + error_code = getattr(value, 'errorCode', None) self.__log.error("Error converting object with objectId: \"%s\", and propertyId: \"%s\". Error: %s", object_id, value_prop_id, value) + if isinstance(value, ErrorType): + self.__log.error("BACnet error details for objectId: \"%s\", propertyId: \"%s\". " + "errorClass: %s, errorCode: %s", + object_id, + value_prop_id, + error_class, + error_code) continue if isinstance(value, DateTime): @@ -152,6 +172,42 @@ def _get_device_report_strategy(self, report_strategy, device_name): except ValueError as e: self.__log.trace("Report strategy config is not specified for device %s: %s", device_name, e) + def __find_values_indexed(self, data_index, object_id, object_type, property_id): + """ + O(1) lookup version using pre-built index dict keyed by (object_id, normalized_type). + """ + required_object_type = TBUtility.kebab_case_to_camel_case(object_type) + key = (object_id, required_object_type) + candidates = data_index.get(key, []) + + if not candidates: + return [] + + # Normalize property_id filter once + if isinstance(property_id, set): + normalized_props = {TBUtility.kebab_case_to_camel_case(p) for p in property_id} + elif isinstance(property_id, str): + normalized_props = {TBUtility.kebab_case_to_camel_case(property_id)} + else: + normalized_props = {TBUtility.kebab_case_to_camel_case(str(p)) for p in property_id} + + result = [] + seen = set() + + for value in candidates: + prop_normalized = TBUtility.kebab_case_to_camel_case(str(value[1])) + if prop_normalized not in normalized_props: + continue + + dedup_key = (value[0][-1], required_object_type, prop_normalized) + if dedup_key in seen: + continue + + seen.add(dedup_key) + result.append(value) + + return result + def __find_values(self, data, object_id, object_type, property_id): required_object_type = TBUtility.kebab_case_to_camel_case(object_type) result = []