Skip to content

Commit 313f049

Browse files
shirolimitedsiper
authored andcommitted
mp: fix flb_mp_accessor_keys_remove for subkey siblings
Signed-off-by: Arkady Dyakonov <[email protected]>
1 parent f36e292 commit 313f049

File tree

1 file changed

+41
-18
lines changed

1 file changed

+41
-18
lines changed

src/flb_mp.c

+41-18
Original file line numberDiff line numberDiff line change
@@ -502,11 +502,20 @@ struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
502502
return mpa;
503503
}
504504

505-
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
506-
msgpack_object *key)
505+
/**
506+
* Finds matches for a given key in the list of record accessor patterns.
507+
* Stores the indexes of the matches in the provided array.
508+
*
509+
* @return The number of matches found.
510+
*/
511+
static inline int accessor_key_find_matches(struct flb_mp_accessor *mpa,
512+
msgpack_object *key,
513+
int* matched_indexes)
507514
{
508515
int i;
509516
int count;
517+
int match_count = 0;
518+
int out_index = 0;
510519
struct flb_mp_accessor_match *match;
511520

512521
count = mk_list_size(&mpa->ra_list);
@@ -517,14 +526,17 @@ static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
517526
}
518527

519528
if (match->start_key == key) {
520-
return i;
529+
match_count++;
530+
matched_indexes[out_index++] = i;
521531
}
522532
}
523533

524-
return -1;
534+
return match_count;
525535
}
526536

527-
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
537+
static inline int accessor_sub_pack(struct flb_mp_accessor *mpa,
538+
int* matched_indexes,
539+
int match_count,
528540
msgpack_packer *mp_pck,
529541
msgpack_object *key,
530542
msgpack_object *val)
@@ -534,9 +546,13 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
534546
msgpack_object *k;
535547
msgpack_object *v;
536548
struct flb_mp_map_header mh;
549+
struct flb_mp_accessor_match *match;
537550

538-
if (match->key == key || match->key == val) {
539-
return FLB_FALSE;
551+
for (i = 0; i < match_count; i++) {
552+
match = &mpa->matches[matched_indexes[i]];
553+
if (match->key == key || match->key == val) {
554+
return FLB_FALSE;
555+
}
540556
}
541557

542558
if (key) {
@@ -549,7 +565,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
549565
k = &val->via.map.ptr[i].key;
550566
v = &val->via.map.ptr[i].val;
551567

552-
ret = accessor_sub_pack(match, mp_pck, k, v);
568+
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, k, v);
553569
if (ret == FLB_TRUE) {
554570
flb_mp_map_header_append(&mh);
555571
}
@@ -560,7 +576,7 @@ static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
560576
flb_mp_array_header_init(&mh, mp_pck);
561577
for (i = 0; i < val->via.array.size; i++) {
562578
v = &val->via.array.ptr[i];
563-
ret = accessor_sub_pack(match, mp_pck, NULL, v);
579+
ret = accessor_sub_pack(mpa, matched_indexes, match_count, mp_pck, NULL, v);
564580
if (ret == FLB_TRUE) {
565581
flb_mp_array_header_append(&mh);
566582
}
@@ -587,6 +603,7 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
587603
int ret;
588604
int rule_id = 0;
589605
int matches = 0;
606+
int* matched_indexes;
590607
msgpack_object *key;
591608
msgpack_object *val;
592609
msgpack_object *s_key;
@@ -641,6 +658,13 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
641658
/* Initialize map */
642659
flb_mp_map_header_init(&mh, &mp_pck);
643660

661+
/* Initialize array of matching indexes to properly handle sibling keys */
662+
matched_indexes = flb_malloc(sizeof(int) * matches);
663+
if (!matched_indexes) {
664+
flb_errno();
665+
return -1;
666+
}
667+
644668
for (i = 0; i < map->via.map.size; i++) {
645669
key = &map->via.map.ptr[i].key;
646670
val = &map->via.map.ptr[i].val;
@@ -649,30 +673,29 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
649673
* For every entry on the path, check if we should do a step-by-step
650674
* repackaging or just pack the whole object.
651675
*
652-
* Just check: does this 'key' exists on any path of the record
653-
* accessor patterns ?
654-
*
655-
* Find if the active key in the map, matches an accessor rule, if
656-
* if match we get the match id as return value, otherwise -1.
676+
* Find all matching rules that match this 'key'. Return the number of matches or 0
677+
* if no matches were found. Found matches are stored in the 'matched_indexes' array.
657678
*/
658-
ret = accessor_key_find_match(mpa, key);
659-
if (ret == -1) {
679+
ret = accessor_key_find_matches(mpa, key, matched_indexes);
680+
if (ret == 0) {
660681
/* No matches, it's ok to pack the kv pair */
661682
flb_mp_map_header_append(&mh);
662683
msgpack_pack_object(&mp_pck, *key);
663684
msgpack_pack_object(&mp_pck, *val);
664685
}
665686
else {
666687
/* The key has a match. Now we do a step-by-step packaging */
667-
match = &mpa->matches[ret];
668-
ret = accessor_sub_pack(match, &mp_pck, key, val);
688+
689+
ret = accessor_sub_pack(mpa, matched_indexes, ret, &mp_pck, key, val);
669690
if (ret == FLB_TRUE) {
670691
flb_mp_map_header_append(&mh);
671692
}
672693
}
673694
}
674695
flb_mp_map_header_end(&mh);
675696

697+
flb_free(matched_indexes);
698+
676699
*out_buf = mp_sbuf.data;
677700
*out_size = mp_sbuf.size;
678701

0 commit comments

Comments
 (0)