Merge pull request #3 from anytypeio/nodesync

This commit is contained in:
Mikhail Rakhmanov 2023-01-14 14:51:42 +01:00 committed by GitHub
commit af753ec392
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 172 additions and 7 deletions

View File

@ -7,6 +7,7 @@ package ldiff
import (
"bytes"
"context"
"encoding/hex"
"errors"
"github.com/cespare/xxhash"
"github.com/huandu/skiplist"
@ -89,6 +90,8 @@ type Diff interface {
Elements() []Element
// Ids retrieves ids of all elements in the Diff
Ids() []string
// Hash returns hash of all elements in the diff
Hash() string
}
// Remote interface for using in the Diff
@ -169,6 +172,13 @@ func (d *diff) Elements() (elements []Element) {
return
}
func (d *diff) Hash() string {
d.mu.RLock()
defer d.mu.RUnlock()
res := d.getRange(Range{To: math.MaxUint64})
return hex.EncodeToString(res.Hash)
}
// RemoveId removes element by id
func (d *diff) RemoveId(id string) error {
d.mu.Lock()

View File

@ -138,3 +138,13 @@ func BenchmarkDiff_Ranges(b *testing.B) {
resBuf = resBuf[:0]
}
}
func TestDiff_Hash(t *testing.T) {
d := New(16, 16)
h1 := d.Hash()
assert.NotEmpty(t, h1)
d.Set(Element{Id: "1"})
h2 := d.Hash()
assert.NotEmpty(t, h2)
assert.NotEqual(t, h1, h2)
}

View File

@ -66,6 +66,20 @@ func (mr *MockDiffMockRecorder) Elements() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Elements", reflect.TypeOf((*MockDiff)(nil).Elements))
}
// Hash mocks base method.
func (m *MockDiff) Hash() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Hash")
ret0, _ := ret[0].(string)
return ret0
}
// Hash indicates an expected call of Hash.
func (mr *MockDiffMockRecorder) Hash() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Hash", reflect.TypeOf((*MockDiff)(nil).Hash))
}
// Ids mocks base method.
func (m *MockDiff) Ids() []string {
m.ctrl.T.Helper()

View File

@ -63,7 +63,10 @@ func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) {
func (d *diffSyncer) RemoveObjects(ids []string) {
for _, id := range ids {
d.diff.RemoveId(id)
_ = d.diff.RemoveId(id)
}
if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil {
d.log.Error("can't write space hash", zap.Error(err))
}
}
@ -75,6 +78,9 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) {
Id: id,
Head: concatStrings(heads),
})
if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil {
d.log.Error("can't write space hash", zap.Error(err))
}
}
func (d *diffSyncer) Sync(ctx context.Context) error {

View File

@ -148,11 +148,14 @@ func TestDiffSyncer_Sync(t *testing.T) {
t.Run("update heads updates diff", func(t *testing.T) {
newId := "newId"
newHeads := []string{"h1", "h2"}
hash := "hash"
diffMock.EXPECT().Set(ldiff.Element{
Id: newId,
Head: concatStrings(newHeads),
})
diffMock.EXPECT().Hash().Return(hash)
delState.EXPECT().Exists(newId).Return(false)
stMock.EXPECT().WriteSpaceHash(hash)
diffSyncer.UpdateHeads(newId, newHeads)
})

View File

@ -22,13 +22,14 @@ type TreeHeads struct {
}
type HeadSync interface {
Init(objectIds []string, deletionState deletionstate.DeletionState)
UpdateHeads(id string, heads []string)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
RemoveObjects(ids []string)
AllIds() []string
DebugAllHeads() (res []TreeHeads)
Init(objectIds []string, deletionState deletionstate.DeletionState)
Close() (err error)
}

View File

@ -26,6 +26,7 @@ import (
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/zeebo/errs"
"go.uber.org/zap"
"strconv"
"sync"
"sync/atomic"
"time"
@ -65,7 +66,7 @@ type SpaceDescription struct {
}
func NewSpaceId(id string, repKey uint64) string {
return fmt.Sprintf("%s.%d", id, repKey)
return fmt.Sprintf("%s.%s", id, strconv.FormatUint(repKey, 36))
}
type Space interface {
@ -87,6 +88,7 @@ type Space interface {
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
DeleteTree(ctx context.Context, id string) (err error)
HeadSync() headsync.HeadSync
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage

View File

@ -95,6 +95,21 @@ func (mr *MockSpaceStorageMockRecorder) Id() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockSpaceStorage)(nil).Id))
}
// ReadSpaceHash mocks base method.
func (m *MockSpaceStorage) ReadSpaceHash() (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadSpaceHash")
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ReadSpaceHash indicates an expected call of ReadSpaceHash.
func (mr *MockSpaceStorageMockRecorder) ReadSpaceHash() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).ReadSpaceHash))
}
// SetTreeDeletedStatus mocks base method.
func (m *MockSpaceStorage) SetTreeDeletedStatus(arg0, arg1 string) error {
m.ctrl.T.Helper()
@ -197,3 +212,17 @@ func (mr *MockSpaceStorageMockRecorder) TreeStorage(arg0 interface{}) *gomock.Ca
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TreeStorage", reflect.TypeOf((*MockSpaceStorage)(nil).TreeStorage), arg0)
}
// WriteSpaceHash mocks base method.
func (m *MockSpaceStorage) WriteSpaceHash(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WriteSpaceHash", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// WriteSpaceHash indicates an expected call of WriteSpaceHash.
func (mr *MockSpaceStorageMockRecorder) WriteSpaceHash(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).WriteSpaceHash), arg0)
}

View File

@ -34,10 +34,13 @@ type SpaceStorage interface {
AclStorage() (liststorage.ListStorage, error)
SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error)
StoredIds() ([]string, error)
Close() error
TreeRoot(id string) (*treechangeproto.RawTreeChangeWithId, error)
TreeStorage(id string) (treestorage.TreeStorage, error)
CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error)
WriteSpaceHash(head string) error
ReadSpaceHash() (hash string, err error)
Close() error
}
type SpaceStorageCreatePayload struct {

View File

@ -1,7 +1,10 @@
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/any-sync/nodeconf Service,Configuration
package nodeconf
import "github.com/anytypeio/go-chash"
import (
"github.com/anytypeio/go-chash"
"strings"
)
type Configuration interface {
// Id returns current nodeconf id
@ -18,6 +21,8 @@ type Configuration interface {
Addresses() map[string][]string
// CHash returns nodes consistent table
CHash() chash.CHash
// Partition returns partition number by spaceId
Partition(spaceId string) (part int)
}
type configuration struct {
@ -34,7 +39,7 @@ func (c *configuration) Id() string {
}
func (c *configuration) NodeIds(spaceId string) []string {
members := c.chash.GetMembers(spaceId)
members := c.chash.GetMembers(ReplKey(spaceId))
res := make([]string, 0, len(members))
for _, m := range members {
if m.Id() != c.accountId {
@ -45,7 +50,7 @@ func (c *configuration) NodeIds(spaceId string) []string {
}
func (c *configuration) IsResponsible(spaceId string) bool {
for _, m := range c.chash.GetMembers(spaceId) {
for _, m := range c.chash.GetMembers(ReplKey(spaceId)) {
if m.Id() == c.accountId {
return true
}
@ -72,3 +77,14 @@ func (c *configuration) Addresses() map[string][]string {
func (c *configuration) CHash() chash.CHash {
return c.chash
}
func (c *configuration) Partition(spaceId string) (part int) {
return c.chash.GetPartition(ReplKey(spaceId))
}
func ReplKey(spaceId string) (replKey string) {
if i := strings.LastIndex(spaceId, "."); i != -1 {
return spaceId[i+1:]
}
return spaceId
}

View File

@ -0,0 +1,71 @@
package nodeconf
import (
"fmt"
"github.com/anytypeio/go-chash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"math/rand"
"testing"
)
func TestReplKey(t *testing.T) {
assert.Equal(t, "repl", ReplKey("id.repl"))
assert.Equal(t, "repl", ReplKey("repl"))
assert.Equal(t, "", ReplKey("."))
}
func TestConfiguration_NodeIds(t *testing.T) {
ch, err := chash.New(chash.Config{
PartitionCount: partitionCount,
ReplicationFactor: replicationFactor,
})
require.NoError(t, err)
conf := &configuration{
id: "last",
accountId: "1",
chash: ch,
}
for i := 0; i < 10; i++ {
require.NoError(t, conf.chash.AddMembers(testMember(fmt.Sprint(i+1))))
}
t.Run("random keys", func(t *testing.T) {
for i := 0; i < 10; i++ {
spaceId := fmt.Sprintf("%d.%d", rand.Int(), rand.Int())
members := conf.NodeIds(spaceId)
if conf.IsResponsible(spaceId) {
assert.Len(t, members, 2)
} else {
assert.Len(t, members, 3)
}
}
})
t.Run("same repl key", func(t *testing.T) {
var prevMemb []string
for i := 0; i < 10; i++ {
spaceId := fmt.Sprintf("%d.%d", rand.Int(), 42)
members := conf.NodeIds(spaceId)
if conf.IsResponsible(spaceId) {
assert.Len(t, members, 2)
} else {
assert.Len(t, members, 3)
}
if i != 0 {
assert.Equal(t, prevMemb, members)
}
prevMemb = members
}
})
}
type testMember string
func (t testMember) Id() string {
return string(t)
}
func (t testMember) Capacity() float64 {
return 1
}