From 8c60e77b6f81cc84342fd2a563a3918de4159a6f Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 5 Dec 2022 18:51:59 +0100 Subject: [PATCH] Update synctree ping logic --- common/commonspace/diffservice/diffservice.go | 2 +- common/commonspace/diffservice/diffsyncer.go | 15 ++++++++++- .../commonspace/diffservice/headnotifiable.go | 11 -------- .../settingsdocument/settingsdocument.go | 5 ++-- .../synctree/mock_synctree/mock_synctree.go | 14 ++++++++++ common/commonspace/synctree/synctree.go | 20 +++++++++----- common/commonspace/synctree/synctree_test.go | 3 +++ .../nodeconf/mock_nodeconf/mock_nodeconf.go | 26 ++++++++++++++----- 8 files changed, 68 insertions(+), 28 deletions(-) delete mode 100644 common/commonspace/diffservice/headnotifiable.go diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index d3ac3211..31a678a9 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -21,7 +21,7 @@ type TreeHeads struct { } type DiffService interface { - HeadNotifiable + UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) RemoveObjects(ids []string) AllIds() []string diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 1618ee71..49947c0a 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" @@ -116,7 +117,19 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { for _, tId := range trees { - _, _ = d.cache.GetTree(ctx, d.spaceId, tId) + tree, err := d.cache.GetTree(ctx, d.spaceId, tId) + if err != nil { + continue + } + syncTree, ok := tree.(synctree.SyncTree) + if !ok { + continue + } + // the idea why we call it directly is that if we try to get it from cache + // it may be already there (i.e. loaded) + // and build func will not be called, thus we won't sync the tree + // therefore we just do it manually + syncTree.Ping() } } diff --git a/common/commonspace/diffservice/headnotifiable.go b/common/commonspace/diffservice/headnotifiable.go deleted file mode 100644 index 8e987dc9..00000000 --- a/common/commonspace/diffservice/headnotifiable.go +++ /dev/null @@ -1,11 +0,0 @@ -package diffservice - -type HeadNotifiable interface { - UpdateHeads(id string, heads []string) -} - -type HeadNotifiableFunc func(id string, heads []string) - -func (h HeadNotifiableFunc) UpdateHeads(id string, heads []string) { - h(id, heads) -} diff --git a/common/commonspace/settingsdocument/settingsdocument.go b/common/commonspace/settingsdocument/settingsdocument.go index e0241229..644fd692 100644 --- a/common/commonspace/settingsdocument/settingsdocument.go +++ b/common/commonspace/settingsdocument/settingsdocument.go @@ -110,8 +110,9 @@ func (s *settingsDocument) Rebuild(tr tree.ObjectTree) { } func (s *settingsDocument) Init(ctx context.Context) (err error) { - log.Debug("space settings id", zap.String("id", s.store.SpaceSettingsId())) - s.SyncTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s) + settingsId := s.store.SpaceSettingsId() + log.Debug("space settings id", zap.String("id", settingsId)) + s.SyncTree, err = s.buildFunc(ctx, settingsId, s) if err != nil { return } diff --git a/common/commonspace/synctree/mock_synctree/mock_synctree.go b/common/commonspace/synctree/mock_synctree/mock_synctree.go index 8f726e61..c8f84390 100644 --- a/common/commonspace/synctree/mock_synctree/mock_synctree.go +++ b/common/commonspace/synctree/mock_synctree/mock_synctree.go @@ -368,6 +368,20 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock)) } +// Ping mocks base method. +func (m *MockSyncTree) Ping() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ping") + ret0, _ := ret[0].(error) + return ret0 +} + +// Ping indicates an expected call of Ping. +func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping)) +} + // RLock mocks base method. func (m *MockSyncTree) RLock() { m.ctrl.T.Helper() diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index c6461c8a..cd874462 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" @@ -26,9 +25,14 @@ var ( ErrSyncTreeDeleted = errors.New("sync tree is deleted") ) +type HeadNotifiable interface { + UpdateHeads(id string, heads []string) +} + type SyncTree interface { tree.ObjectTree synchandler.SyncHandler + Ping() (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -36,7 +40,7 @@ type syncTree struct { tree.ObjectTree synchandler.SyncHandler syncClient SyncClient - notifiable diffservice.HeadNotifiable + notifiable HeadNotifiable listener updatelistener.UpdateListener treeUsage *atomic.Int32 isClosed bool @@ -54,7 +58,7 @@ type CreateDeps struct { SpaceId string Payload tree.ObjectTreeCreatePayload Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable + HeadNotifiable HeadNotifiable StreamPool syncservice.StreamPool Listener updatelistener.UpdateListener AclList list.ACLList @@ -66,7 +70,7 @@ type BuildDeps struct { SpaceId string StreamPool syncservice.StreamPool Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable + HeadNotifiable HeadNotifiable Listener updatelistener.UpdateListener AclList list.ACLList SpaceStorage spacestorage.SpaceStorage @@ -246,9 +250,6 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if isFirstBuild { // send to everybody, because everybody should know that the node or client got new tree err = syncTree.syncClient.BroadcastAsync(headUpdate) - } else { - // send either to everybody if client or to replica set if node - err = syncTree.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) } return } @@ -347,3 +348,8 @@ func (s *syncTree) checkAlive() (err error) { } return } + +func (s *syncTree) Ping() (err error) { + headUpdate := s.syncClient.CreateHeadUpdate(s, nil) + return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) +} diff --git a/common/commonspace/synctree/synctree_test.go b/common/commonspace/synctree/synctree_test.go index c55be822..f35cd396 100644 --- a/common/commonspace/synctree/synctree_test.go +++ b/common/commonspace/synctree/synctree_test.go @@ -16,6 +16,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "sync/atomic" "testing" ) @@ -67,6 +68,7 @@ func Test_DeriveSyncTree(t *testing.T) { Payload: expectedPayload, Listener: updateListenerMock, SpaceStorage: spaceStorageMock, + TreeUsage: &atomic.Int32{}, } _, err := DeriveSyncTree(ctx, deps) @@ -103,6 +105,7 @@ func Test_CreateSyncTree(t *testing.T) { Payload: expectedPayload, Listener: updateListenerMock, SpaceStorage: spaceStorageMock, + TreeUsage: &atomic.Int32{}, } _, err := CreateSyncTree(ctx, deps) diff --git a/common/nodeconf/mock_nodeconf/mock_nodeconf.go b/common/nodeconf/mock_nodeconf/mock_nodeconf.go index 9e3ad580..408bbab3 100644 --- a/common/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/common/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -209,19 +209,33 @@ func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder { return m.recorder } -// DialResponsiblePeers mocks base method. -func (m *MockConfConnector) DialResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) { +// Configuration mocks base method. +func (m *MockConfConnector) Configuration() nodeconf.Configuration { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DialResponsiblePeers", arg0, arg1) + ret := m.ctrl.Call(m, "Configuration") + ret0, _ := ret[0].(nodeconf.Configuration) + return ret0 +} + +// Configuration indicates an expected call of Configuration. +func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration)) +} + +// DialInactiveResponsiblePeers mocks base method. +func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2) ret0, _ := ret[0].([]peer.Peer) ret1, _ := ret[1].(error) return ret0, ret1 } -// DialResponsiblePeers indicates an expected call of DialResponsiblePeers. -func (mr *MockConfConnectorMockRecorder) DialResponsiblePeers(arg0, arg1 interface{}) *gomock.Call { +// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers. +func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialResponsiblePeers), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2) } // GetResponsiblePeers mocks base method.