Skip to content

Commit c0bb7a7

Browse files
committed
Particularly fixed split messages, to avoid data loss
1 parent 2cc7f22 commit c0bb7a7

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

tb_device_mqtt.py

+27-6
Original file line numberDiff line numberDiff line change
@@ -1163,15 +1163,18 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11631163
final_message_item = {'data': [], 'datapoints': 0}
11641164

11651165
message_item_values_with_allowed_size = {}
1166+
ts = None
11661167
current_size = 0
11671168

11681169
for (message_index, message) in enumerate(message_pack):
11691170
if not isinstance(message, dict):
11701171
log.error("Message is not a dictionary!")
11711172
log.debug("Message: %s", message)
11721173
continue
1173-
1174+
old_ts = ts if ts is not None else message.get("ts")
11741175
ts = message.get("ts")
1176+
ts_changed = ts is not None and old_ts != ts
1177+
11751178
values = message.get("values", message)
11761179
values_data_keys = list(values)
11771180

@@ -1189,14 +1192,19 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
11891192
data_key_size = len(data_key) + len(str(value))
11901193

11911194
if ((datapoints_max_count == 0 or len(message_item_values_with_allowed_size) < datapoints_max_count)
1192-
and current_size + data_key_size < max_payload_size):
1195+
and current_size + data_key_size < max_payload_size) and not ts_changed:
11931196
message_item_values_with_allowed_size[data_key] = value
11941197
current_size += data_key_size
11951198

1196-
if ((datapoints_max_count > 0 and len(message_item_values_with_allowed_size) >= datapoints_max_count + current_size // 1024)
1197-
or current_size + data_key_size >= max_payload_size):
1199+
if ((TBDeviceMqttClient._datapoints_limit_reached(datapoints_max_count, len(message_item_values_with_allowed_size), current_size))
1200+
or TBDeviceMqttClient._payload_size_limit_reached(max_payload_size, current_size, data_key_size)) \
1201+
or ts_changed:
11981202
if ts:
1199-
message_chunk = {"ts": ts, "values": message_item_values_with_allowed_size.copy()}
1203+
ts_to_write = ts
1204+
if old_ts is not None and old_ts != ts:
1205+
ts_to_write = old_ts
1206+
old_ts = ts
1207+
message_chunk = {"ts": ts_to_write, "values": message_item_values_with_allowed_size.copy()}
12001208
if 'metadata' in message:
12011209
message_chunk['metadata'] = message['metadata']
12021210
final_message_item['data'].append(message_chunk)
@@ -1208,6 +1216,10 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
12081216
final_message_item = {'data': [], 'datapoints': 0}
12091217

12101218
message_item_values_with_allowed_size.clear()
1219+
if ts_changed:
1220+
message_item_values_with_allowed_size[data_key] = value
1221+
current_size += data_key_size
1222+
ts_changed = False
12111223
current_size = 0
12121224

12131225
if (message_index == len(message_pack) - 1
@@ -1221,10 +1233,19 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
12211233
final_message_item['data'].append(message_item_values_with_allowed_size.copy())
12221234

12231235
final_message_item['datapoints'] = len(message_item_values_with_allowed_size)
1224-
append_split_message(final_message_item.copy())
1236+
if final_message_item['data']:
1237+
append_split_message(final_message_item.copy())
12251238

12261239
return split_messages
12271240

1241+
@staticmethod
1242+
def _datapoints_limit_reached(datapoints_max_count, current_datapoints_size, current_size):
1243+
return 0 < datapoints_max_count <= current_datapoints_size + current_size // 1024
1244+
1245+
@staticmethod
1246+
def _payload_size_limit_reached(max_payload_size, current_size, additional_size):
1247+
return current_size + additional_size >= max_payload_size
1248+
12281249
def add_attrs_request_timeout(self, attr_request_number, timeout):
12291250
self.__attrs_request_timeout[attr_request_number] = timeout
12301251

0 commit comments

Comments
 (0)