From ad87d5e545007199251cdfbb8669688a014ce21a Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 19 Dec 2022 16:59:43 +0100 Subject: [PATCH] Update status service --- client/clientspace/service.go | 6 +- common/commonspace/diffservice/diffservice.go | 4 +- common/commonspace/diffservice/diffsyncer.go | 2 + .../diffservice/diffsyncer_test.go | 2 +- common/commonspace/service.go | 4 +- .../statusservice/statusservice.go | 104 ++++++++++++++---- common/pkg/acl/tree/objecttree.go | 3 +- common/pkg/acl/tree/rawloader.go | 5 +- common/pkg/acl/tree/util.go | 18 --- common/util/slice/slice.go | 18 +++ 10 files changed, 116 insertions(+), 50 deletions(-) diff --git a/client/clientspace/service.go b/client/clientspace/service.go index 8ef871fa..a0326464 100644 --- a/client/clientspace/service.go +++ b/client/clientspace/service.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" @@ -100,8 +101,9 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object if err != nil { return } - ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status bool) (err error) { - log.With(zap.String("treeId", treeId), zap.Bool("synced", status)).Debug("updating sync status") + ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) { + log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)). + Debug("updating sync status") return }) if err = ns.Init(ctx); err != nil { diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index 8bae2f08..5e277fdb 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" @@ -49,12 +50,13 @@ func NewDiffService( storage storage.SpaceStorage, confConnector nodeconf.ConfConnector, cache treegetter.TreeGetter, + statusService statusservice.StatusService, log *zap.Logger) DiffService { diff := ldiff.New(16, 16) l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) - syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l) + syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, statusService, l) periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l) return &diffService{ diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 962e5cac..70de2067 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -31,6 +31,7 @@ func newDiffSyncer( cache treegetter.TreeGetter, storage storage.SpaceStorage, clientFactory spacesyncproto.ClientFactory, + statusService statusservice.StatusService, log *zap.Logger) DiffSyncer { return &diffSyncer{ diff: diff, @@ -40,6 +41,7 @@ func newDiffSyncer( confConnector: confConnector, clientFactory: clientFactory, log: log, + statusService: statusService, } } diff --git a/common/commonspace/diffservice/diffsyncer_test.go b/common/commonspace/diffservice/diffsyncer_test.go index 88238b9a..cf41a154 100644 --- a/common/commonspace/diffservice/diffsyncer_test.go +++ b/common/commonspace/diffservice/diffsyncer_test.go @@ -110,7 +110,7 @@ func TestDiffSyncer_Sync(t *testing.T) { spaceId := "spaceId" aclRootId := "aclRootId" l := logger.NewNamed(spaceId) - diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, l) + diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, nil, l) delState.EXPECT().AddObserver(gomock.Any()) diffSyncer.Init(delState) diff --git a/common/commonspace/service.go b/common/commonspace/service.go index ae0e704b..4aeda2b1 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -115,10 +115,10 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { // TODO: maybe change this to dependency injection where we would inject the method `ProvideStatusService` // and for nodes there would be NoOpStatusService if !lastConfiguration.IsResponsible(st.Id()) { - statusService = statusservice.NewStatusService(st.Id(), lastConfiguration) + statusService = statusservice.NewStatusService(st.Id(), lastConfiguration, st) } - diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log) + diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, statusService, log) syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod) sp := &space{ id: id, diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go index 2054ebd2..464645c6 100644 --- a/common/commonspace/statusservice/statusservice.go +++ b/common/commonspace/statusservice/statusservice.go @@ -2,9 +2,14 @@ package statusservice import ( "context" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" + "go.uber.org/zap" "golang.org/x/exp/slices" "sync" "time" @@ -17,12 +22,12 @@ const ( var log = logger.NewNamed("commonspace.statusservice") -type Updater func(ctx context.Context, treeId string, status bool) (err error) +type Updater func(ctx context.Context, treeId string, status SyncStatus) (err error) type StatusService interface { HeadsChange(treeId string, heads []string) HeadsReceive(senderId, treeId string, heads []string) - Watch(treeId string) + Watch(treeId string) (err error) Unwatch(treeId string) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) @@ -32,14 +37,24 @@ type StatusService interface { Close() error } -type statusEntry struct { - head string +type SyncStatus int + +const ( + SyncStatusUnknown SyncStatus = iota + SyncStatusSynced + SyncStatusNotSynced +) + +type treeHeadsEntry struct { + heads []string stateCounter uint64 + syncStatus SyncStatus } type treeStatus struct { treeId string - status bool + status SyncStatus + heads []string } type statusService struct { @@ -47,9 +62,10 @@ type statusService struct { configuration nodeconf.Configuration periodicSync periodicsync.PeriodicSync updater Updater + storage storage.SpaceStorage spaceId string - treeHeads map[string]statusEntry + treeHeads map[string]treeHeadsEntry watchers map[string]struct{} stateCounter uint64 closed bool @@ -57,12 +73,13 @@ type statusService struct { treeStatusBuf []treeStatus } -func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService { +func NewStatusService(spaceId string, configuration nodeconf.Configuration, store storage.SpaceStorage) StatusService { return &statusService{ spaceId: spaceId, - treeHeads: map[string]statusEntry{}, + treeHeads: map[string]treeHeadsEntry{}, watchers: map[string]struct{}{}, configuration: configuration, + storage: store, stateCounter: 0, } } @@ -87,10 +104,13 @@ func (s *statusService) HeadsChange(treeId string, heads []string) { s.Lock() defer s.Unlock() - // TODO: save to storage - s.treeHeads[treeId] = statusEntry{ - head: heads[0], + var headsCopy []string + headsCopy = append(headsCopy, heads...) + + s.treeHeads[treeId] = treeHeadsEntry{ + heads: headsCopy, stateCounter: s.stateCounter, + syncStatus: SyncStatusNotSynced, } s.stateCounter++ } @@ -105,12 +125,18 @@ func (s *statusService) update(ctx context.Context) (err error) { } for treeId := range s.watchers { // that means that we haven't yet got the status update - _, exists := s.treeHeads[treeId] - s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, !exists}) + treeHeads, exists := s.treeHeads[treeId] + if !exists { + err = fmt.Errorf("treeHeads should always exist for watchers") + s.Unlock() + return + } + s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus, treeHeads.heads}) } s.Unlock() for _, entry := range s.treeStatusBuf { + log.With(zap.Bool("status", entry.status == SyncStatusSynced), zap.Strings("heads", entry.heads)).Debug("updating status") err = s.updater(ctx, entry.treeId, entry.status) if err != nil { return @@ -123,28 +149,59 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { s.Lock() defer s.Unlock() - curHead, ok := s.treeHeads[treeId] - if !ok { + curTreeHeads, ok := s.treeHeads[treeId] + if !ok || curTreeHeads.syncStatus == SyncStatusSynced { return } + // checking if other node is responsible if len(heads) == 0 || !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { return } - // checking if we received the head that we are interested in - if !slices.Contains(heads, curHead.head) { - return - } - // TODO: save to storage - delete(s.treeHeads, treeId) + // checking if we received the head that we are interested in + for _, head := range heads { + if idx, found := slices.BinarySearch(curTreeHeads.heads, head); found { + curTreeHeads.heads[idx] = "" + } + } + curTreeHeads.heads = slice.DiscardFromSlice(curTreeHeads.heads, func(h string) bool { + return h == "" + }) + if len(curTreeHeads.heads) == 0 { + curTreeHeads.syncStatus = SyncStatusSynced + } + s.treeHeads[treeId] = curTreeHeads } -func (s *statusService) Watch(treeId string) { +func (s *statusService) Watch(treeId string) (err error) { s.Lock() defer s.Unlock() + _, ok := s.treeHeads[treeId] + if !ok { + var ( + st treestorage.TreeStorage + heads []string + ) + st, err = s.storage.TreeStorage(treeId) + if err != nil { + return + } + heads, err = st.Heads() + if err != nil { + return + } + slices.Sort(heads) + s.stateCounter++ + s.treeHeads[treeId] = treeHeadsEntry{ + heads: heads, + stateCounter: s.stateCounter, + syncStatus: SyncStatusUnknown, + } + } s.watchers[treeId] = struct{}{} + return } func (s *statusService) Unwatch(treeId string) { @@ -185,7 +242,8 @@ func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []st } // if we didn't find our treeId in heads ids which are different from us and node if _, found := slices.BinarySearch(differentRemoteIds, treeId); !found { - delete(s.treeHeads, treeId) + entry.syncStatus = SyncStatusSynced + s.treeHeads[treeId] = entry } } } diff --git a/common/pkg/acl/tree/objecttree.go b/common/pkg/acl/tree/objecttree.go index 772a7f1e..56e3b7a3 100644 --- a/common/pkg/acl/tree/objecttree.go +++ b/common/pkg/acl/tree/objecttree.go @@ -10,6 +10,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" "sync" ) @@ -341,7 +342,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang } } // discarding all previously seen changes - ot.newChangesBuf = discardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil }) + ot.newChangesBuf = slice.DiscardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil }) if shouldRebuildFromStorage { err = ot.rebuildFromStorage(changesPayload.NewHeads, ot.newChangesBuf) diff --git a/common/pkg/acl/tree/rawloader.go b/common/pkg/acl/tree/rawloader.go index bbb6509b..78b5114f 100644 --- a/common/pkg/acl/tree/rawloader.go +++ b/common/pkg/acl/tree/rawloader.go @@ -4,6 +4,7 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" "time" ) @@ -87,7 +88,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*treech return true }, func(visited []*Change) { - results = discardFromSlice(results, func(change *Change) bool { + results = slice.DiscardFromSlice(results, func(change *Change) bool { return change.visited }) }, @@ -209,7 +210,7 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi }) // discarding visited - buffer = discardFromSlice(buffer, func(change *treechangeproto.RawTreeChangeWithId) bool { + buffer = slice.DiscardFromSlice(buffer, func(change *treechangeproto.RawTreeChangeWithId) bool { return change == nil }) diff --git a/common/pkg/acl/tree/util.go b/common/pkg/acl/tree/util.go index baf7be14..0e6cc7cd 100644 --- a/common/pkg/acl/tree/util.go +++ b/common/pkg/acl/tree/util.go @@ -27,21 +27,3 @@ OuterLoop: } return ourPath[i+1], nil } - -func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T { - var ( - finishedIdx = 0 - currentIdx = 0 - ) - for currentIdx < len(elements) { - if !isDiscarded(elements[currentIdx]) { - if finishedIdx != currentIdx { - elements[finishedIdx] = elements[currentIdx] - } - finishedIdx++ - } - currentIdx++ - } - elements = elements[:finishedIdx] - return elements -} diff --git a/common/util/slice/slice.go b/common/util/slice/slice.go index cc7e9067..816a01c0 100644 --- a/common/util/slice/slice.go +++ b/common/util/slice/slice.go @@ -118,3 +118,21 @@ func UnsortedEquals(s1, s2 []string) bool { return SortedEquals(s1Sorted, s2Sorted) } + +func DiscardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T { + var ( + finishedIdx = 0 + currentIdx = 0 + ) + for currentIdx < len(elements) { + if !isDiscarded(elements[currentIdx]) { + if finishedIdx != currentIdx { + elements[finishedIdx] = elements[currentIdx] + } + finishedIdx++ + } + currentIdx++ + } + elements = elements[:finishedIdx] + return elements +}