Skip to content

Commit 11a88a5

Browse files
committed
refactor(thead): refactor ipns logic to implement thead. #31
1 parent 3e3997e commit 11a88a5

File tree

3 files changed

+181
-0
lines changed

3 files changed

+181
-0
lines changed

thead/ipns/ipns.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,105 @@
11
// Package ipns provides implementation for pdcl head management based on ipns protocol.
22
package ipns
3+
4+
import (
5+
"context"
6+
"fmt"
7+
"path"
8+
"time"
9+
10+
"github.com/ipfs/go-cid"
11+
shell "github.com/ipfs/go-ipfs-api"
12+
"github.com/libp2p/go-libp2p-core/peer"
13+
"github.com/multiformats/go-multibase"
14+
15+
"github.com/areknoster/public-distributed-commit-log/thead"
16+
)
17+
18+
var _ thead.Reader = (*BasicHeadReader)(nil)
19+
20+
// BasicHeadReader get's topic head from given IPNS address
21+
type BasicHeadReader struct {
22+
sh *shell.Shell
23+
resolver resolver
24+
ipnsAddr string
25+
}
26+
27+
func NewBasicHeadReader(sh *shell.Shell, ipnsAddr string) *BasicHeadReader {
28+
bhr := &BasicHeadReader{
29+
sh: sh,
30+
resolver: newShellResolver(sh),
31+
ipnsAddr: ipnsAddr,
32+
}
33+
bhr.ipnsAddr = ipnsAddr
34+
return bhr
35+
}
36+
37+
func (hr *BasicHeadReader) ReadHead(ctx context.Context) (cid.Cid, error) {
38+
id, err := hr.resolver.resolveIPNS(hr.ipnsAddr)
39+
if err != nil {
40+
return cid.Cid{}, err
41+
}
42+
return id, nil
43+
}
44+
45+
// BasicHeadSetter sets default IPFS daemon's IPNS key address to point to given CID
46+
type BasicHeadSetter struct {
47+
sh *shell.Shell
48+
}
49+
50+
func NewBasicHeadSetter(sh *shell.Shell) *BasicHeadSetter {
51+
return &BasicHeadSetter{sh: sh}
52+
}
53+
54+
func (bhs *BasicHeadSetter) SetHead(ctx context.Context, cid cid.Cid) error {
55+
ipfsAddr := path.Join("/ipfs/", cid.String())
56+
_, err := bhs.sh.PublishWithDetails(ipfsAddr, "", 24*time.Hour, 10*time.Minute, false)
57+
if err != nil {
58+
return fmt.Errorf("publishing ipns update to ipfs daemon: %v", err)
59+
}
60+
return nil
61+
}
62+
63+
// BasicHeadManager can be used by Sentinel to manage topic head
64+
// Since consumers don't set topic's head, it should not be used by them.
65+
// Use HeadReader implementation to get topic head for consumer
66+
// and some other (e.g. memory or disk) implementation to store internal consumer offset
67+
type BasicHeadManager struct {
68+
*BasicHeadReader
69+
*BasicHeadSetter
70+
}
71+
72+
// NewBasicHeadManager initializes BasicHeadManager with default daemon's key used as PK for topic's head
73+
func NewBasicHeadManager(sh *shell.Shell) (BasicHeadManager, error) {
74+
ipnsAddr, err := GetDaemonIPNSAddress(sh)
75+
if err != nil {
76+
return BasicHeadManager{}, fmt.Errorf("get daemon IPNS address: %w", err)
77+
}
78+
return BasicHeadManager{
79+
BasicHeadReader: NewBasicHeadReader(sh, ipnsAddr),
80+
BasicHeadSetter: NewBasicHeadSetter(sh),
81+
}, nil
82+
}
83+
84+
// GetDaemonIPNSAddress gets IPNS address attached to daemon's default key.
85+
// In most scenarios it's to be used when initializing
86+
// sentinel with some existing daemon to use its default
87+
// ipns address as IPNS head.
88+
func GetDaemonIPNSAddress(sh *shell.Shell) (string, error) {
89+
// this implementation is extremely non-obvious
90+
// because IPFS doesn't normally allow for finding IPNS address
91+
// of given key unless some file is added to it.
92+
resp, err := sh.ID()
93+
if err != nil {
94+
return "", fmt.Errorf("get IPFS ID: %w", err)
95+
}
96+
pid, err := peer.Decode(resp.ID)
97+
if err != nil {
98+
return "", fmt.Errorf("decode peer ID: %w", err)
99+
}
100+
ipnsAddr, err := peer.ToCid(pid).StringOfBase(multibase.Base36)
101+
if err != nil {
102+
return "", fmt.Errorf("encode ipns address: %w", err)
103+
}
104+
return ipnsAddr, nil
105+
}

thead/ipns/resolver.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package ipns
2+
3+
import (
4+
"fmt"
5+
"path"
6+
"strings"
7+
8+
"github.com/areknoster/public-distributed-commit-log/pdcl"
9+
"github.com/ipfs/go-cid"
10+
shell "github.com/ipfs/go-ipfs-api"
11+
)
12+
13+
type resolver interface {
14+
// resolveIPNS finds IPFS address that's pointed by given IPNS address
15+
resolveIPNS(ipnsName string) (ipfsAddress cid.Cid, err error)
16+
}
17+
18+
type shellResolver struct {
19+
shell *shell.Shell
20+
}
21+
22+
func newShellResolver(shell *shell.Shell) *shellResolver {
23+
return &shellResolver{shell: shell}
24+
}
25+
26+
func (m *shellResolver) resolveIPNS(ipnsName string) (ipfsAddress cid.Cid, err error) {
27+
resolvedAddr, err := m.shell.Resolve(path.Join("/ipns/", ipnsName))
28+
if err != nil {
29+
return cid.Undef, fmt.Errorf("resolve %s from IPNS: %w", ipnsName, err)
30+
}
31+
resolvedCid, err := pdcl.ParseCID(strings.TrimPrefix(resolvedAddr, "/ipfs/"))
32+
if err != nil {
33+
return cid.Undef, fmt.Errorf("parse resolved IPNS address to CID: %w", err)
34+
}
35+
return resolvedCid, nil
36+
}

thead/ipns/test/ipns_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Package ipns provides implementation for pdcl head management based on ipns protocol.
2+
package ipns
3+
4+
import (
5+
"context"
6+
"testing"
7+
"time"
8+
9+
"github.com/areknoster/public-distributed-commit-log/storage/message/ipfs"
10+
"github.com/areknoster/public-distributed-commit-log/storage/pbcodec"
11+
"github.com/areknoster/public-distributed-commit-log/test/testpb"
12+
"github.com/areknoster/public-distributed-commit-log/thead"
13+
. "github.com/areknoster/public-distributed-commit-log/thead/ipns"
14+
shell "github.com/ipfs/go-ipfs-api"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestHeadManager_WriteReadHead(t *testing.T) {
20+
if testing.Short() {
21+
t.Skip("skipping because ipfs daemon is needed")
22+
}
23+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
24+
t.Cleanup(cancel)
25+
26+
sh := shell.NewShell("localhost:5001")
27+
28+
storage := ipfs.NewStorage(sh, pbcodec.ProtoBuf{})
29+
msg := testpb.MakeCurrentRandomTestMessage()
30+
id, err := storage.Write(ctx, msg)
31+
require.NoError(t, err)
32+
t.Log("message written")
33+
34+
var headManager thead.Manager
35+
headManager, err = NewBasicHeadManager(sh)
36+
require.NoError(t, err)
37+
require.NoError(t, headManager.SetHead(ctx, id))
38+
39+
headCid, err := headManager.ReadHead(ctx)
40+
require.NoError(t, err)
41+
assert.Equal(t, id, headCid)
42+
}

0 commit comments

Comments
 (0)