From 15ccf29f0b9c130af645415e2a50726ba0f6d0c4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 19 Dec 2022 17:54:06 +0100 Subject: [PATCH] Statusservice refactoring --- client/clientspace/service.go | 8 +-- client/clientspace/statusreceiver.go | 20 ++++++++ common/commonspace/diffservice/diffsyncer.go | 10 ++-- common/commonspace/service.go | 4 +- common/commonspace/space.go | 11 ++--- common/commonspace/statusservice/noop.go | 40 +++++++++++++++ .../statusservice/statusservice.go | 49 +++++++++++++------ common/commonspace/synctree/synctree.go | 13 ++--- common/commonspace/synctree/synctree_test.go | 30 +++++++----- .../commonspace/synctree/synctreehandler.go | 4 +- .../synctree/synctreehandler_test.go | 8 +-- 11 files changed, 129 insertions(+), 68 deletions(-) create mode 100644 client/clientspace/statusreceiver.go create mode 100644 common/commonspace/statusservice/noop.go diff --git a/client/clientspace/service.go b/client/clientspace/service.go index a0326464..ebd2efd0 100644 --- a/client/clientspace/service.go +++ b/client/clientspace/service.go @@ -6,12 +6,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" - "go.uber.org/zap" "time" ) @@ -101,11 +99,7 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object if err != nil { return } - ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) { - log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)). - Debug("updating sync status") - return - }) + ns.StatusService().SetUpdateReceiver(&statusReceiver{}) if err = ns.Init(ctx); err != nil { return } diff --git a/client/clientspace/statusreceiver.go b/client/clientspace/statusreceiver.go new file mode 100644 index 00000000..5a22130e --- /dev/null +++ b/client/clientspace/statusreceiver.go @@ -0,0 +1,20 @@ +package clientspace + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" + "go.uber.org/zap" +) + +type statusReceiver struct { +} + +func (s *statusReceiver) UpdateTree(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) { + log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)). + Debug("updating sync status") + return nil +} + +func (s *statusReceiver) UpdateNodeConnection(online bool) { + log.With(zap.Bool("nodes online", online)).Debug("updating node connection") +} diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 70de2067..9dc3beda 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -100,14 +100,14 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) rdiff = remotediff.NewRemoteDiff(d.spaceId, cl) stateCounter uint64 = 0 ) - if d.statusService != nil { - stateCounter = d.statusService.StateCounter() - } + stateCounter = d.statusService.StateCounter() newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) err = rpcerr.Unwrap(err) if err != nil && err != spacesyncproto.ErrSpaceMissing { + d.statusService.SetNodesOnline(p.Id(), false) return err } + d.statusService.SetNodesOnline(p.Id(), true) if err == spacesyncproto.ErrSpaceMissing { return d.sendPushSpaceRequest(ctx, cl) } @@ -115,9 +115,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) // not syncing ids which were removed through settings document filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) - if d.statusService != nil { - d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - } + d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter) ctx = peer.CtxWithPeerId(ctx, p.Id()) d.pingTreesInCache(ctx, filteredIds) diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 4aeda2b1..b9e11357 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -110,10 +110,8 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) - var statusService statusservice.StatusService + statusService := statusservice.NewNoOpStatusService() // this will work only for clients, not the best solution, but... - // TODO: maybe change this to dependency injection where we would inject the method `ProvideStatusService` - // and for nodes there would be NoOpStatusService if !lastConfiguration.IsResponsible(st.Id()) { statusService = statusservice.NewStatusService(st.Id(), lastConfiguration, st) } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 4e744060..d243e265 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -190,9 +190,7 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - if s.statusService != nil { - s.statusService.Run() - } + s.statusService.Run() return nil } @@ -300,10 +298,9 @@ func (s *space) Close() error { if err := s.storage.Close(); err != nil { mError.Add(err) } - if s.statusService != nil { - if err := s.statusService.Close(); err != nil { - mError.Add(err) - } + if err := s.statusService.Close(); err != nil { + mError.Add(err) } + return mError.Err() } diff --git a/common/commonspace/statusservice/noop.go b/common/commonspace/statusservice/noop.go new file mode 100644 index 00000000..80455d9f --- /dev/null +++ b/common/commonspace/statusservice/noop.go @@ -0,0 +1,40 @@ +package statusservice + +type noOpStatusService struct{} + +func NewNoOpStatusService() StatusService { + return &noOpStatusService{} +} + +func (n *noOpStatusService) HeadsChange(treeId string, heads []string) { +} + +func (n *noOpStatusService) HeadsReceive(senderId, treeId string, heads []string) { +} + +func (n *noOpStatusService) Watch(treeId string) (err error) { + return +} + +func (n *noOpStatusService) Unwatch(treeId string) { +} + +func (n *noOpStatusService) SetNodesOnline(senderId string, online bool) { +} + +func (n *noOpStatusService) StateCounter() uint64 { + return 0 +} + +func (n *noOpStatusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { +} + +func (n *noOpStatusService) SetUpdateReceiver(updater UpdateReceiver) { +} + +func (n *noOpStatusService) Run() { +} + +func (n *noOpStatusService) Close() error { + return nil +} diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go index 464645c6..a43bec29 100644 --- a/common/commonspace/statusservice/statusservice.go +++ b/common/commonspace/statusservice/statusservice.go @@ -9,7 +9,6 @@ import ( treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" - "go.uber.org/zap" "golang.org/x/exp/slices" "sync" "time" @@ -22,17 +21,21 @@ const ( var log = logger.NewNamed("commonspace.statusservice") -type Updater func(ctx context.Context, treeId string, status SyncStatus) (err error) +type UpdateReceiver interface { + UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error) + UpdateNodeConnection(online bool) +} type StatusService interface { HeadsChange(treeId string, heads []string) HeadsReceive(senderId, treeId string, heads []string) Watch(treeId string) (err error) Unwatch(treeId string) + SetNodesOnline(senderId string, online bool) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) - SetUpdater(updater Updater) + SetUpdateReceiver(updater UpdateReceiver) Run() Close() error } @@ -59,16 +62,16 @@ type treeStatus struct { type statusService struct { sync.Mutex - configuration nodeconf.Configuration - periodicSync periodicsync.PeriodicSync - updater Updater - storage storage.SpaceStorage + configuration nodeconf.Configuration + periodicSync periodicsync.PeriodicSync + updateReceiver UpdateReceiver + storage storage.SpaceStorage spaceId string treeHeads map[string]treeHeadsEntry watchers map[string]struct{} stateCounter uint64 - closed bool + nodesOnline bool treeStatusBuf []treeStatus } @@ -84,11 +87,11 @@ func NewStatusService(spaceId string, configuration nodeconf.Configuration, stor } } -func (s *statusService) SetUpdater(updater Updater) { +func (s *statusService) SetUpdateReceiver(updater UpdateReceiver) { s.Lock() defer s.Unlock() - s.updater = updater + s.updateReceiver = updater } func (s *statusService) Run() { @@ -115,11 +118,22 @@ func (s *statusService) HeadsChange(treeId string, heads []string) { s.stateCounter++ } +func (s *statusService) SetNodesOnline(senderId string, online bool) { + if !s.isSenderResponsible(senderId) { + return + } + + s.Lock() + defer s.Unlock() + + s.nodesOnline = online +} + func (s *statusService) update(ctx context.Context) (err error) { s.treeStatusBuf = s.treeStatusBuf[:0] s.Lock() - if s.updater == nil { + if s.updateReceiver == nil { s.Unlock() return } @@ -134,10 +148,9 @@ func (s *statusService) update(ctx context.Context) (err error) { s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus, treeHeads.heads}) } s.Unlock() - + s.updateReceiver.UpdateNodeConnection(s.nodesOnline) for _, entry := range s.treeStatusBuf { - log.With(zap.Bool("status", entry.status == SyncStatusSynced), zap.Strings("heads", entry.heads)).Debug("updating status") - err = s.updater(ctx, entry.treeId, entry.status) + err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status) if err != nil { return } @@ -155,7 +168,7 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { } // checking if other node is responsible - if len(heads) == 0 || !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { + if len(heads) == 0 || !s.isSenderResponsible(senderId) { return } @@ -227,7 +240,7 @@ func (s *statusService) StateCounter() uint64 { func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { // if sender is not a responsible node, then this should have no effect - if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { + if !s.isSenderResponsible(senderId) { return } @@ -247,3 +260,7 @@ func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []st } } } + +func (s *statusService) isSenderResponsible(senderId string) bool { + return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) +} diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index b12751f7..e94ce67f 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -101,10 +101,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) - if deps.StatusService != nil { - // TODO: maybe change to no-op status service - deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) - } + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -123,9 +120,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) - if deps.StatusService != nil { - deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) - } + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -258,9 +253,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } - if s.statusService != nil { - s.statusService.HeadsChange(s.ID(), res.Heads) - } + s.statusService.HeadsChange(s.ID(), res.Heads) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) err = s.syncClient.BroadcastAsync(headUpdate) return diff --git a/common/commonspace/synctree/synctree_test.go b/common/commonspace/synctree/synctree_test.go index 5c8acc6c..a65a0485 100644 --- a/common/commonspace/synctree/synctree_test.go +++ b/common/commonspace/synctree/synctree_test.go @@ -2,6 +2,7 @@ package synctree import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree" @@ -64,10 +65,11 @@ func Test_DeriveSyncTree(t *testing.T) { syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate) syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil) deps := CreateDeps{ - AclList: aclListMock, - SpaceId: spaceId, - Payload: expectedPayload, - SpaceStorage: spaceStorageMock, + AclList: aclListMock, + SpaceId: spaceId, + Payload: expectedPayload, + SpaceStorage: spaceStorageMock, + StatusService: statusservice.NewNoOpStatusService(), } objTreeMock.EXPECT().ID().Return("id") @@ -98,10 +100,11 @@ func Test_CreateSyncTree(t *testing.T) { syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil) objTreeMock.EXPECT().ID().Return("id") deps := CreateDeps{ - AclList: aclListMock, - SpaceId: spaceId, - Payload: expectedPayload, - SpaceStorage: spaceStorageMock, + AclList: aclListMock, + SpaceId: spaceId, + Payload: expectedPayload, + SpaceStorage: spaceStorageMock, + StatusService: statusservice.NewNoOpStatusService(), } _, err := CreateSyncTree(ctx, deps) @@ -117,11 +120,12 @@ func Test_BuildSyncTree(t *testing.T) { syncClientMock := mock_synctree.NewMockSyncClient(ctrl) objTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl)) tr := &syncTree{ - ObjectTree: objTreeMock, - SyncHandler: nil, - syncClient: syncClientMock, - listener: updateListenerMock, - isClosed: false, + ObjectTree: objTreeMock, + SyncHandler: nil, + syncClient: syncClientMock, + listener: updateListenerMock, + isClosed: false, + statusService: statusservice.NewNoOpStatusService(), } headUpdate := &treechangeproto.TreeSyncMessage{} diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 7994e1f1..c1d37298 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -40,9 +40,7 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms return } - if s.statusService != nil { - s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) - } + s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId) if queueFull { diff --git a/common/commonspace/synctree/synctreehandler_test.go b/common/commonspace/synctree/synctreehandler_test.go index 40cfa65f..c1a71982 100644 --- a/common/commonspace/synctree/synctreehandler_test.go +++ b/common/commonspace/synctree/synctreehandler_test.go @@ -3,6 +3,7 @@ package synctree import ( "context" "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree/mock_objecttree" @@ -49,9 +50,10 @@ func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture { receiveQueueMock := mock_synctree.NewMockReceiveQueue(ctrl) syncHandler := &syncTreeHandler{ - objTree: objectTreeMock, - syncClient: syncClientMock, - queue: receiveQueueMock, + objTree: objectTreeMock, + syncClient: syncClientMock, + queue: receiveQueueMock, + statusService: statusservice.NewNoOpStatusService(), } return &syncHandlerFixture{ ctrl: ctrl,