Skip to content

Commit bdd74f2

Browse files
committed
Extended IPFIX decoder: support decoding messages with multiple sets and sets with multiple records. More robust decoding: if decoding a set fails, still decode subsequent sets. Added ReadCount() and Peek() functionality to reader.Reader and allSetIds() to MemCache.
1 parent 39c1aea commit bdd74f2

File tree

6 files changed

+342
-69
lines changed

6 files changed

+342
-69
lines changed

ipfix/decoder.go

+129-58
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
package ipfix
2424

2525
import (
26+
"bytes"
27+
"errors"
2628
"fmt"
2729
"io"
2830
"net"
@@ -96,75 +98,115 @@ func NewDecoder(raddr net.IP, b []byte) *Decoder {
9698

9799
// Decode decodes the IPFIX raw data
98100
func (d *Decoder) Decode(mem MemCache) (*Message, error) {
99-
var (
100-
msg = new(Message)
101-
err error
102-
)
101+
var msg = new(Message)
103102

104103
// IPFIX Message Header decoding
105-
if err = msg.Header.unmarshal(d.reader); err != nil {
104+
if err := msg.Header.unmarshal(d.reader); err != nil {
106105
return nil, err
107106
}
108107
// IPFIX Message Header validation
109-
if err = msg.Header.validate(); err != nil {
108+
if err := msg.Header.validate(); err != nil {
110109
return nil, err
111110
}
112111

113112
// Add source IP address as Agent ID
114113
msg.AgentID = d.raddr.String()
115114

115+
// In case there are multiple non-fatal errors, collect them and report all of them.
116+
// The rest of the received sets will still be interpreted, until a fatal error is encountered.
117+
// A non-fatal error is for example an illegal data record or unknown template id.
118+
var decodeErrors []error
116119
for d.reader.Len() > 4 {
120+
if err := d.decodeSet(mem, msg); err != nil {
121+
switch err.(type) {
122+
case nonfatalError:
123+
decodeErrors = append(decodeErrors, err)
124+
default:
125+
return nil, err
126+
}
127+
}
128+
}
117129

118-
setHeader := new(SetHeader)
119-
setHeader.unmarshal(d.reader)
130+
return msg, combineErrors(decodeErrors...)
131+
}
120132

121-
if setHeader.Length < 4 {
122-
return nil, io.ErrUnexpectedEOF
123-
}
133+
type nonfatalError error
124134

125-
switch {
126-
case setHeader.SetID == 2:
127-
// Template set
128-
tr := TemplateRecord{}
129-
tr.unmarshal(d.reader)
130-
mem.insert(tr.TemplateID, d.raddr, tr)
131-
case setHeader.SetID == 3:
132-
// Option set
133-
tr := TemplateRecord{}
134-
tr.unmarshalOpts(d.reader)
135-
mem.insert(tr.TemplateID, d.raddr, tr)
136-
case setHeader.SetID >= 4 && setHeader.SetID <= 255:
137-
// Reserved
138-
default:
139-
// data
140-
tr, ok := mem.retrieve(setHeader.SetID, d.raddr)
141-
if !ok {
142-
select {
143-
case rpcChan <- RPCRequest{
144-
ID: setHeader.SetID,
145-
IP: d.raddr,
146-
}:
147-
default:
148-
}
149-
return msg, fmt.Errorf("%s unknown ipfix template id# %d",
150-
d.raddr.String(),
151-
setHeader.SetID,
152-
)
135+
func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
136+
startCount := d.reader.ReadCount()
137+
138+
setHeader := new(SetHeader)
139+
if err := setHeader.unmarshal(d.reader); err != nil {
140+
return err
141+
}
142+
if setHeader.Length < 4 {
143+
return io.ErrUnexpectedEOF
144+
}
145+
146+
var tr TemplateRecord
147+
var err error
148+
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
149+
if setHeader.SetID > 255 {
150+
var ok bool
151+
tr, ok = mem.retrieve(setHeader.SetID, d.raddr)
152+
if !ok {
153+
select {
154+
case rpcChan <- RPCRequest{
155+
ID: setHeader.SetID,
156+
IP: d.raddr,
157+
}:
158+
default:
153159
}
160+
err = nonfatalError(fmt.Errorf("%s unknown ipfix template id# %d. Known ids: %v",
161+
d.raddr.String(),
162+
setHeader.SetID,
163+
mem.allSetIds(),
164+
))
165+
}
166+
}
154167

155-
// data records
156-
for d.reader.Len() > 2 {
157-
data, err := decodeData(d.reader, tr)
158-
if err != nil {
159-
return msg, err
160-
}
168+
for err == nil && setHeader.Length > uint16(d.reader.ReadCount()-startCount) {
169+
if setId := setHeader.SetID; setId == 2 || setId == 3 {
170+
// Template record or template option record
161171

172+
// Check if only padding is left in this set. A template id of zero indicates padding bytes, which MUST be zero.
173+
templateId, err := d.reader.PeekUint16()
174+
if err == nil && templateId == 0 {
175+
break
176+
}
177+
178+
tr := TemplateRecord{}
179+
if setId == 2 {
180+
err = tr.unmarshal(d.reader)
181+
} else {
182+
err = tr.unmarshalOpts(d.reader)
183+
}
184+
if err == nil {
185+
mem.insert(tr.TemplateID, d.raddr, tr)
186+
}
187+
} else if setId >= 4 && setId <= 255 {
188+
// Reserved set, do not read any records
189+
break
190+
} else {
191+
// Data set
192+
var data []DecodedField
193+
data, err = d.decodeData(tr)
194+
if err == nil {
162195
msg.DataSets = append(msg.DataSets, data)
163196
}
164197
}
165198
}
166199

167-
return msg, nil
200+
// Skip the rest of the set in order to properly continue with the next set
201+
// This is necessary if the set is padded, has a reserved set ID, or a nonfatal error occurred
202+
leftoverBytes := setHeader.Length - uint16(d.reader.ReadCount()-startCount)
203+
if leftoverBytes > 0 {
204+
_, skipErr := d.reader.Read(int(leftoverBytes))
205+
if skipErr != nil {
206+
err = skipErr
207+
}
208+
}
209+
return err
168210
}
169211

170212
// RFC 7011 - part 3.1. Message Header Format
@@ -336,20 +378,25 @@ func (f *TemplateFieldSpecifier) unmarshal(r *reader.Reader) error {
336378
// | ... | ... |
337379
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
338380

339-
func (tr *TemplateRecord) unmarshal(r *reader.Reader) {
381+
func (tr *TemplateRecord) unmarshal(r *reader.Reader) error {
340382
var (
341383
th = TemplateHeader{}
342384
tf = TemplateFieldSpecifier{}
343385
)
344386

345-
th.unmarshal(r)
387+
if err := th.unmarshal(r); err != nil {
388+
return err
389+
}
346390
tr.TemplateID = th.TemplateID
347391
tr.FieldCount = th.FieldCount
348392

349393
for i := th.FieldCount; i > 0; i-- {
350-
tf.unmarshal(r)
394+
if err := tf.unmarshal(r); err != nil {
395+
return err
396+
}
351397
tr.FieldSpecifiers = append(tr.FieldSpecifiers, tf)
352398
}
399+
return nil
353400
}
354401

355402
// 0 1 2 3
@@ -380,34 +427,42 @@ func (tr *TemplateRecord) unmarshal(r *reader.Reader) {
380427
// | Option M Field Length | Padding (optional) |
381428
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
382429

383-
func (tr *TemplateRecord) unmarshalOpts(r *reader.Reader) {
430+
func (tr *TemplateRecord) unmarshalOpts(r *reader.Reader) error {
384431
var (
385432
th = TemplateHeader{}
386433
tf = TemplateFieldSpecifier{}
387434
)
388435

389-
th.unmarshalOpts(r)
436+
if err := th.unmarshalOpts(r); err != nil {
437+
return err
438+
}
390439
tr.TemplateID = th.TemplateID
391440
tr.FieldCount = th.FieldCount
392441
tr.ScopeFieldCount = th.ScopeFieldCount
393442

394443
for i := th.ScopeFieldCount; i > 0; i-- {
395-
tf.unmarshal(r)
444+
if err := tf.unmarshal(r); err != nil {
445+
return err
446+
}
396447
tr.ScopeFieldSpecifiers = append(tr.FieldSpecifiers, tf)
397448
}
398449

399450
for i := th.FieldCount - th.ScopeFieldCount; i > 0; i-- {
400-
tf.unmarshal(r)
451+
if err := tf.unmarshal(r); err != nil {
452+
return err
453+
}
401454
tr.FieldSpecifiers = append(tr.FieldSpecifiers, tf)
402455
}
456+
return nil
403457
}
404458

405-
func decodeData(r *reader.Reader, tr TemplateRecord) ([]DecodedField, error) {
459+
func (d *Decoder) decodeData(tr TemplateRecord) ([]DecodedField, error) {
406460
var (
407461
fields []DecodedField
408462
err error
409463
b []byte
410464
)
465+
r := d.reader
411466

412467
for i := 0; i < len(tr.FieldSpecifiers); i++ {
413468
b, err = r.Read(int(tr.FieldSpecifiers[i].Length))
@@ -421,8 +476,8 @@ func decodeData(r *reader.Reader, tr TemplateRecord) ([]DecodedField, error) {
421476
}]
422477

423478
if !ok {
424-
return nil, fmt.Errorf("IPFIX element key (%d) not exist",
425-
tr.FieldSpecifiers[i].ElementID)
479+
return nil, nonfatalError(fmt.Errorf("IPFIX element key (%d) not exist",
480+
tr.FieldSpecifiers[i].ElementID))
426481
}
427482

428483
fields = append(fields, DecodedField{
@@ -443,8 +498,8 @@ func decodeData(r *reader.Reader, tr TemplateRecord) ([]DecodedField, error) {
443498
}]
444499

445500
if !ok {
446-
return nil, fmt.Errorf("IPFIX element key (%d) not exist (scope)",
447-
tr.ScopeFieldSpecifiers[i].ElementID)
501+
return nil, nonfatalError(fmt.Errorf("IPFIX element key (%d) not exist (scope)",
502+
tr.ScopeFieldSpecifiers[i].ElementID))
448503
}
449504

450505
fields = append(fields, DecodedField{
@@ -455,3 +510,19 @@ func decodeData(r *reader.Reader, tr TemplateRecord) ([]DecodedField, error) {
455510

456511
return fields, nil
457512
}
513+
514+
func combineErrors(errorSlice ...error) (err error) {
515+
switch len(errorSlice) {
516+
case 0:
517+
case 1:
518+
err = errorSlice[0]
519+
default:
520+
var errMsg bytes.Buffer
521+
errMsg.WriteString("Multiple errors:")
522+
for _, subError := range errorSlice {
523+
errMsg.WriteString("\n- " + subError.Error())
524+
}
525+
err = errors.New(errMsg.String())
526+
}
527+
return
528+
}

0 commit comments

Comments
 (0)