Skip to content

Commit b524461

Browse files
authored
Update F3 observer to accept partial messages (#957)
Accept and store partial messages in F3 observer. Update the schema accordingly to store the vote value key.
1 parent fd3ef15 commit b524461

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

observer/model.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package observer
33
import (
44
"time"
55

6+
"github.com/filecoin-project/go-f3"
67
"github.com/filecoin-project/go-f3/gpbft"
78
)
89

@@ -16,6 +17,7 @@ type message struct {
1617
Signature []byte `json:"Signature"`
1718
Ticket []byte `json:"Ticket"`
1819
Justification *justification `json:"Justification"`
20+
VoteValueKey []byte `json:"VoteValueKey"`
1921
}
2022

2123
type justification struct {
@@ -44,7 +46,7 @@ type tipSet struct {
4446
PowerTable string `json:"PowerTable"`
4547
}
4648

47-
func newMessage(timestamp time.Time, nn string, msg gpbft.GMessage) (*message, error) {
49+
func newMessage(timestamp time.Time, nn string, msg f3.PartialGMessage) (*message, error) {
4850
j, err := newJustification(msg.Justification)
4951
if err != nil {
5052
return nil, err
@@ -57,6 +59,7 @@ func newMessage(timestamp time.Time, nn string, msg gpbft.GMessage) (*message, e
5759
Ticket: msg.Ticket,
5860
Vote: newPayload(msg.Vote),
5961
Justification: j,
62+
VoteValueKey: msg.VoteValueKey[:],
6063
}, nil
6164
}
6265

observer/observer.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package observer
22

33
import (
4-
"bytes"
54
"context"
65
"database/sql"
76
_ "embed"
@@ -14,7 +13,9 @@ import (
1413
"sync"
1514
"time"
1615

16+
"github.com/filecoin-project/go-f3"
1717
"github.com/filecoin-project/go-f3/gpbft"
18+
"github.com/filecoin-project/go-f3/internal/encoding"
1819
"github.com/filecoin-project/go-f3/internal/psutil"
1920
"github.com/filecoin-project/go-f3/manifest"
2021
"github.com/ipfs/go-log/v2"
@@ -41,16 +42,22 @@ type Observer struct {
4142

4243
messageObserved chan *message
4344
networkChanged <-chan gpbft.NetworkName
45+
msgEncoding *encoding.ZSTD[*f3.PartialGMessage]
4446
}
4547

4648
func New(o ...Option) (*Observer, error) {
4749
opts, err := newOptions(o...)
4850
if err != nil {
4951
return nil, err
5052
}
53+
msgEncoding, err := encoding.NewZSTD[*f3.PartialGMessage]()
54+
if err != nil {
55+
return nil, err
56+
}
5157
return &Observer{
5258
options: opts,
5359
messageObserved: make(chan *message, opts.messageBufferSize),
60+
msgEncoding: msgEncoding,
5461
}, nil
5562
}
5663

@@ -189,7 +196,7 @@ func (o *Observer) observe(ctx context.Context) error {
189196
}
190197

191198
func (o *Observer) storeMessage(ctx context.Context, om *message) error {
192-
const insertMessage = `INSERT INTO latest_messages VALUES(?,?,?,?::json,?,?,?::json);`
199+
const insertMessage = `INSERT INTO latest_messages VALUES(?,?,?,?::json,?,?,?::json,?);`
193200
voteMarshaled, err := json.Marshal(om.Vote)
194201
if err != nil {
195202
return fmt.Errorf("failed to marshal vote: %w", err)
@@ -210,6 +217,7 @@ func (o *Observer) storeMessage(ctx context.Context, om *message) error {
210217
om.Signature,
211218
om.Ticket,
212219
justificationMarshaled,
220+
om.VoteValueKey,
213221
); err != nil {
214222
return fmt.Errorf("failed to execute query: %w", err)
215223
}
@@ -317,7 +325,7 @@ func (o *Observer) startObserverFor(ctx context.Context, networkName gpbft.Netwo
317325
}
318326
}
319327
}()
320-
if err := o.pubSub.RegisterTopicValidator(topicName, validatePubSubMessage); err != nil {
328+
if err := o.pubSub.RegisterTopicValidator(topicName, o.validatePubSubMessage); err != nil {
321329
return nil, fmt.Errorf("failed to register topic validator: %w", err)
322330
}
323331
topic, err = o.pubSub.Join(topicName, pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn))
@@ -353,7 +361,7 @@ func (o *Observer) startObserverFor(ctx context.Context, networkName gpbft.Netwo
353361
continue
354362
}
355363

356-
om, err := newMessage(time.Now().UTC(), string(networkName), msg.ValidatorData.(gpbft.GMessage))
364+
om, err := newMessage(time.Now().UTC(), string(networkName), msg.ValidatorData.(f3.PartialGMessage))
357365
if err != nil {
358366
logger.Errorw("Failed to instantiate observation message", "err", err)
359367
continue
@@ -394,11 +402,11 @@ func (o *Observer) Stop(ctx context.Context) error {
394402
return err
395403
}
396404

397-
func validatePubSubMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
398-
var gmsg gpbft.GMessage
399-
if err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)); err != nil {
405+
func (o *Observer) validatePubSubMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
406+
var pgmsg f3.PartialGMessage
407+
if err := o.msgEncoding.Decode(msg.Data, &pgmsg); err != nil {
400408
return pubsub.ValidationReject
401409
}
402-
msg.ValidatorData = gmsg
410+
msg.ValidatorData = pgmsg
403411
return pubsub.ValidationAccept
404412
}

observer/schema.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,7 @@ CREATE TABLE IF NOT EXISTS latest_messages (
6060
Signature BLOB
6161
) NULL
6262
);
63+
64+
-- Add Key column to latest_messages table to accommodate partial messages.
65+
ALTER TABLE latest_messages
66+
ADD COLUMN VoteValueKey BLOB;

0 commit comments

Comments
 (0)