Add syncstatusprovider

This commit is contained in:
mcrakhman 2023-06-05 20:44:27 +02:00
parent 69e607eddb
commit 66775873c7
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
6 changed files with 65 additions and 36 deletions

View File

@ -57,7 +57,7 @@ type headSync struct {
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
treeManager treemanager.TreeManager treeManager treemanager.TreeManager
credentialProvider credentialprovider.CredentialProvider credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusProvider syncStatus syncstatus.StatusService
deletionState deletionstate.ObjectDeletionState deletionState deletionstate.ObjectDeletionState
} }
@ -77,7 +77,7 @@ func (h *headSync) Init(a *app.App) (err error) {
h.diff = ldiff.New(16, 16) h.diff = ldiff.New(16, 16)
h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusService)
h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
h.syncer = newDiffSyncer(h) h.syncer = newDiffSyncer(h)

View File

@ -90,7 +90,7 @@ type space struct {
treeBuilder objecttreebuilder.TreeBuilderComponent treeBuilder objecttreebuilder.TreeBuilderComponent
headSync headsync.HeadSync headSync headsync.HeadSync
objectSync objectsync.ObjectSync objectSync objectsync.ObjectSync
syncStatus syncstatus.StatusProvider syncStatus syncstatus.StatusService
settings settings.Settings settings settings.Settings
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
aclList list.AclList aclList list.AclList
@ -164,7 +164,7 @@ func (s *space) Init(ctx context.Context) (err error) {
} }
s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent) s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync) s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync)
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusService)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings) s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync) s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)

View File

@ -23,6 +23,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/pool"
@ -53,16 +54,17 @@ type SpaceService interface {
} }
type spaceService struct { type spaceService struct {
config config.Config config config.Config
account accountservice.Service account accountservice.Service
configurationService nodeconf.Service configurationService nodeconf.Service
storageProvider spacestorage.SpaceStorageProvider storageProvider spacestorage.SpaceStorageProvider
peermanagerProvider peermanager.PeerManagerProvider peerManagerProvider peermanager.PeerManagerProvider
credentialProvider credentialprovider.CredentialProvider credentialProvider credentialprovider.CredentialProvider
treeManager treemanager.TreeManager statusServiceProvider syncstatus.StatusServiceProvider
pool pool.Pool treeManager treemanager.TreeManager
metric metric.Metric pool pool.Pool
app *app.App metric metric.Metric
app *app.App
} }
func (s *spaceService) Init(a *app.App) (err error) { func (s *spaceService) Init(a *app.App) (err error) {
@ -71,7 +73,8 @@ func (s *spaceService) Init(a *app.App) (err error) {
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.peerManagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
s.statusServiceProvider = a.MustComponent(syncstatus.CName).(syncstatus.StatusServiceProvider)
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.metric, _ = a.Component(metric.CName).(metric.Metric) s.metric, _ = a.Component(metric.CName).(metric.Metric)
s.app = a s.app = a
@ -162,14 +165,16 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
} else { } else {
state.TreeBuilderFunc = objecttree.BuildEmptyDataObjectTree state.TreeBuilderFunc = objecttree.BuildEmptyDataObjectTree
} }
peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id) peerManager, err := s.peerManagerProvider.NewPeerManager(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
statusService := s.statusServiceProvider.NewStatusService()
spaceApp := s.app.ChildApp() spaceApp := s.app.ChildApp()
spaceApp.Register(state). spaceApp.Register(state).
Register(peerManager). Register(peerManager).
Register(newCommonStorage(st)). Register(newCommonStorage(st)).
Register(statusService).
Register(syncacl.New()). Register(syncacl.New()).
Register(requestmanager.New()). Register(requestmanager.New()).
Register(deletionstate.New()). Register(deletionstate.New()).

View File

@ -170,6 +170,25 @@ func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId st
return &mockPeerManager{}, nil return &mockPeerManager{}, nil
} }
//
// Mock StatusServiceProvider
//
type mockStatusServiceProvider struct {
}
func (m *mockStatusServiceProvider) Init(a *app.App) (err error) {
return nil
}
func (m *mockStatusServiceProvider) Name() (name string) {
return syncstatus.CName
}
func (m *mockStatusServiceProvider) NewStatusService() syncstatus.StatusService {
return syncstatus.NewNoOpSyncStatus()
}
// //
// Mock Pool // Mock Pool
// //
@ -339,8 +358,8 @@ func newFixture(t *testing.T) *spaceFixture {
} }
fx.app.Register(fx.account). fx.app.Register(fx.account).
Register(fx.config). Register(fx.config).
Register(syncstatus.NewNoOpSyncStatus()).
Register(credentialprovider.NewNoOp()). Register(credentialprovider.NewNoOp()).
Register(&mockStatusServiceProvider{}).
Register(fx.configurationService). Register(fx.configurationService).
Register(fx.storageProvider). Register(fx.storageProvider).
Register(fx.peermanagerProvider). Register(fx.peermanagerProvider).

View File

@ -5,7 +5,7 @@ import (
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
) )
func NewNoOpSyncStatus() StatusProvider { func NewNoOpSyncStatus() StatusService {
return &noOpSyncStatus{} return &noOpSyncStatus{}
} }

View File

@ -46,7 +46,12 @@ type StatusWatcher interface {
SetUpdateReceiver(updater UpdateReceiver) SetUpdateReceiver(updater UpdateReceiver)
} }
type StatusProvider interface { type StatusServiceProvider interface {
app.Component
NewStatusService() StatusService
}
type StatusService interface {
app.ComponentRunnable app.ComponentRunnable
StatusUpdater StatusUpdater
StatusWatcher StatusWatcher
@ -72,7 +77,7 @@ type treeStatus struct {
heads []string heads []string
} }
type syncStatusProvider struct { type syncStatusService struct {
sync.Mutex sync.Mutex
configuration nodeconf.NodeConf configuration nodeconf.NodeConf
periodicSync periodicsync.PeriodicSync periodicSync periodicsync.PeriodicSync
@ -91,14 +96,14 @@ type syncStatusProvider struct {
updateTimeout time.Duration updateTimeout time.Duration
} }
func NewSyncStatusProvider() StatusProvider { func NewSyncStatusProvider() StatusService {
return &syncStatusProvider{ return &syncStatusService{
treeHeads: map[string]treeHeadsEntry{}, treeHeads: map[string]treeHeadsEntry{},
watchers: map[string]struct{}{}, watchers: map[string]struct{}{},
} }
} }
func (s *syncStatusProvider) Init(a *app.App) (err error) { func (s *syncStatusService) Init(a *app.App) (err error) {
sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.updateIntervalSecs = syncUpdateInterval s.updateIntervalSecs = syncUpdateInterval
s.updateTimeout = syncTimeout s.updateTimeout = syncTimeout
@ -108,18 +113,18 @@ func (s *syncStatusProvider) Init(a *app.App) (err error) {
return return
} }
func (s *syncStatusProvider) Name() (name string) { func (s *syncStatusService) Name() (name string) {
return CName return CName
} }
func (s *syncStatusProvider) SetUpdateReceiver(updater UpdateReceiver) { func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.updateReceiver = updater s.updateReceiver = updater
} }
func (s *syncStatusProvider) Run(ctx context.Context) error { func (s *syncStatusService) Run(ctx context.Context) error {
s.periodicSync = periodicsync.NewPeriodicSync( s.periodicSync = periodicsync.NewPeriodicSync(
s.updateIntervalSecs, s.updateIntervalSecs,
s.updateTimeout, s.updateTimeout,
@ -129,7 +134,7 @@ func (s *syncStatusProvider) Run(ctx context.Context) error {
return nil return nil
} }
func (s *syncStatusProvider) HeadsChange(treeId string, heads []string) { func (s *syncStatusService) HeadsChange(treeId string, heads []string) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -144,7 +149,7 @@ func (s *syncStatusProvider) HeadsChange(treeId string, heads []string) {
s.stateCounter++ s.stateCounter++
} }
func (s *syncStatusProvider) SetNodesOnline(senderId string, online bool) { func (s *syncStatusService) SetNodesOnline(senderId string, online bool) {
if !s.isSenderResponsible(senderId) { if !s.isSenderResponsible(senderId) {
return return
} }
@ -155,7 +160,7 @@ func (s *syncStatusProvider) SetNodesOnline(senderId string, online bool) {
s.nodesOnline = online s.nodesOnline = online
} }
func (s *syncStatusProvider) update(ctx context.Context) (err error) { func (s *syncStatusService) update(ctx context.Context) (err error) {
s.treeStatusBuf = s.treeStatusBuf[:0] s.treeStatusBuf = s.treeStatusBuf[:0]
s.Lock() s.Lock()
@ -184,7 +189,7 @@ func (s *syncStatusProvider) update(ctx context.Context) (err error) {
return return
} }
func (s *syncStatusProvider) HeadsReceive(senderId, treeId string, heads []string) { func (s *syncStatusService) HeadsReceive(senderId, treeId string, heads []string) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -213,7 +218,7 @@ func (s *syncStatusProvider) HeadsReceive(senderId, treeId string, heads []strin
s.treeHeads[treeId] = curTreeHeads s.treeHeads[treeId] = curTreeHeads
} }
func (s *syncStatusProvider) Watch(treeId string) (err error) { func (s *syncStatusService) Watch(treeId string) (err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
_, ok := s.treeHeads[treeId] _, ok := s.treeHeads[treeId]
@ -243,7 +248,7 @@ func (s *syncStatusProvider) Watch(treeId string) (err error) {
return return
} }
func (s *syncStatusProvider) Unwatch(treeId string) { func (s *syncStatusService) Unwatch(treeId string) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -252,14 +257,14 @@ func (s *syncStatusProvider) Unwatch(treeId string) {
} }
} }
func (s *syncStatusProvider) StateCounter() uint64 { func (s *syncStatusService) StateCounter() uint64 {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
return s.stateCounter return s.stateCounter
} }
func (s *syncStatusProvider) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { func (s *syncStatusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
// if sender is not a responsible node, then this should have no effect // if sender is not a responsible node, then this should have no effect
if !s.isSenderResponsible(senderId) { if !s.isSenderResponsible(senderId) {
return return
@ -282,11 +287,11 @@ func (s *syncStatusProvider) RemoveAllExcept(senderId string, differentRemoteIds
} }
} }
func (s *syncStatusProvider) Close(ctx context.Context) error { func (s *syncStatusService) Close(ctx context.Context) error {
s.periodicSync.Close() s.periodicSync.Close()
return nil return nil
} }
func (s *syncStatusProvider) isSenderResponsible(senderId string) bool { func (s *syncStatusService) isSenderResponsible(senderId string) bool {
return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId)
} }