diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index ff376d31..eab5c441 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -6,13 +6,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" "go.uber.org/zap" "strings" - "time" ) type DiffService interface { @@ -26,11 +23,9 @@ type DiffService interface { type diffService struct { spaceId string - periodicSync *periodicSync + periodicSync PeriodicSync storage storage.SpaceStorage - nconf nodeconf.Configuration diff ldiff.Diff - cache cache.TreeCache log *zap.Logger syncPeriod int @@ -40,23 +35,27 @@ func NewDiffService( spaceId string, syncPeriod int, storage storage.SpaceStorage, - nconf nodeconf.Configuration, + conf nodeconf.Configuration, cache cache.TreeCache, log *zap.Logger) DiffService { + diff := ldiff.New(16, 16) + l := log.With(zap.String("spaceId", spaceId)) + syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, l) + periodicSync := newPeriodicSync(syncPeriod, syncer, l) + return &diffService{ - spaceId: spaceId, - storage: storage, - nconf: nconf, - cache: cache, - log: log, - syncPeriod: syncPeriod, + spaceId: spaceId, + storage: storage, + periodicSync: periodicSync, + diff: diff, + log: log, + syncPeriod: syncPeriod, } } func (d *diffService) Init(objectIds []string) { - d.periodicSync = newPeriodicSync(d.syncPeriod, d.sync, d.log.With(zap.String("spaceId", d.spaceId))) - d.diff = ldiff.New(16, 16) d.fillDiff(objectIds) + d.periodicSync.Run() } func (d *diffService) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { @@ -80,49 +79,6 @@ func (d *diffService) Close() (err error) { return nil } -func (d *diffService) sync(ctx context.Context) error { - st := time.Now() - // diffing with responsible peers according to configuration - peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId) - if err != nil { - return err - } - for _, p := range peers { - if err := d.syncWithPeer(ctx, p); err != nil { - d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) - } - } - d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) - return nil -} - -func (d *diffService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { - cl := spacesyncproto.NewDRPCSpaceClient(p) - rdiff := remotediff.NewRemoteDiff(d.spaceId, cl) - newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) - err = rpcerr.Unwrap(err) - if err != nil && err != spacesyncproto.ErrSpaceMissing { - return err - } - if err == spacesyncproto.ErrSpaceMissing { - return d.sendPushSpaceRequest(ctx, cl) - } - - d.pingTreesInCache(ctx, newIds) - d.pingTreesInCache(ctx, changedIds) - - d.log.Info("sync done:", zap.Int("newIds", len(newIds)), - zap.Int("changedIds", len(changedIds)), - zap.Int("removedIds", len(removedIds))) - return -} - -func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) { - for _, tId := range trees { - _, _ = d.cache.GetTree(ctx, d.spaceId, tId) - } -} - func (d *diffService) fillDiff(objectIds []string) { var els = make([]ldiff.Element, 0, len(objectIds)) for _, id := range objectIds { @@ -142,30 +98,6 @@ func (d *diffService) fillDiff(objectIds []string) { d.diff.Set(els...) } -func (d *diffService) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto.DRPCSpaceClient) (err error) { - aclStorage, err := d.storage.ACLStorage() - if err != nil { - return - } - - root, err := aclStorage.Root() - if err != nil { - return - } - - header, err := d.storage.SpaceHeader() - if err != nil { - return - } - - _, err = cl.PushSpace(ctx, &spacesyncproto.PushSpaceRequest{ - SpaceId: d.spaceId, - SpaceHeader: header, - AclRoot: root, - }) - return -} - func concatStrings(strs []string) string { var ( b strings.Builder diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go new file mode 100644 index 00000000..8eec6c9b --- /dev/null +++ b/common/commonspace/diffservice/diffsyncer.go @@ -0,0 +1,112 @@ +package diffservice + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" + "go.uber.org/zap" + "time" +) + +type DiffSyncer interface { + Sync(ctx context.Context) error +} + +func newDiffSyncer( + spaceId string, + diff ldiff.Diff, + nconf nodeconf.Configuration, + cache cache.TreeCache, + storage storage.SpaceStorage, + log *zap.Logger) DiffSyncer { + return &diffSyncer{ + diff: diff, + nconf: nconf, + spaceId: spaceId, + cache: cache, + storage: storage, + log: log, + } +} + +type diffSyncer struct { + diff ldiff.Diff + nconf nodeconf.Configuration + spaceId string + cache cache.TreeCache + storage storage.SpaceStorage + log *zap.Logger +} + +func (d *diffSyncer) Sync(ctx context.Context) error { + st := time.Now() + // diffing with responsible peers according to configuration + peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId) + if err != nil { + return err + } + for _, p := range peers { + if err := d.syncWithPeer(ctx, p); err != nil { + d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) + } + } + d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) + return nil +} + +func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { + cl := spacesyncproto.NewDRPCSpaceClient(p) + rdiff := remotediff.NewRemoteDiff(d.spaceId, cl) + newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) + err = rpcerr.Unwrap(err) + if err != nil && err != spacesyncproto.ErrSpaceMissing { + return err + } + if err == spacesyncproto.ErrSpaceMissing { + return d.sendPushSpaceRequest(ctx, cl) + } + + d.pingTreesInCache(ctx, newIds) + d.pingTreesInCache(ctx, changedIds) + + d.log.Info("sync done:", zap.Int("newIds", len(newIds)), + zap.Int("changedIds", len(changedIds)), + zap.Int("removedIds", len(removedIds))) + return +} + +func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { + for _, tId := range trees { + _, _ = d.cache.GetTree(ctx, d.spaceId, tId) + } +} + +func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto.DRPCSpaceClient) (err error) { + aclStorage, err := d.storage.ACLStorage() + if err != nil { + return + } + + root, err := aclStorage.Root() + if err != nil { + return + } + + header, err := d.storage.SpaceHeader() + if err != nil { + return + } + + _, err = cl.PushSpace(ctx, &spacesyncproto.PushSpaceRequest{ + SpaceId: d.spaceId, + SpaceHeader: header, + AclRoot: root, + }) + return +} diff --git a/common/commonspace/diffservice/periodicsync.go b/common/commonspace/diffservice/periodicsync.go index 59616eab..a74b25cf 100644 --- a/common/commonspace/diffservice/periodicsync.go +++ b/common/commonspace/diffservice/periodicsync.go @@ -6,25 +6,34 @@ import ( "time" ) -func newPeriodicSync(periodSeconds int, sync func(ctx context.Context) error, l *zap.Logger) *periodicSync { +type PeriodicSync interface { + Run() + Close() +} + +func newPeriodicSync(periodSeconds int, syncer DiffSyncer, l *zap.Logger) *periodicSync { ctx, cancel := context.WithCancel(context.Background()) - ps := &periodicSync{ - log: l, - sync: sync, - syncCtx: ctx, - syncCancel: cancel, - syncLoopDone: make(chan struct{}), + return &periodicSync{ + syncer: syncer, + log: l, + syncCtx: ctx, + syncCancel: cancel, + syncLoopDone: make(chan struct{}), + periodSeconds: periodSeconds, } - go ps.syncLoop(periodSeconds) - return ps } type periodicSync struct { - log *zap.Logger - sync func(ctx context.Context) error - syncCtx context.Context - syncCancel context.CancelFunc - syncLoopDone chan struct{} + log *zap.Logger + syncer DiffSyncer + syncCtx context.Context + syncCancel context.CancelFunc + syncLoopDone chan struct{} + periodSeconds int +} + +func (p *periodicSync) Run() { + go p.syncLoop(p.periodSeconds) } func (p *periodicSync) syncLoop(periodSeconds int) { @@ -33,7 +42,7 @@ func (p *periodicSync) syncLoop(periodSeconds int) { doSync := func() { ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute) defer cancel() - if err := p.sync(ctx); err != nil { + if err := p.syncer.Sync(ctx); err != nil { p.log.Warn("periodic sync error", zap.Error(err)) } } diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 59aa3621..0d9bf722 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -103,6 +103,8 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { cl := spacesyncproto.NewDRPCSpaceClient(peer) stream, err := cl.Stream(ctx) if err != nil { + err = rpcerr.Unwrap(err) + log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err) // so here probably the request is failed because there is no such space, // but diffService should handle such cases by sending pushSpace continue @@ -111,7 +113,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) if err != nil { err = rpcerr.Unwrap(err) - log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err) + log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) continue } s.streamPool.AddAndReadStreamAsync(stream)