diff --git a/app/ldiff/diff.go b/app/ldiff/diff.go index 877e5f2f..bd9ebcf2 100644 --- a/app/ldiff/diff.go +++ b/app/ldiff/diff.go @@ -7,6 +7,7 @@ package ldiff import ( "bytes" "context" + "encoding/hex" "errors" "github.com/cespare/xxhash" "github.com/huandu/skiplist" @@ -89,6 +90,8 @@ type Diff interface { Elements() []Element // Ids retrieves ids of all elements in the Diff Ids() []string + // Hash returns hash of all elements in the diff + Hash() string } // Remote interface for using in the Diff @@ -169,6 +172,13 @@ func (d *diff) Elements() (elements []Element) { return } +func (d *diff) Hash() string { + d.mu.RLock() + defer d.mu.RUnlock() + res := d.getRange(Range{To: math.MaxUint64}) + return hex.EncodeToString(res.Hash) +} + // RemoveId removes element by id func (d *diff) RemoveId(id string) error { d.mu.Lock() diff --git a/app/ldiff/diff_test.go b/app/ldiff/diff_test.go index d16b17c8..8d8db28d 100644 --- a/app/ldiff/diff_test.go +++ b/app/ldiff/diff_test.go @@ -138,3 +138,13 @@ func BenchmarkDiff_Ranges(b *testing.B) { resBuf = resBuf[:0] } } + +func TestDiff_Hash(t *testing.T) { + d := New(16, 16) + h1 := d.Hash() + assert.NotEmpty(t, h1) + d.Set(Element{Id: "1"}) + h2 := d.Hash() + assert.NotEmpty(t, h2) + assert.NotEqual(t, h1, h2) +} diff --git a/app/ldiff/mock_ldiff/mock_ldiff.go b/app/ldiff/mock_ldiff/mock_ldiff.go index 5c30b515..119d7042 100644 --- a/app/ldiff/mock_ldiff/mock_ldiff.go +++ b/app/ldiff/mock_ldiff/mock_ldiff.go @@ -66,6 +66,20 @@ func (mr *MockDiffMockRecorder) Elements() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Elements", reflect.TypeOf((*MockDiff)(nil).Elements)) } +// Hash mocks base method. +func (m *MockDiff) Hash() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Hash") + ret0, _ := ret[0].(string) + return ret0 +} + +// Hash indicates an expected call of Hash. +func (mr *MockDiffMockRecorder) Hash() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Hash", reflect.TypeOf((*MockDiff)(nil).Hash)) +} + // Ids mocks base method. func (m *MockDiff) Ids() []string { m.ctrl.T.Helper() diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 4cd5ec1a..c58cb58e 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -63,7 +63,10 @@ func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) { func (d *diffSyncer) RemoveObjects(ids []string) { for _, id := range ids { - d.diff.RemoveId(id) + _ = d.diff.RemoveId(id) + } + if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { + d.log.Error("can't write space hash", zap.Error(err)) } } @@ -75,6 +78,9 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) { Id: id, Head: concatStrings(heads), }) + if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { + d.log.Error("can't write space hash", zap.Error(err)) + } } func (d *diffSyncer) Sync(ctx context.Context) error { diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 686399b2..c74afd8e 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -148,11 +148,14 @@ func TestDiffSyncer_Sync(t *testing.T) { t.Run("update heads updates diff", func(t *testing.T) { newId := "newId" newHeads := []string{"h1", "h2"} + hash := "hash" diffMock.EXPECT().Set(ldiff.Element{ Id: newId, Head: concatStrings(newHeads), }) + diffMock.EXPECT().Hash().Return(hash) delState.EXPECT().Exists(newId).Return(false) + stMock.EXPECT().WriteSpaceHash(hash) diffSyncer.UpdateHeads(newId, newHeads) }) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index f7f78b6d..60168274 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -22,13 +22,14 @@ type TreeHeads struct { } type HeadSync interface { + Init(objectIds []string, deletionState deletionstate.DeletionState) + UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) RemoveObjects(ids []string) AllIds() []string DebugAllHeads() (res []TreeHeads) - Init(objectIds []string, deletionState deletionstate.DeletionState) Close() (err error) } diff --git a/commonspace/space.go b/commonspace/space.go index 7f9936fc..1f28e327 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -26,6 +26,7 @@ import ( "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" "github.com/zeebo/errs" "go.uber.org/zap" + "strconv" "sync" "sync/atomic" "time" @@ -65,7 +66,7 @@ type SpaceDescription struct { } func NewSpaceId(id string, repKey uint64) string { - return fmt.Sprintf("%s.%d", id, repKey) + return fmt.Sprintf("%s.%s", id, strconv.FormatUint(repKey, 36)) } type Space interface { @@ -87,6 +88,7 @@ type Space interface { BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) DeleteTree(ctx context.Context, id string) (err error) + HeadSync() headsync.HeadSync SyncStatus() syncstatus.StatusUpdater Storage() spacestorage.SpaceStorage diff --git a/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go b/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go index f349e2eb..26f6bb30 100644 --- a/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go +++ b/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go @@ -95,6 +95,21 @@ func (mr *MockSpaceStorageMockRecorder) Id() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockSpaceStorage)(nil).Id)) } +// ReadSpaceHash mocks base method. +func (m *MockSpaceStorage) ReadSpaceHash() (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadSpaceHash") + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadSpaceHash indicates an expected call of ReadSpaceHash. +func (mr *MockSpaceStorageMockRecorder) ReadSpaceHash() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).ReadSpaceHash)) +} + // SetTreeDeletedStatus mocks base method. func (m *MockSpaceStorage) SetTreeDeletedStatus(arg0, arg1 string) error { m.ctrl.T.Helper() @@ -197,3 +212,17 @@ func (mr *MockSpaceStorageMockRecorder) TreeStorage(arg0 interface{}) *gomock.Ca mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TreeStorage", reflect.TypeOf((*MockSpaceStorage)(nil).TreeStorage), arg0) } + +// WriteSpaceHash mocks base method. +func (m *MockSpaceStorage) WriteSpaceHash(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteSpaceHash", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteSpaceHash indicates an expected call of WriteSpaceHash. +func (mr *MockSpaceStorageMockRecorder) WriteSpaceHash(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).WriteSpaceHash), arg0) +} diff --git a/commonspace/spacestorage/spacestorage.go b/commonspace/spacestorage/spacestorage.go index 6321f0de..08469f2f 100644 --- a/commonspace/spacestorage/spacestorage.go +++ b/commonspace/spacestorage/spacestorage.go @@ -34,10 +34,13 @@ type SpaceStorage interface { AclStorage() (liststorage.ListStorage, error) SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error) StoredIds() ([]string, error) - Close() error TreeRoot(id string) (*treechangeproto.RawTreeChangeWithId, error) TreeStorage(id string) (treestorage.TreeStorage, error) CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error) + WriteSpaceHash(head string) error + ReadSpaceHash() (hash string, err error) + + Close() error } type SpaceStorageCreatePayload struct { diff --git a/nodeconf/configuration.go b/nodeconf/configuration.go index af8e5f33..c3b511b9 100644 --- a/nodeconf/configuration.go +++ b/nodeconf/configuration.go @@ -1,7 +1,10 @@ //go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/any-sync/nodeconf Service,Configuration package nodeconf -import "github.com/anytypeio/go-chash" +import ( + "github.com/anytypeio/go-chash" + "strings" +) type Configuration interface { // Id returns current nodeconf id @@ -18,6 +21,8 @@ type Configuration interface { Addresses() map[string][]string // CHash returns nodes consistent table CHash() chash.CHash + // Partition returns partition number by spaceId + Partition(spaceId string) (part int) } type configuration struct { @@ -34,7 +39,7 @@ func (c *configuration) Id() string { } func (c *configuration) NodeIds(spaceId string) []string { - members := c.chash.GetMembers(spaceId) + members := c.chash.GetMembers(ReplKey(spaceId)) res := make([]string, 0, len(members)) for _, m := range members { if m.Id() != c.accountId { @@ -45,7 +50,7 @@ func (c *configuration) NodeIds(spaceId string) []string { } func (c *configuration) IsResponsible(spaceId string) bool { - for _, m := range c.chash.GetMembers(spaceId) { + for _, m := range c.chash.GetMembers(ReplKey(spaceId)) { if m.Id() == c.accountId { return true } @@ -72,3 +77,14 @@ func (c *configuration) Addresses() map[string][]string { func (c *configuration) CHash() chash.CHash { return c.chash } + +func (c *configuration) Partition(spaceId string) (part int) { + return c.chash.GetPartition(ReplKey(spaceId)) +} + +func ReplKey(spaceId string) (replKey string) { + if i := strings.LastIndex(spaceId, "."); i != -1 { + return spaceId[i+1:] + } + return spaceId +} diff --git a/nodeconf/configuration_test.go b/nodeconf/configuration_test.go new file mode 100644 index 00000000..757a5bc3 --- /dev/null +++ b/nodeconf/configuration_test.go @@ -0,0 +1,71 @@ +package nodeconf + +import ( + "fmt" + "github.com/anytypeio/go-chash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "math/rand" + "testing" +) + +func TestReplKey(t *testing.T) { + assert.Equal(t, "repl", ReplKey("id.repl")) + assert.Equal(t, "repl", ReplKey("repl")) + assert.Equal(t, "", ReplKey(".")) +} + +func TestConfiguration_NodeIds(t *testing.T) { + ch, err := chash.New(chash.Config{ + PartitionCount: partitionCount, + ReplicationFactor: replicationFactor, + }) + require.NoError(t, err) + conf := &configuration{ + id: "last", + accountId: "1", + chash: ch, + } + for i := 0; i < 10; i++ { + require.NoError(t, conf.chash.AddMembers(testMember(fmt.Sprint(i+1)))) + } + + t.Run("random keys", func(t *testing.T) { + for i := 0; i < 10; i++ { + spaceId := fmt.Sprintf("%d.%d", rand.Int(), rand.Int()) + members := conf.NodeIds(spaceId) + if conf.IsResponsible(spaceId) { + assert.Len(t, members, 2) + } else { + assert.Len(t, members, 3) + } + } + }) + t.Run("same repl key", func(t *testing.T) { + var prevMemb []string + for i := 0; i < 10; i++ { + spaceId := fmt.Sprintf("%d.%d", rand.Int(), 42) + members := conf.NodeIds(spaceId) + if conf.IsResponsible(spaceId) { + assert.Len(t, members, 2) + } else { + assert.Len(t, members, 3) + } + if i != 0 { + assert.Equal(t, prevMemb, members) + } + prevMemb = members + } + }) + +} + +type testMember string + +func (t testMember) Id() string { + return string(t) +} + +func (t testMember) Capacity() float64 { + return 1 +}