Skip to content

Commit

Permalink
Added service configuration processing, small fixes for examples
Browse files Browse the repository at this point in the history
  • Loading branch information
samson0v committed Aug 28, 2024
1 parent 2693a89 commit af0f80f
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 47 deletions.
2 changes: 1 addition & 1 deletion examples/device/client_rpc_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def callback(request_id, resp_body, exception=None):
logging.error("Exception: " + str(exception))
else:
logging.info("request id: {request_id}, response body: {resp_body}".format(request_id=request_id,
resp_body=resp_body))
resp_body=resp_body))


def main():
Expand Down
2 changes: 1 addition & 1 deletion examples/device/hardware_specs_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def on_upload_frequency_change(value, error):
def on_server_side_rpc_request(request_id, request_body):
print(client, request_id, request_body)
if request_body["method"] == "getCPULoad":
client.send_rpc_reply(request_id, {"CPU percent": psutil.cpu_percent()})
client.send_rpc_reply(request_id, {"CPU percent": psutil.cpu_percent(interval=0.1)})
elif request_body["method"] == "getMemoryUsage":
client.send_rpc_reply(request_id, {"Memory": psutil.virtual_memory().percent})
else:
Expand Down
2 changes: 1 addition & 1 deletion examples/gateway/respond_to_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def rpc_request_response(gateway, request_body):
req_id = request_body["data"]["id"]
# dependently of request method we send different data back
if method == 'getCPULoad':
gateway.gw_send_rpc_reply(device, req_id, psutil.cpu_percent())
gateway.gw_send_rpc_reply(device, req_id, psutil.cpu_percent(interval=0.1))
elif method == 'getMemoryLoad':
gateway.gw_send_rpc_reply(device, req_id, psutil.virtual_memory().percent)
else:
Expand Down
156 changes: 122 additions & 34 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ def __init__(self, rate_limit):
self.__no_limit = False
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
self.__no_limit = True
self.__config = rate_limit
self.__rate_limit_dict = {}
self.__lock = RLock()
rate_configs = rate_limit.split(";")
Expand Down Expand Up @@ -248,9 +247,38 @@ def get_minimal_timeout(self):
def has_limit(self):
return not self.__no_limit

def set_limit(self, rate_limit, percentage=0):
self.__rate_limit_dict = {}
rate_configs = rate_limit.split(";")
if "," in rate_limit:
rate_configs = rate_limit.split(",")
for rate in rate_configs:
if rate == "":
continue
rate = rate.split(":")
self.__rate_limit_dict[int(rate[1])] = {"counter": self.__rate_limit_dict[int(rate[1])]['counter'],
"start": self.__rate_limit_dict[int(rate[1])]['start'],
"limit": int(rate[0]) * percentage / 100}

@staticmethod
def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
if rate_limit == "DEFAULT_RATE_LIMIT":
rate_limit = RateLimit.get_rate_limit_by_host(host, rate_limit)
dp_rate_limit = RateLimit.get_dp_rate_limit_by_host(host, dp_rate_limit)

return rate_limit, dp_rate_limit

@staticmethod
def get_rate_limit_by_host(host, rate_limit):
if rate_limit == "DEFAULT_TELEMETRY_RATE_LIMIT":
if "thingsboard.cloud" in host:
rate_limit = "100:1,4000:60,70000:3600,"
elif "tb" in host and "cloud" in host:
rate_limit = "10:1,300:60,7000:3600,"
elif "demo.thingsboard.io" in host:

Check failure

Code scanning / CodeQL

Incomplete URL substring sanitization High

The string
demo.thingsboard.io
may be at an arbitrary position in the sanitized URL.
rate_limit = "10:1,300:60,"
else:
rate_limit = "0:0,"
elif rate_limit == "DEFAULT_MESSAGES_RATE_LIMIT":
if "thingsboard.cloud" in host:
rate_limit = "100:1,4000:60,70000:3600,"
elif "tb" in host and "cloud" in host:
Expand All @@ -262,7 +290,11 @@ def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
else:
rate_limit = rate_limit

if dp_rate_limit == "DEFAULT_RATE_LIMIT":
return rate_limit

@staticmethod
def get_dp_rate_limit_by_host(host, dp_rate_limit):
if dp_rate_limit == "DEFAULT_TELEMETRY_DP_RATE_LIMIT":
if "thingsboard.cloud" in host:
dp_rate_limit = "190:1,5900:60,13900:3600,"
elif "tb" in host and "cloud" in host:
Expand All @@ -274,13 +306,18 @@ def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
else:
dp_rate_limit = dp_rate_limit

return rate_limit, dp_rate_limit
return dp_rate_limit


class TBDeviceMqttClient:
"""ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from"""

EMPTY_RATE_LIMIT = RateLimit('0:0,')

def __init__(self, host, port=1883, username=None, password=None, quality_of_service=None, client_id="",
chunk_size=0, rate_limit="DEFAULT_RATE_LIMIT", dp_rate_limit="DEFAULT_RATE_LIMIT"):
chunk_size=0, messages_rate_limit="DEFAULT_MESSAGES_RATE_LIMIT",
telemetry_rate_limit="DEFAULT_TELEMETRY_RATE_LIMIT",
telemetry_dp_rate_limit="DEFAULT_TELEMETRY_DP_RATE_LIMIT", max_payload_size=8196):
self._client = paho.Client(protocol=5, client_id=client_id)
self.quality_of_service = quality_of_service if quality_of_service is not None else 1
self.__host = host
Expand All @@ -301,10 +338,17 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self.__device_sub_dict = {}
self.__device_client_rpc_dict = {}
self.__attr_request_number = 0
rate_limit, dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host, rate_limit, dp_rate_limit)
self.__rate_limit = RateLimit(rate_limit)
self.__dp_rate_limit = RateLimit(dp_rate_limit)
self._client.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
self.max_payload_size = max_payload_size
self.service_configuration_callback = self.__on_service_configuration
telemetry_rate_limit, telemetry_dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host,
telemetry_rate_limit,
telemetry_dp_rate_limit)
messages_rate_limit = RateLimit.get_rate_limit_by_host(self.__host, messages_rate_limit)

self._messages_rate_limit = RateLimit(messages_rate_limit)
self.__telemetry_rate_limit = RateLimit(telemetry_rate_limit)
self.__telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit)
self._client.max_inflight_messages_set(self.__telemetry_rate_limit.get_minimal_limit())
self.__attrs_request_timeout = {}
self.__timeout_thread = Thread(target=self.__timeout_check)
self.__timeout_thread.daemon = True
Expand Down Expand Up @@ -345,6 +389,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
self.request_service_configuration(self.service_configuration_callback)
else:
if isinstance(result_code, int):
if result_code in RESULT_CODES:
Expand Down Expand Up @@ -599,20 +644,39 @@ def send_rpc_call(self, method, params, callback):
RPC_REQUEST_TOPIC + str(rpc_request_id),
self.quality_of_service)

def request_service_configuration(self, callback):
self.send_rpc_call("getServiceConfiguration", {}, callback)

def __on_service_configuration(self, _, service_config, *args, **kwargs):
if service_config.get("deviceMsgRateLimit"):
self._messages_rate_limit.set_limit(service_config.get("deviceMsgRateLimit"), percentage=80)
if service_config.get('deviceTelemetryMsgRateLimit'):
self.__telemetry_rate_limit.set_limit(service_config.get('deviceTelemetryMsgRateLimit'), percentage=80)
if service_config.get('deviceTelemetryDataPointsRateLimit'):
self.__telemetry_dp_rate_limit.set_limit(service_config.get('deviceTelemetryDataPointsRateLimit'),
percentage=80)
if service_config.get('maxInflightMessages'):
self.max_inflight_messages_set(int(service_config.get('maxInflightMessages')))
if service_config.get('maxPayloadSize'):
self.max_payload_size = int(service_config.get('maxPayloadSize'))
if service_config.get('payloadType'):
pass

def set_server_side_rpc_request_handler(self, handler):
"""Set the callback that will be called when a server-side RPC is received."""
self.__device_on_server_side_rpc_response = handler

def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit, amount=1):
def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit=None, amount=1):
start_time = int(time())
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit.get_minimal_timeout(), timeout) + 10
dp_rate_limit_timeout = dp_rate_limit.get_minimal_timeout() if dp_rate_limit is not None else 0
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit_timeout, timeout) + 10
timeout_updated = False
disconnected = False
limit_reached_check = True
log_posted = False
while limit_reached_check:
limit_reached_check = (message_rate_limit.check_limit_reached()
or dp_rate_limit.check_limit_reached(amount=amount)
or (dp_rate_limit is not None and dp_rate_limit.check_limit_reached(amount=amount))
or not self.is_connected())
if not timeout_updated and limit_reached_check:
timeout = max(timeout, limit_reached_check) + 10
Expand Down Expand Up @@ -642,48 +706,69 @@ def wait_until_current_queued_messages_processed(self):
previous_notification_time = int(time())
sleep(.001)

def _send_request(self, type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
msg_rate_limit=None, dp_rate_limit=None):
if msg_rate_limit is None:
msg_rate_limit = self.__rate_limit
if kwargs.get('topic') == TELEMETRY_TOPIC:
msg_rate_limit = self.__telemetry_rate_limit
else:
msg_rate_limit = self._messages_rate_limit
if dp_rate_limit is None:
dp_rate_limit = self.__dp_rate_limit
if kwargs.get('topic') == TELEMETRY_TOPIC:
dp_rate_limit = self.__telemetry_dp_rate_limit
else:
dp_rate_limit = self.EMPTY_RATE_LIMIT
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
msg_rate_limit.increase_rate_limit_counter()
is_reached = self._wait_for_rate_limit_released(timeout, msg_rate_limit, dp_rate_limit)
if is_reached:
return is_reached

if type == TBSendMethod.PUBLISH:
if self.__rate_limit.has_limit():
if _type == TBSendMethod.PUBLISH:
if msg_rate_limit.has_limit():
return self.__send_publish_with_limitations(kwargs, timeout, device, msg_rate_limit, dp_rate_limit)
else:
if "payload" in kwargs and not isinstance(kwargs["payload"], str):
kwargs["payload"] = dumps(kwargs["payload"])
return TBPublishInfo(self._client.publish(**kwargs))
elif type == TBSendMethod.SUBSCRIBE:
elif _type == TBSendMethod.SUBSCRIBE:
return self._client.subscribe(**kwargs)
elif type == TBSendMethod.UNSUBSCRIBE:
elif _type == TBSendMethod.UNSUBSCRIBE:
return self._client.unsubscribe(**kwargs)

def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate_limit=None, dp_rate_limit=None):
def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp_rate_limit=None):
if device is not None:
return msg_rate_limit, dp_rate_limit
else:
if topic == TELEMETRY_TOPIC:
return self.__telemetry_rate_limit, self.__telemetry_dp_rate_limit
else:
return self._messages_rate_limit, None

def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate_limit: RateLimit = None,
dp_rate_limit: RateLimit = None):
data_for_analysis = data = kwargs.get("payload")
if isinstance(data, str):
data_for_analysis = loads(data)
datapoints = self._count_datapoints_in_message(data_for_analysis, device=device)
datapoints = -1
if dp_rate_limit.has_limit():
datapoints = self._count_datapoints_in_message(data_for_analysis, device=device)
payload = data
if self.__dp_rate_limit.get_minimal_limit() < datapoints:
if dp_rate_limit.has_limit() and datapoints >= 0 and dp_rate_limit.get_minimal_limit() < datapoints:
log.debug("Rate limit is too low, cannot send message with %i datapoints, "
"splitting to messages with %i datapoints",
datapoints, dp_rate_limit.get_minimal_limit())
if device is None or data.get(device) is None:
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit())
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit(),
self.max_payload_size)
split_messages = [{'message': split_message['data'], 'datapoints': split_message['datapoints']}
for split_message in device_split_messages]
else:
device_data = data.get(device)
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit())
split_messages = [{'message': {device: [split_message['data']]}, 'datapoints': split_message['datapoints']}
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit(),
self.max_payload_size)
split_messages = [
{'message': {device: [split_message['data']]}, 'datapoints': split_message['datapoints']}
for split_message in device_split_messages]
if len(split_messages) == 0:
log.debug("Cannot split message to smaller parts!")
Expand All @@ -699,11 +784,12 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
results.append(self._client.publish(**kwargs))
return TBPublishInfo(results)
else:
dp_rate_limit.increase_rate_limit_counter(datapoints)
self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=datapoints)
if dp_rate_limit is not None:
dp_rate_limit.increase_rate_limit_counter(datapoints)
self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=datapoints)
kwargs["payload"] = dumps(payload)
return TBPublishInfo(self._client.publish(**kwargs))

Expand All @@ -720,7 +806,7 @@ def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
waiting_for_connection_message_time = time()
sleep(0.01)

return self._send_request(TBSendMethod.SUBSCRIBE, {"topic": topic, "qos": qos}, timeout)
return self._send_request(TBSendMethod.SUBSCRIBE, {"topic": topic, "qos": qos}, timeout, msg_rate_limit=self._messages_rate_limit)

def _publish_data(self, data, topic, qos, timeout=DEFAULT_TIMEOUT, device=None,
msg_rate_limit=None, dp_rate_limit=None):
Expand Down Expand Up @@ -921,7 +1007,7 @@ def provision(host,
return provisioning_client.get_credentials()

@staticmethod
def _split_message(message_pack, max_size):
def _split_message(message_pack, max_size, max_payload_size):
if message_pack is None:
return []
split_messages = []
Expand Down Expand Up @@ -961,10 +1047,12 @@ def _split_message(message_pack, max_size):
message_item_values_with_allowed_size = {}
for current_data_key_index in range(len(values_data_keys)):
data_key = values_data_keys[current_data_key_index]
if len(message_item_values_with_allowed_size.keys()) < max_size:
if len(message_item_values_with_allowed_size.keys()) < max_size and len(
str(message_item_values_with_allowed_size)) < max_payload_size:
message_item_values_with_allowed_size[data_key] = values[data_key]
if (len(message_item_values_with_allowed_size.keys()) >= max_size
or current_data_key_index == len(values_data_keys) - 1):
or current_data_key_index == len(values_data_keys) - 1) or len(
str(message_item_values_with_allowed_size)) >= max_payload_size:
if ts is not None:
final_message_item['data'] = {"ts": ts, "values": message_item_values_with_allowed_size}
else:
Expand Down
Loading

0 comments on commit af0f80f

Please sign in to comment.