Skip to content

feat: blockstore: GetMany blockstore method #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added

* `boxo/blockstore`:
* [GetMany blockstore implementation](https://github.com/ipfs/boxo/pull/492)
* `boxo/gateway`:
* A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
* `boxo/bitswap/client`:
Expand Down
125 changes: 125 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package blockstore
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -64,6 +65,13 @@ type Blockstore interface {
HashOnRead(enabled bool)
}

// GetManyBlockstore is a blockstore interface that supports GetMany and PutMany methods using ds.TxnDatastore
type GetManyBlockstore interface {
Blockstore
PutMany(ctx context.Context, blocks []blocks.Block) error
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two recommendations:

  1. It seems like there's no need for []blocks.Block and []cid.Cid since Block contains a .Cid() method https://github.com/ipfs/go-block-format/blob/v0.2.0/blocks.go#L19-L25
  2. You may want to consider a streaming interface so that you don't have to buffer all the blocks in memory

If returning an asynchronous object (e.g. channel or iterator) might be worth taking a look at ipfs/kubo#4592 to make sure you don't run into some common pitfalls. With Go generics now iterators may also make this easier than it used to be.

}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
Expand Down Expand Up @@ -310,6 +318,123 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return output, nil
}

// NewGetManyBlockstore returns a default GetManyBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewGetManyBlockstore(d ds.TxnDatastore, opts ...Option) GetManyBlockstore {
bs := &blockstore{
datastore: batchingTxnDatastoreStub{TxnDatastore: d},
}

for _, o := range opts {
o.f(bs)
}

if !bs.noPrefix {
bs.datastore = dsns.Wrap(bs.datastore, BlockPrefix)
d = dsns.WrapTxnDatastore(d, BlockPrefix)
}

gmbs := &getManyBlockstore{
blockstore: bs,
datastore: d,
}

return gmbs
}

type getManyBlockstore struct {
*blockstore
datastore ds.TxnDatastore
}

type batchingTxnDatastoreStub struct {
ds.BatchingFeature
ds.TxnDatastore
}

func (bs *getManyBlockstore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't make sense to not return the CID here given it's in the signature, but also it doesn't seem like []cid.Cid needs to be in the return signature

}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}
} else {
if bs.blockstore.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *getManyBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.blockstore.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}
}
return t.Commit(ctx)
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
Expand Down
122 changes: 122 additions & 0 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -72,6 +74,126 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestGetManyBlockstorePutThenGetBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(bg, block)
if err != nil {
t.Fatal(err)
}

blockFromBlockstore, err := bs.Get(bg, block.Cid())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
Expand Down
32 changes: 17 additions & 15 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ require (
github.com/multiformats/go-multicodec v0.9.0
github.com/prometheus/client_golang v1.16.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0
go.opentelemetry.io/contrib/propagators/autoprop v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/sdk v1.19.0
)

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
Expand All @@ -52,7 +52,7 @@ require (
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -139,15 +139,14 @@ require (
go.opentelemetry.io/contrib/propagators/jaeger v1.15.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.15.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -157,16 +156,19 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

replace github.com/ipfs/boxo => ../

replace github.com/ipfs/go-datastore => github.com/vulcanize/go-datastore v0.6.1-internal-0.0.1
Loading