Skip to content

Commit 122f40b

Browse files
committed
Adjusted rate limits processing and message splitting
1 parent e3c01c7 commit 122f40b

File tree

3 files changed

+117
-91
lines changed

3 files changed

+117
-91
lines changed

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
with open(path.join(this_directory, 'README.md')) as f:
2222
long_description = f.read()
2323

24-
VERSION = "1.10.7"
24+
VERSION = "1.10.8"
2525

2626
setup(
2727
version=VERSION,

tb_device_mqtt.py

+114-87
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import logging
1616
from copy import deepcopy
1717
from inspect import signature
18+
from logging import Logger
1819
from time import sleep
1920

2021
import paho.mqtt.client as paho
@@ -163,79 +164,80 @@ def get(self):
163164

164165
class RateLimit:
165166
def __init__(self, rate_limit, name=None, percentage=80):
166-
self.__no_limit = False
167-
self.__rate_limit_dict = {}
167+
self._no_limit = False
168+
self._rate_limit_dict = {}
168169
self.__lock = RLock()
169-
self.__minimal_timeout = DEFAULT_TIMEOUT
170-
self.__minimal_limit = 1000000000
170+
self._minimal_timeout = DEFAULT_TIMEOUT
171+
self._minimal_limit = 1000000000
171172
from_dict = isinstance(rate_limit, dict)
172173
if from_dict:
173-
self.__rate_limit_dict = rate_limit.get('rateLimit', rate_limit)
174+
self._rate_limit_dict = rate_limit.get('rateLimit', rate_limit)
174175
name = rate_limit.get('name', name)
175176
percentage = rate_limit.get('percentage', percentage)
176-
self.no_limit = rate_limit.get('no_limit', False)
177+
self._no_limit = rate_limit.get('no_limit', False)
177178
self.name = name
178179
self.percentage = percentage
179180
self.__start_time = int(monotonic())
180181
if not from_dict:
181182
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
182-
self.__no_limit = True
183+
self._no_limit = True
184+
return
183185
rate_configs = rate_limit.split(";")
184186
if "," in rate_limit:
185187
rate_configs = rate_limit.split(",")
186188
for rate in rate_configs:
187189
if rate == "":
188190
continue
189191
rate = rate.split(":")
190-
self.__rate_limit_dict[int(rate[1])] = {"counter": 0,
192+
self._rate_limit_dict[int(rate[1])] = {"counter": 0,
191193
"start": int(monotonic()),
192194
"limit": int(int(rate[0]) * self.percentage / 100)}
193195
log.debug("Rate limit %s set to values: " % self.name)
194196
with self.__lock:
195-
if not self.__no_limit:
196-
for rate_limit_time in self.__rate_limit_dict:
197+
if not self._no_limit:
198+
for rate_limit_time in self._rate_limit_dict:
197199
log.debug("Time: %s, Limit: %s", rate_limit_time,
198-
self.__rate_limit_dict[rate_limit_time]["limit"])
199-
if self.__rate_limit_dict[rate_limit_time]["limit"] < self.__minimal_limit:
200-
self.__minimal_limit = self.__rate_limit_dict[rate_limit_time]["limit"]
201-
if rate_limit_time < self.__minimal_limit:
202-
self.__minimal_timeout = rate_limit_time + 1
200+
self._rate_limit_dict[rate_limit_time]["limit"])
201+
if self._rate_limit_dict[rate_limit_time]["limit"] < self._minimal_limit:
202+
self._minimal_limit = self._rate_limit_dict[rate_limit_time]["limit"]
203+
if rate_limit_time < self._minimal_limit:
204+
self._minimal_timeout = rate_limit_time + 1
203205
else:
204206
log.debug("No rate limits.")
205207

206208
def increase_rate_limit_counter(self, amount=1):
207-
if self.__no_limit:
209+
if self._no_limit:
208210
return
209211
with self.__lock:
210-
for rate_limit_time in self.__rate_limit_dict:
211-
self.__rate_limit_dict[rate_limit_time]["counter"] += amount
212+
for rate_limit_time in self._rate_limit_dict:
213+
self._rate_limit_dict[rate_limit_time]["counter"] += amount
212214

213215
def check_limit_reached(self, amount=1):
214-
if self.__no_limit:
216+
if self._no_limit:
215217
return False
216218
with self.__lock:
217219
current_time = int(monotonic())
218-
for rate_limit_time, rate_limit_info in self.__rate_limit_dict.items():
219-
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time <= current_time:
220-
self.__rate_limit_dict[rate_limit_time]["start"] = current_time
221-
self.__rate_limit_dict[rate_limit_time]["counter"] = 0
220+
for rate_limit_time, rate_limit_info in self._rate_limit_dict.items():
221+
if self._rate_limit_dict[rate_limit_time]["start"] + rate_limit_time <= current_time:
222+
self._rate_limit_dict[rate_limit_time]["start"] = current_time
223+
self._rate_limit_dict[rate_limit_time]["counter"] = 0
222224
if rate_limit_info['counter'] + amount > rate_limit_info['limit']:
223225
return rate_limit_time
224226
return False
225227

226228
def get_minimal_limit(self):
227-
return self.__minimal_limit
229+
return self._minimal_limit if self.has_limit() else 0
228230

229231
def get_minimal_timeout(self):
230-
return self.__minimal_timeout
232+
return self._minimal_timeout if self.has_limit() else 0
231233

232234
def has_limit(self):
233-
return not self.__no_limit
235+
return not self._no_limit
234236

235237
def set_limit(self, rate_limit, percentage=80):
236238
with self.__lock:
237-
old_rate_limit_dict = deepcopy(self.__rate_limit_dict)
238-
self.__rate_limit_dict = {}
239+
old_rate_limit_dict = deepcopy(self._rate_limit_dict)
240+
self._rate_limit_dict = {}
239241
self.percentage = percentage if percentage != 0 else self.percentage
240242
rate_configs = rate_limit.split(";")
241243
if "," in rate_limit:
@@ -246,26 +248,27 @@ def set_limit(self, rate_limit, percentage=80):
246248
rate = rate.split(":")
247249
rate_limit_time = int(rate[1])
248250
limit = int(int(rate[0]) * percentage / 100)
249-
self.__rate_limit_dict[int(rate[1])] = {
251+
self._rate_limit_dict[int(rate[1])] = {
250252
"counter": old_rate_limit_dict.get(rate_limit_time, {}).get('counter', 0),
251-
"start": self.__rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
253+
"start": self._rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
252254
"limit": limit}
253-
if rate_limit_time < self.__minimal_limit:
254-
self.__minimal_timeout = rate_limit_time + 1
255-
if limit < self.__minimal_limit:
256-
self.__minimal_limit = limit
257-
if self.__rate_limit_dict:
258-
self.__no_limit = False
255+
if rate_limit_time < self._minimal_limit:
256+
self._minimal_timeout = rate_limit_time + 1
257+
if limit < self._minimal_limit:
258+
self._minimal_limit = limit
259+
if self._rate_limit_dict:
260+
self._no_limit = False
259261
log.debug("Rate limit set to values: ")
260-
for rate_limit_time in self.__rate_limit_dict:
261-
log.debug("Time: %s, Limit: %s", rate_limit_time, self.__rate_limit_dict[rate_limit_time]["limit"])
262+
for rate_limit_time in self._rate_limit_dict:
263+
log.debug("Time: %s, Limit: %s", rate_limit_time, self._rate_limit_dict[rate_limit_time]["limit"])
262264

265+
@property
263266
def __dict__(self):
264267
return {
265-
"rateLimit": self.__rate_limit_dict,
268+
"rateLimit": self._rate_limit_dict,
266269
"name": self.name,
267270
"percentage": self.percentage,
268-
"no_limit": self.__no_limit
271+
"no_limit": self._no_limit
269272
}
270273

271274
@staticmethod
@@ -380,9 +383,31 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
380383
self.__firmware_request_id = 0
381384
self.__chunk_size = chunk_size
382385
self.firmware_received = False
383-
self.__updating_thread = Thread(target=self.__update_thread, name="Updating thread")
384-
self.__updating_thread.daemon = True
385386
self.rate_limits_received = False
387+
self.__request_service_configuration_required = False
388+
self.__service_loop = Thread(target=self.__service_loop, name="Service loop", daemon=True)
389+
self.__service_loop.start()
390+
391+
def __service_loop(self):
392+
while not self.stopped:
393+
if self.__request_service_configuration_required:
394+
self.request_service_configuration(self.service_configuration_callback)
395+
self.__request_service_configuration_required = False
396+
elif self.firmware_received:
397+
self.current_firmware_info[FW_STATE_ATTR] = "UPDATING"
398+
self.send_telemetry(self.current_firmware_info)
399+
sleep(1)
400+
401+
self.__on_firmware_received(self.firmware_info.get(FW_VERSION_ATTR))
402+
403+
self.current_firmware_info = {
404+
"current_" + FW_TITLE_ATTR: self.firmware_info.get(FW_TITLE_ATTR),
405+
"current_" + FW_VERSION_ATTR: self.firmware_info.get(FW_VERSION_ATTR),
406+
FW_STATE_ATTR: "UPDATED"
407+
}
408+
self.send_telemetry(self.current_firmware_info)
409+
self.firmware_received = False
410+
sleep(0.05)
386411

387412
def _on_publish(self, client, userdata, mid):
388413
# log.debug("Message %s was published, by client with id: %r", mid ,id(client))
@@ -413,7 +438,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
413438
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
414439
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
415440
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
416-
self.request_service_configuration(self.service_configuration_callback)
441+
self.__request_service_configuration_required = True
417442
else:
418443
if isinstance(result_code, int):
419444
if result_code in RESULT_CODES:
@@ -602,25 +627,6 @@ def __on_firmware_received(self, version_to):
602627
firmware_file.write(self.firmware_data)
603628
log.info('Firmware is updated!\n Current firmware version is: %s' % version_to)
604629

605-
def __update_thread(self):
606-
while not self.stopped:
607-
if self.firmware_received:
608-
self.current_firmware_info[FW_STATE_ATTR] = "UPDATING"
609-
self.send_telemetry(self.current_firmware_info)
610-
sleep(1)
611-
612-
self.__on_firmware_received(self.firmware_info.get(FW_VERSION_ATTR))
613-
614-
self.current_firmware_info = {
615-
"current_" + FW_TITLE_ATTR: self.firmware_info.get(FW_TITLE_ATTR),
616-
"current_" + FW_VERSION_ATTR: self.firmware_info.get(FW_VERSION_ATTR),
617-
FW_STATE_ATTR: "UPDATED"
618-
}
619-
self.send_telemetry(self.current_firmware_info)
620-
self.firmware_received = False
621-
622-
sleep(0.2)
623-
624630
@staticmethod
625631
def _decode(message):
626632
try:
@@ -701,10 +707,22 @@ def on_service_configuration(self, _, response, *args, **kwargs):
701707
self._telemetry_rate_limit.set_limit(rate_limits_config.get('telemetryMessages'), percentage=80)
702708
if rate_limits_config.get('telemetryDataPoints'):
703709
self._telemetry_dp_rate_limit.set_limit(rate_limits_config.get('telemetryDataPoints'), percentage=80)
710+
704711
if service_config.get('maxInflightMessages'):
705-
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
706-
self._telemetry_rate_limit.get_minimal_limit(),
707-
service_config.get('maxInflightMessages', 100)) * 80 / 100)
712+
use_messages_rate_limit_factor = self._messages_rate_limit.has_limit()
713+
use_telemetry_rate_limit_factor = self._telemetry_rate_limit.has_limit()
714+
if use_messages_rate_limit_factor and use_telemetry_rate_limit_factor:
715+
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
716+
self._telemetry_rate_limit.get_minimal_limit(),
717+
service_config.get('maxInflightMessages', 100)) * 80 / 100)
718+
elif use_messages_rate_limit_factor:
719+
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
720+
service_config.get('maxInflightMessages', 100)) * 80 / 100)
721+
elif use_telemetry_rate_limit_factor:
722+
max_inflight_messages = int(min(self._telemetry_rate_limit.get_minimal_limit(),
723+
service_config.get('maxInflightMessages', 100)) * 80 / 100)
724+
else:
725+
max_inflight_messages = int(service_config.get('maxInflightMessages', 100) * 80 / 100)
708726
self.max_inflight_messages_set(max_inflight_messages)
709727
self.max_queued_messages_set(max_inflight_messages)
710728
if service_config.get('maxPayloadSize'):
@@ -718,6 +736,8 @@ def set_server_side_rpc_request_handler(self, handler):
718736
self.__device_on_server_side_rpc_response = handler
719737

720738
def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit=None, amount=1):
739+
if not message_rate_limit.has_limit() and not (dp_rate_limit is None or dp_rate_limit.has_limit()):
740+
return
721741
start_time = int(monotonic())
722742
dp_rate_limit_timeout = dp_rate_limit.get_minimal_timeout() if dp_rate_limit is not None else 0
723743
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit_timeout, timeout) + 10
@@ -779,7 +799,7 @@ def _wait_until_current_queued_messages_processed(self):
779799
connection_was_lost = True
780800
if current_out_messages >= inflight_messages:
781801
sleep(.001)
782-
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost:
802+
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost or self.stopped:
783803
break
784804

785805
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
@@ -863,29 +883,33 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
863883

864884
def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit,
865885
topic):
866-
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
867-
rate_limited = self._wait_for_rate_limit_released(timeout,
868-
message_rate_limit=msg_rate_limit,
869-
dp_rate_limit=dp_rate_limit,
870-
amount=part['datapoints'])
871-
if rate_limited:
872-
return rate_limited
873-
msg_rate_limit.increase_rate_limit_counter()
886+
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
887+
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
888+
rate_limited = self._wait_for_rate_limit_released(timeout,
889+
message_rate_limit=msg_rate_limit,
890+
dp_rate_limit=dp_rate_limit,
891+
amount=part['datapoints'])
892+
if rate_limited:
893+
return rate_limited
894+
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
895+
msg_rate_limit.increase_rate_limit_counter()
874896
kwargs["payload"] = dumps(part['message'])
875897
self._wait_until_current_queued_messages_processed()
876898
if not self.stopped:
877899
if device is not None:
878900
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
879-
if part['datapoints'] > 0:
880-
log.debug("Sending message with %i datapoints", part['datapoints'])
881-
log.debug("Message payload: %r", kwargs["payload"])
882-
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
883-
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
884-
else:
885-
log.debug("Sending message with %r", kwargs["payload"])
886-
log.debug("Message payload: %r", kwargs["payload"])
887-
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
888-
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
901+
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
902+
if part['datapoints'] > 0:
903+
log.debug("Sending message with %i datapoints", part['datapoints'])
904+
if log.isEnabledFor(5) and hasattr(log, 'trace'):
905+
log.trace("Message payload: %r", kwargs["payload"])
906+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
907+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
908+
else:
909+
if log.isEnabledFor(5) and hasattr(log, 'trace'):
910+
log.trace("Sending message with %r", kwargs["payload"])
911+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
912+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
889913
result = self._client.publish(**kwargs)
890914
if result.rc == MQTT_ERR_QUEUE_SIZE:
891915
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
@@ -1124,7 +1148,8 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11241148
if not isinstance(message_pack, list):
11251149
message_pack = [message_pack]
11261150

1127-
datapoints_max_count = max(datapoints_max_count - 1, 1)
1151+
datapoints_max_count = max(datapoints_max_count - 1, 0)
1152+
11281153
append_split_message = split_messages.append
11291154

11301155
final_message_item = {'data': [], 'datapoints': 0}
@@ -1155,11 +1180,13 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11551180
value = values[data_key]
11561181
data_key_size = len(data_key) + len(str(value))
11571182

1158-
if len(message_item_values_with_allowed_size) < datapoints_max_count and current_size + data_key_size < max_payload_size:
1183+
if ((datapoints_max_count == 0 or len(message_item_values_with_allowed_size) < datapoints_max_count)
1184+
and current_size + data_key_size < max_payload_size):
11591185
message_item_values_with_allowed_size[data_key] = value
11601186
current_size += data_key_size
11611187

1162-
if len(message_item_values_with_allowed_size) >= datapoints_max_count + current_size // 1024 or current_size + data_key_size >= max_payload_size:
1188+
if ((datapoints_max_count > 0 and len(message_item_values_with_allowed_size) >= datapoints_max_count + current_size // 1024)
1189+
or current_size + data_key_size >= max_payload_size):
11631190
if ts:
11641191
message_chunk = {"ts": ts, "values": message_item_values_with_allowed_size.copy()}
11651192
if 'metadata' in message:

0 commit comments

Comments
 (0)