diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index c74afd8e..51192e9f 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -8,6 +8,7 @@ import ( "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" + "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage" "github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter" @@ -160,7 +161,7 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync space missing", func(t *testing.T) { - aclStorageMock := mock_treestorage.NewMockListStorage(ctrl) + aclStorageMock := mock_liststorage.NewMockListStorage(ctrl) settingsStorage := mock_treestorage.NewMockTreeStorage(ctrl) settingsId := "settingsId" aclRoot := &aclrecordproto.RawAclRecordWithId{ diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 1809ec58..176d7192 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -175,21 +175,21 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont panic(err) } - err = ot.treeStorage.AddRawChange(rawChange) + err = ot.treeStorage.TransactionAdd([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id}) if err != nil { return } - err = ot.treeStorage.SetHeads([]string{objChange.Id}) - if err != nil { - return + mode := Append + if content.IsSnapshot { + mode = Rebuild } res = AddResult{ OldHeads: oldHeads, Heads: []string{objChange.Id}, Added: []*treechangeproto.RawTreeChangeWithId{rawChange}, - Mode: Append, + Mode: mode, } log.With("treeId", ot.id).With("head", objChange.Id). Debug("finished adding content") @@ -234,6 +234,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt } func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) { + lastHeadId := ot.tree.lastIteratedHeadId addResult, err = ot.addRawChanges(ctx, changesPayload) if err != nil { return @@ -242,16 +243,12 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang // reducing tree if we have new roots ot.tree.reduceTree() - // adding to database all the added changes only after they are good - for _, ch := range addResult.Added { - err = ot.treeStorage.AddRawChange(ch) - if err != nil { - return - } + // that means that we removed the ids while reducing + if _, exists := ot.tree.attached[lastHeadId]; !exists { + addResult.Mode = Rebuild } - // setting heads - err = ot.treeStorage.SetHeads(ot.tree.Heads()) + err = ot.treeStorage.TransactionAdd(addResult.Added, addResult.Heads) return } diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index 13956e2c..03f4322b 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -270,7 +270,8 @@ func TestObjectTree(t *testing.T) { assert.Equal(t, []string{"0"}, res.OldHeads) assert.Equal(t, []string{"4"}, res.Heads) assert.Equal(t, len(rawChanges), len(res.Added)) - assert.Equal(t, Append, res.Mode) + // here we have rebuild, because we reduced tree to new snapshot + assert.Equal(t, Rebuild, res.Mode) // check tree heads assert.Equal(t, []string{"4"}, objTree.Heads()) diff --git a/commonspace/object/tree/objecttree/objecttreefactory.go b/commonspace/object/tree/objecttree/objecttreefactory.go index ed34a91a..1bd5a885 100644 --- a/commonspace/object/tree/objecttree/objecttreefactory.go +++ b/commonspace/object/tree/objecttree/objecttreefactory.go @@ -7,8 +7,6 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" "github.com/anytypeio/any-sync/util/keys/symmetric" - "github.com/anytypeio/any-sync/util/slice" - "go.uber.org/zap" "math/rand" "time" ) @@ -132,22 +130,6 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { if err != nil { return nil, err } - storageHeads, err := objTree.treeStorage.Heads() - if err != nil { - return nil, err - } - - // comparing rebuilt heads with heads in storage - // in theory it can happen that we didn't set heads because the process has crashed - // therefore we want to set them later - if !slice.UnsortedEquals(storageHeads, objTree.tree.Heads()) { - log.With(zap.Strings("storage", storageHeads), zap.Strings("rebuilt", objTree.tree.Heads())). - Errorf("the heads in storage and objTree are different") - err = objTree.treeStorage.SetHeads(objTree.tree.Heads()) - if err != nil { - return nil, err - } - } objTree.id = objTree.treeStorage.Id() objTree.rawRoot, err = objTree.treeStorage.Root() diff --git a/commonspace/object/tree/treestorage/inmemory.go b/commonspace/object/tree/treestorage/inmemory.go index 29db6f6e..c5c5a5cc 100644 --- a/commonspace/object/tree/treestorage/inmemory.go +++ b/commonspace/object/tree/treestorage/inmemory.go @@ -16,6 +16,17 @@ type inMemoryTreeStorage struct { sync.RWMutex } +func (t *inMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { + t.RLock() + defer t.RUnlock() + + for _, ch := range changes { + t.changes[ch.Id] = ch + } + t.heads = append(t.heads[:0], heads...) + return nil +} + func NewInMemoryTreeStorage( root *treechangeproto.RawTreeChangeWithId, heads []string, @@ -61,11 +72,7 @@ func (t *inMemoryTreeStorage) Heads() ([]string, error) { func (t *inMemoryTreeStorage) SetHeads(heads []string) error { t.Lock() defer t.Unlock() - t.heads = t.heads[:0] - - for _, h := range heads { - t.heads = append(t.heads, h) - } + t.heads = append(t.heads[:0], heads...) return nil } diff --git a/commonspace/object/tree/treestorage/mock_treestorage/mock_storage.go b/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go similarity index 55% rename from commonspace/object/tree/treestorage/mock_treestorage/mock_storage.go rename to commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go index 20a62933..2fe6029f 100644 --- a/commonspace/object/tree/treestorage/mock_treestorage/mock_storage.go +++ b/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go @@ -1,128 +1,17 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anytypeio/any-sync/pkg/acl/storage (interfaces: ListStorage,TreeStorage) +// Source: github.com/anytypeio/any-sync/commonspace/object/tree/treestorage (interfaces: TreeStorage) -// Package mock_storage is a generated GoMock package. +// Package mock_treestorage is a generated GoMock package. package mock_treestorage import ( context "context" - "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" - "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" reflect "reflect" + treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" gomock "github.com/golang/mock/gomock" ) -// MockListStorage is a mock of ListStorage interface. -type MockListStorage struct { - ctrl *gomock.Controller - recorder *MockListStorageMockRecorder -} - -// MockListStorageMockRecorder is the mock recorder for MockListStorage. -type MockListStorageMockRecorder struct { - mock *MockListStorage -} - -// NewMockListStorage creates a new mock instance. -func NewMockListStorage(ctrl *gomock.Controller) *MockListStorage { - mock := &MockListStorage{ctrl: ctrl} - mock.recorder = &MockListStorageMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockListStorage) EXPECT() *MockListStorageMockRecorder { - return m.recorder -} - -// AddRawRecord mocks base method. -func (m *MockListStorage) AddRawRecord(arg0 context.Context, arg1 *aclrecordproto.RawAclRecordWithId) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddRawRecord", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddRawRecord indicates an expected call of AddRawRecord. -func (mr *MockListStorageMockRecorder) AddRawRecord(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecord", reflect.TypeOf((*MockListStorage)(nil).AddRawRecord), arg0, arg1) -} - -// GetRawRecord mocks base method. -func (m *MockListStorage) GetRawRecord(arg0 context.Context, arg1 string) (*aclrecordproto.RawAclRecordWithId, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetRawRecord", arg0, arg1) - ret0, _ := ret[0].(*aclrecordproto.RawAclRecordWithId) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetRawRecord indicates an expected call of GetRawRecord. -func (mr *MockListStorageMockRecorder) GetRawRecord(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawRecord", reflect.TypeOf((*MockListStorage)(nil).GetRawRecord), arg0, arg1) -} - -// Head mocks base method. -func (m *MockListStorage) Head() (string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Head") - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Head indicates an expected call of Head. -func (mr *MockListStorageMockRecorder) Head() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Head", reflect.TypeOf((*MockListStorage)(nil).Head)) -} - -// Id mocks base method. -func (m *MockListStorage) Id() string { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Id") - ret0, _ := ret[0].(string) - return ret0 -} - -// Id indicates an expected call of Id. -func (mr *MockListStorageMockRecorder) Id() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockListStorage)(nil).Id)) -} - -// Root mocks base method. -func (m *MockListStorage) Root() (*aclrecordproto.RawAclRecordWithId, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Root") - ret0, _ := ret[0].(*aclrecordproto.RawAclRecordWithId) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Root indicates an expected call of Root. -func (mr *MockListStorageMockRecorder) Root() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockListStorage)(nil).Root)) -} - -// SetHead mocks base method. -func (m *MockListStorage) SetHead(arg0 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetHead", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetHead indicates an expected call of SetHead. -func (mr *MockListStorageMockRecorder) SetHead(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHead", reflect.TypeOf((*MockListStorage)(nil).SetHead), arg0) -} - // MockTreeStorage is a mock of TreeStorage interface. type MockTreeStorage struct { ctrl *gomock.Controller @@ -261,3 +150,17 @@ func (mr *MockTreeStorageMockRecorder) SetHeads(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeads", reflect.TypeOf((*MockTreeStorage)(nil).SetHeads), arg0) } + +// TransactionAdd mocks base method. +func (m *MockTreeStorage) TransactionAdd(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TransactionAdd", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// TransactionAdd indicates an expected call of TransactionAdd. +func (mr *MockTreeStorageMockRecorder) TransactionAdd(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionAdd", reflect.TypeOf((*MockTreeStorage)(nil).TransactionAdd), arg0, arg1) +} diff --git a/commonspace/object/tree/treestorage/treestorage.go b/commonspace/object/tree/treestorage/treestorage.go index 10c6cf70..b0051d76 100644 --- a/commonspace/object/tree/treestorage/treestorage.go +++ b/commonspace/object/tree/treestorage/treestorage.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_treestorage/mock_treestorage.go github.com/anytypeio/any-sync/commonspace/object/tree/treestorage TreeStorage package treestorage import ( @@ -25,8 +26,9 @@ type TreeStorage interface { Root() (*treechangeproto.RawTreeChangeWithId, error) Heads() ([]string, error) SetHeads(heads []string) error - AddRawChange(change *treechangeproto.RawTreeChangeWithId) error + TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error + GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error) HasChange(ctx context.Context, id string) (bool, error) Delete() error diff --git a/commonspace/settings/idprovider.go b/commonspace/settings/idprovider.go index fdb568f8..d681e0d7 100644 --- a/commonspace/settings/idprovider.go +++ b/commonspace/settings/idprovider.go @@ -51,9 +51,8 @@ func (p *provider) ProvideIds(tr objecttree.ObjectTree, startId string) (ids []s } if startId == "" { - err = tr.IterateFrom(tr.Id(), convert, process) - } else { - err = tr.IterateFrom(startId, convert, process) + startId = rootId } + err = tr.IterateFrom(startId, convert, process) return } diff --git a/commonspace/settings/idprovider_test.go b/commonspace/settings/idprovider_test.go index 66194ae4..6a64b78b 100644 --- a/commonspace/settings/idprovider_test.go +++ b/commonspace/settings/idprovider_test.go @@ -78,8 +78,7 @@ func TestProvider_ProvideIds(t *testing.T) { t.Run("startId is empty", func(t *testing.T) { ch := &objecttree.Change{Id: "rootId"} objTree.EXPECT().Root().Return(ch) - objTree.EXPECT().Id().Return("id") - objTree.EXPECT().IterateFrom("id", gomock.Any(), gomock.Any()).Return(nil) + objTree.EXPECT().IterateFrom("rootId", gomock.Any(), gomock.Any()).Return(nil) _, _, err := prov.ProvideIds(objTree, "") require.NoError(t, err) }) diff --git a/commonspace/space.go b/commonspace/space.go index 1f28e327..ec944fab 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -24,6 +24,7 @@ import ( "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" + "github.com/anytypeio/any-sync/util/slice" "github.com/zeebo/errs" "go.uber.org/zap" "strconv" @@ -228,7 +229,9 @@ func (s *space) Storage() spacestorage.SpaceStorage { } func (s *space) StoredIds() []string { - return s.headSync.AllIds() + return slice.DiscardFromSlice(s.headSync.AllIds(), func(id string) bool { + return id == s.settingsObject.Id() + }) } func (s *space) DebugAllHeads() []headsync.TreeHeads { diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 2915bd0c..664b3e1e 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -170,7 +170,7 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) return } - p, err = s.pool.DialOneOf(ctx, lastConfiguration.NodeIds(id)) + p, err = s.pool.GetOneOf(ctx, lastConfiguration.NodeIds(id)) if err != nil { return } diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 16476497..a5dfd0a0 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -76,9 +76,8 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro for _, addr := range addrs { conn, sc, err = d.handshake(ctx, addr) if err != nil { - log.Info("can't connect to host", zap.String("addr", addr)) + log.Info("can't connect to host", zap.String("addr", addr), zap.Error(err)) } else { - err = nil break } } @@ -99,7 +98,7 @@ func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc if err != nil { return } - log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String())) + log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr)) conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{ Reader: drpcwire.ReaderOptions{MaximumBufferSize: d.config.Stream.MaxMsgSizeMb * (1 << 20)}, }}) diff --git a/net/rpc/server/baseserver.go b/net/rpc/server/baseserver.go index a4457bbb..e7282904 100644 --- a/net/rpc/server/baseserver.go +++ b/net/rpc/server/baseserver.go @@ -66,13 +66,12 @@ func (s *BaseDrpcServer) serve(ctx context.Context, lis secureservice.ContextLis return default: } - ctx, conn, err := lis.Accept(ctx) + cctx, conn, err := lis.Accept(ctx) if err != nil { if isTemporary(err) { l.Debug("listener temporary accept error", zap.Error(err)) - t := time.NewTimer(500 * time.Millisecond) select { - case <-t.C: + case <-time.After(time.Second): case <-ctx.Done(): return } @@ -85,7 +84,7 @@ func (s *BaseDrpcServer) serve(ctx context.Context, lis secureservice.ContextLis l.Error("listener accept error", zap.Error(err)) return } - go s.serveConn(ctx, conn) + go s.serveConn(cctx, conn) } } diff --git a/nodeconf/mock_nodeconf/mock_nodeconf.go b/nodeconf/mock_nodeconf/mock_nodeconf.go index 9e0bd4f4..5d9e65e2 100644 --- a/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -212,3 +212,17 @@ func (mr *MockConfigurationMockRecorder) NodeIds(arg0 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeIds", reflect.TypeOf((*MockConfiguration)(nil).NodeIds), arg0) } + +// Partition mocks base method. +func (m *MockConfiguration) Partition(arg0 string) int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Partition", arg0) + ret0, _ := ret[0].(int) + return ret0 +} + +// Partition indicates an expected call of Partition. +func (mr *MockConfigurationMockRecorder) Partition(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Partition", reflect.TypeOf((*MockConfiguration)(nil).Partition), arg0) +}