From 4aa9ce1a274ec0a5f1d611a680169fd58e61dc00 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 18 Dec 2022 13:16:37 +0100 Subject: [PATCH] Change sync status update logic --- client/api/rpchandler.go | 25 +--- client/api/watcher.go | 37 ----- client/clientspace/service.go | 5 + common/commonspace/space.go | 3 + .../statusservice/statusservice.go | 139 ++++++++++++------ 5 files changed, 100 insertions(+), 109 deletions(-) delete mode 100644 client/api/watcher.go diff --git a/client/api/rpchandler.go b/client/api/rpchandler.go index 3c3edcba..ba6aca50 100644 --- a/client/api/rpchandler.go +++ b/client/api/rpchandler.go @@ -10,7 +10,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" "math/rand" - "sync" ) type rpcHandler struct { @@ -18,8 +17,6 @@ type rpcHandler struct { storageService storage.ClientStorage docService document.Service account account.Service - treeWatcher *watcher - sync.Mutex } func (r *rpcHandler) Watch(ctx context.Context, request *apiproto.WatchRequest) (resp *apiproto.WatchResponse, err error) { @@ -27,13 +24,8 @@ func (r *rpcHandler) Watch(ctx context.Context, request *apiproto.WatchRequest) if err != nil { return } - r.Lock() - defer r.Unlock() - ch := make(chan bool) - r.treeWatcher = newWatcher(request.SpaceId, request.TreeId, ch) - space.StatusService().Watch(request.TreeId, ch) - go r.treeWatcher.run() + space.StatusService().Watch(request.TreeId) resp = &apiproto.WatchResponse{} return } @@ -43,22 +35,7 @@ func (r *rpcHandler) Unwatch(ctx context.Context, request *apiproto.UnwatchReque if err != nil { return } - var treeWatcher *watcher space.StatusService().Unwatch(request.TreeId) - - r.Lock() - if r.treeWatcher != nil { - treeWatcher = r.treeWatcher - } - r.Unlock() - - treeWatcher.close() - - r.Lock() - if r.treeWatcher == treeWatcher { - r.treeWatcher = nil - } - r.Unlock() resp = &apiproto.UnwatchResponse{} return } diff --git a/client/api/watcher.go b/client/api/watcher.go deleted file mode 100644 index e7d13a9b..00000000 --- a/client/api/watcher.go +++ /dev/null @@ -1,37 +0,0 @@ -package api - -import "go.uber.org/zap" - -type watcher struct { - spaceId string - treeId string - watcher chan bool - watcherDone chan struct{} -} - -func newWatcher(spaceId, treeId string, ch chan bool) *watcher { - return &watcher{ - spaceId: spaceId, - treeId: treeId, - watcher: ch, - watcherDone: make(chan struct{}), - } -} - -func (w *watcher) run() { - log := log.With(zap.String("spaceId", w.spaceId), zap.String("treeId", w.treeId)) - log.Debug("started watching") - defer close(w.watcherDone) - for { - synced, ok := <-w.watcher - if !ok { - log.Debug("stopped watching") - return - } - log.With(zap.Bool("synced", synced)).Debug("updated sync status") - } -} - -func (w *watcher) close() { - <-w.watcherDone -} diff --git a/client/clientspace/service.go b/client/clientspace/service.go index fc43d7b8..8ef871fa 100644 --- a/client/clientspace/service.go +++ b/client/clientspace/service.go @@ -10,6 +10,7 @@ import ( config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" + "go.uber.org/zap" "time" ) @@ -99,6 +100,10 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object if err != nil { return } + ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status bool) (err error) { + log.With(zap.String("treeId", treeId), zap.Bool("synced", status)).Debug("updating sync status") + return + }) if err = ns.Init(ctx); err != nil { return } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index fdf6e633..4e744060 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -190,6 +190,9 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } + if s.statusService != nil { + s.statusService.Run() + } return nil } diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go index 02f803d8..2054ebd2 100644 --- a/common/commonspace/statusservice/statusservice.go +++ b/common/commonspace/statusservice/statusservice.go @@ -1,22 +1,34 @@ package statusservice import ( + "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "golang.org/x/exp/slices" "sync" + "time" +) + +const ( + statusServiceUpdateInterval = 5 + statusServiceTimeout = time.Second ) var log = logger.NewNamed("commonspace.statusservice") +type Updater func(ctx context.Context, treeId string, status bool) (err error) + type StatusService interface { HeadsChange(treeId string, heads []string) HeadsReceive(senderId, treeId string, heads []string) - Watch(treeId string, ch chan bool) + Watch(treeId string) Unwatch(treeId string) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) + + SetUpdater(updater Updater) + Run() Close() error } @@ -25,122 +37,153 @@ type statusEntry struct { stateCounter uint64 } +type treeStatus struct { + treeId string + status bool +} + type statusService struct { sync.Mutex - spaceId string - treeHeads map[string]statusEntry - watchers map[string]chan bool configuration nodeconf.Configuration - stateCounter uint64 - closed bool + periodicSync periodicsync.PeriodicSync + updater Updater + + spaceId string + treeHeads map[string]statusEntry + watchers map[string]struct{} + stateCounter uint64 + closed bool + + treeStatusBuf []treeStatus } func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService { return &statusService{ spaceId: spaceId, treeHeads: map[string]statusEntry{}, - watchers: map[string]chan bool{}, + watchers: map[string]struct{}{}, configuration: configuration, stateCounter: 0, } } +func (s *statusService) SetUpdater(updater Updater) { + s.Lock() + defer s.Unlock() + + s.updater = updater +} + +func (s *statusService) Run() { + s.periodicSync = periodicsync.NewPeriodicSync( + statusServiceUpdateInterval, + statusServiceTimeout, + s.update, + log) + s.periodicSync.Run() +} + func (s *statusService) HeadsChange(treeId string, heads []string) { s.Lock() defer s.Unlock() - if s.closed { - return - } + // TODO: save to storage s.treeHeads[treeId] = statusEntry{ head: heads[0], stateCounter: s.stateCounter, } - if watcher, ok := s.watchers[treeId]; ok { - select { - case watcher <- false: - default: + s.stateCounter++ +} + +func (s *statusService) update(ctx context.Context) (err error) { + s.treeStatusBuf = s.treeStatusBuf[:0] + + s.Lock() + if s.updater == nil { + s.Unlock() + return + } + for treeId := range s.watchers { + // that means that we haven't yet got the status update + _, exists := s.treeHeads[treeId] + s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, !exists}) + } + s.Unlock() + + for _, entry := range s.treeStatusBuf { + err = s.updater(ctx, entry.treeId, entry.status) + if err != nil { + return } } - s.stateCounter++ + return } func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { s.Lock() defer s.Unlock() - if s.closed { - return - } curHead, ok := s.treeHeads[treeId] if !ok { return } + // checking if other node is responsible if len(heads) == 0 || !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { return } - if slice.FindPos(heads, curHead.head) == -1 { + // checking if we received the head that we are interested in + if !slices.Contains(heads, curHead.head) { return } + + // TODO: save to storage delete(s.treeHeads, treeId) - if watcher, ok := s.watchers[treeId]; ok { - select { - case watcher <- true: - default: - } - } } -func (s *statusService) Watch(treeId string, ch chan bool) { +func (s *statusService) Watch(treeId string) { s.Lock() defer s.Unlock() - if s.closed { - return - } - s.watchers[treeId] = ch + + s.watchers[treeId] = struct{}{} } func (s *statusService) Unwatch(treeId string) { s.Lock() defer s.Unlock() - if s.closed { - return - } - if ch, ok := s.watchers[treeId]; ok { - close(ch) + + if _, ok := s.watchers[treeId]; ok { delete(s.watchers, treeId) } } +func (s *statusService) Close() (err error) { + s.periodicSync.Close() + return +} + func (s *statusService) StateCounter() uint64 { s.Lock() defer s.Unlock() + return s.stateCounter } -func (s *statusService) Close() (err error) { - s.Lock() - defer s.Unlock() - if s.closed { - return - } - for _, ch := range s.watchers { - close(ch) - } - return -} - func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { + // if sender is not a responsible node, then this should have no effect if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { return } + s.Lock() defer s.Unlock() + slices.Sort(differentRemoteIds) for treeId, entry := range s.treeHeads { + // if the current update is outdated if entry.stateCounter > stateCounter { continue } + // if we didn't find our treeId in heads ids which are different from us and node if _, found := slices.BinarySearch(differentRemoteIds, treeId); !found { delete(s.treeHeads, treeId) }