Statusservice refactoring
This commit is contained in:
parent
ad87d5e545
commit
15ccf29f0b
@ -6,12 +6,10 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -101,11 +99,7 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) {
|
||||
log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)).
|
||||
Debug("updating sync status")
|
||||
return
|
||||
})
|
||||
ns.StatusService().SetUpdateReceiver(&statusReceiver{})
|
||||
if err = ns.Init(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
20
client/clientspace/statusreceiver.go
Normal file
20
client/clientspace/statusreceiver.go
Normal file
@ -0,0 +1,20 @@
|
||||
package clientspace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type statusReceiver struct {
|
||||
}
|
||||
|
||||
func (s *statusReceiver) UpdateTree(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) {
|
||||
log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)).
|
||||
Debug("updating sync status")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *statusReceiver) UpdateNodeConnection(online bool) {
|
||||
log.With(zap.Bool("nodes online", online)).Debug("updating node connection")
|
||||
}
|
||||
@ -100,14 +100,14 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
rdiff = remotediff.NewRemoteDiff(d.spaceId, cl)
|
||||
stateCounter uint64 = 0
|
||||
)
|
||||
if d.statusService != nil {
|
||||
stateCounter = d.statusService.StateCounter()
|
||||
}
|
||||
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
||||
err = rpcerr.Unwrap(err)
|
||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||
d.statusService.SetNodesOnline(p.Id(), false)
|
||||
return err
|
||||
}
|
||||
d.statusService.SetNodesOnline(p.Id(), true)
|
||||
if err == spacesyncproto.ErrSpaceMissing {
|
||||
return d.sendPushSpaceRequest(ctx, cl)
|
||||
}
|
||||
@ -115,9 +115,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
// not syncing ids which were removed through settings document
|
||||
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
|
||||
|
||||
if d.statusService != nil {
|
||||
d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
|
||||
}
|
||||
|
||||
ctx = peer.CtxWithPeerId(ctx, p.Id())
|
||||
d.pingTreesInCache(ctx, filteredIds)
|
||||
|
||||
@ -110,10 +110,8 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
lastConfiguration := s.configurationService.GetLast()
|
||||
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
|
||||
|
||||
var statusService statusservice.StatusService
|
||||
statusService := statusservice.NewNoOpStatusService()
|
||||
// this will work only for clients, not the best solution, but...
|
||||
// TODO: maybe change this to dependency injection where we would inject the method `ProvideStatusService`
|
||||
// and for nodes there would be NoOpStatusService
|
||||
if !lastConfiguration.IsResponsible(st.Id()) {
|
||||
statusService = statusservice.NewStatusService(st.Id(), lastConfiguration, st)
|
||||
}
|
||||
|
||||
@ -190,9 +190,7 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.statusService != nil {
|
||||
s.statusService.Run()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -300,10 +298,9 @@ func (s *space) Close() error {
|
||||
if err := s.storage.Close(); err != nil {
|
||||
mError.Add(err)
|
||||
}
|
||||
if s.statusService != nil {
|
||||
if err := s.statusService.Close(); err != nil {
|
||||
mError.Add(err)
|
||||
}
|
||||
}
|
||||
|
||||
return mError.Err()
|
||||
}
|
||||
|
||||
40
common/commonspace/statusservice/noop.go
Normal file
40
common/commonspace/statusservice/noop.go
Normal file
@ -0,0 +1,40 @@
|
||||
package statusservice
|
||||
|
||||
type noOpStatusService struct{}
|
||||
|
||||
func NewNoOpStatusService() StatusService {
|
||||
return &noOpStatusService{}
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) HeadsChange(treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) Watch(treeId string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) Unwatch(treeId string) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) SetNodesOnline(senderId string, online bool) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) StateCounter() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) SetUpdateReceiver(updater UpdateReceiver) {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) Run() {
|
||||
}
|
||||
|
||||
func (n *noOpStatusService) Close() error {
|
||||
return nil
|
||||
}
|
||||
@ -9,7 +9,6 @@ import (
|
||||
treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"sync"
|
||||
"time"
|
||||
@ -22,17 +21,21 @@ const (
|
||||
|
||||
var log = logger.NewNamed("commonspace.statusservice")
|
||||
|
||||
type Updater func(ctx context.Context, treeId string, status SyncStatus) (err error)
|
||||
type UpdateReceiver interface {
|
||||
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
|
||||
UpdateNodeConnection(online bool)
|
||||
}
|
||||
|
||||
type StatusService interface {
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
Watch(treeId string) (err error)
|
||||
Unwatch(treeId string)
|
||||
SetNodesOnline(senderId string, online bool)
|
||||
StateCounter() uint64
|
||||
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
|
||||
|
||||
SetUpdater(updater Updater)
|
||||
SetUpdateReceiver(updater UpdateReceiver)
|
||||
Run()
|
||||
Close() error
|
||||
}
|
||||
@ -61,14 +64,14 @@ type statusService struct {
|
||||
sync.Mutex
|
||||
configuration nodeconf.Configuration
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
updater Updater
|
||||
updateReceiver UpdateReceiver
|
||||
storage storage.SpaceStorage
|
||||
|
||||
spaceId string
|
||||
treeHeads map[string]treeHeadsEntry
|
||||
watchers map[string]struct{}
|
||||
stateCounter uint64
|
||||
closed bool
|
||||
nodesOnline bool
|
||||
|
||||
treeStatusBuf []treeStatus
|
||||
}
|
||||
@ -84,11 +87,11 @@ func NewStatusService(spaceId string, configuration nodeconf.Configuration, stor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *statusService) SetUpdater(updater Updater) {
|
||||
func (s *statusService) SetUpdateReceiver(updater UpdateReceiver) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.updater = updater
|
||||
s.updateReceiver = updater
|
||||
}
|
||||
|
||||
func (s *statusService) Run() {
|
||||
@ -115,11 +118,22 @@ func (s *statusService) HeadsChange(treeId string, heads []string) {
|
||||
s.stateCounter++
|
||||
}
|
||||
|
||||
func (s *statusService) SetNodesOnline(senderId string, online bool) {
|
||||
if !s.isSenderResponsible(senderId) {
|
||||
return
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.nodesOnline = online
|
||||
}
|
||||
|
||||
func (s *statusService) update(ctx context.Context) (err error) {
|
||||
s.treeStatusBuf = s.treeStatusBuf[:0]
|
||||
|
||||
s.Lock()
|
||||
if s.updater == nil {
|
||||
if s.updateReceiver == nil {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
@ -134,10 +148,9 @@ func (s *statusService) update(ctx context.Context) (err error) {
|
||||
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus, treeHeads.heads})
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
s.updateReceiver.UpdateNodeConnection(s.nodesOnline)
|
||||
for _, entry := range s.treeStatusBuf {
|
||||
log.With(zap.Bool("status", entry.status == SyncStatusSynced), zap.Strings("heads", entry.heads)).Debug("updating status")
|
||||
err = s.updater(ctx, entry.treeId, entry.status)
|
||||
err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -155,7 +168,7 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
// checking if other node is responsible
|
||||
if len(heads) == 0 || !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
|
||||
if len(heads) == 0 || !s.isSenderResponsible(senderId) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -227,7 +240,7 @@ func (s *statusService) StateCounter() uint64 {
|
||||
|
||||
func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
|
||||
// if sender is not a responsible node, then this should have no effect
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
|
||||
if !s.isSenderResponsible(senderId) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -247,3 +260,7 @@ func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []st
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *statusService) isSenderResponsible(senderId string) bool {
|
||||
return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId)
|
||||
}
|
||||
|
||||
@ -101,10 +101,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
|
||||
deps.Configuration)
|
||||
|
||||
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
|
||||
if deps.StatusService != nil {
|
||||
// TODO: maybe change to no-op status service
|
||||
deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads())
|
||||
}
|
||||
syncClient.BroadcastAsync(headUpdate)
|
||||
id = objTree.ID()
|
||||
return
|
||||
@ -123,9 +120,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
|
||||
|
||||
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
|
||||
|
||||
if deps.StatusService != nil {
|
||||
deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads())
|
||||
}
|
||||
syncClient.BroadcastAsync(headUpdate)
|
||||
id = objTree.ID()
|
||||
return
|
||||
@ -258,9 +253,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
|
||||
if s.notifiable != nil {
|
||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||
}
|
||||
if s.statusService != nil {
|
||||
s.statusService.HeadsChange(s.ID(), res.Heads)
|
||||
}
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
return
|
||||
|
||||
@ -2,6 +2,7 @@ package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
|
||||
@ -68,6 +69,7 @@ func Test_DeriveSyncTree(t *testing.T) {
|
||||
SpaceId: spaceId,
|
||||
Payload: expectedPayload,
|
||||
SpaceStorage: spaceStorageMock,
|
||||
StatusService: statusservice.NewNoOpStatusService(),
|
||||
}
|
||||
objTreeMock.EXPECT().ID().Return("id")
|
||||
|
||||
@ -102,6 +104,7 @@ func Test_CreateSyncTree(t *testing.T) {
|
||||
SpaceId: spaceId,
|
||||
Payload: expectedPayload,
|
||||
SpaceStorage: spaceStorageMock,
|
||||
StatusService: statusservice.NewNoOpStatusService(),
|
||||
}
|
||||
|
||||
_, err := CreateSyncTree(ctx, deps)
|
||||
@ -122,6 +125,7 @@ func Test_BuildSyncTree(t *testing.T) {
|
||||
syncClient: syncClientMock,
|
||||
listener: updateListenerMock,
|
||||
isClosed: false,
|
||||
statusService: statusservice.NewNoOpStatusService(),
|
||||
}
|
||||
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
|
||||
@ -40,9 +40,7 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
|
||||
return
|
||||
}
|
||||
|
||||
if s.statusService != nil {
|
||||
s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
|
||||
}
|
||||
|
||||
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId)
|
||||
if queueFull {
|
||||
|
||||
@ -3,6 +3,7 @@ package synctree
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree/mock_objecttree"
|
||||
@ -52,6 +53,7 @@ func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture {
|
||||
objTree: objectTreeMock,
|
||||
syncClient: syncClientMock,
|
||||
queue: receiveQueueMock,
|
||||
statusService: statusservice.NewNoOpStatusService(),
|
||||
}
|
||||
return &syncHandlerFixture{
|
||||
ctrl: ctrl,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user