From ed3d7a7720a5a70eeeb688da985b59213e84d3ae Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 9 Dec 2022 21:53:24 +0100 Subject: [PATCH] Few fixes to syncservice and streamchecker --- common/commonspace/diffservice/diffservice.go | 3 ++- .../commonspace/syncservice/streamchecker.go | 11 +++++++---- common/commonspace/syncservice/streampool.go | 1 + common/commonspace/syncservice/syncservice.go | 19 +++++++++++++++++-- common/commonspace/synctree/syncclient.go | 5 ++--- common/util/periodicsync/periodicsync.go | 12 +++++++++--- common/util/periodicsync/periodicsync_test.go | 4 ++-- 7 files changed, 40 insertions(+), 15 deletions(-) diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index 31a678a9..8bae2f08 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -13,6 +13,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "go.uber.org/zap" "strings" + "time" ) type TreeHeads struct { @@ -54,7 +55,7 @@ func NewDiffService( l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l) - periodicSync := periodicsync.NewPeriodicSync(syncPeriod, syncer.Sync, l) + periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l) return &diffService{ spaceId: spaceId, diff --git a/common/commonspace/syncservice/streamchecker.go b/common/commonspace/syncservice/streamchecker.go index cbc152dd..8881c3d3 100644 --- a/common/commonspace/syncservice/streamchecker.go +++ b/common/commonspace/syncservice/streamchecker.go @@ -9,7 +9,7 @@ import ( ) type StreamChecker interface { - CheckResponsiblePeers(ctx context.Context) error + CheckResponsiblePeers() } type streamChecker struct { @@ -18,6 +18,7 @@ type streamChecker struct { streamPool StreamPool clientFactory spacesyncproto.ClientFactory log *zap.Logger + syncCtx context.Context } func NewStreamChecker( @@ -25,6 +26,7 @@ func NewStreamChecker( connector nodeconf.ConfConnector, streamPool StreamPool, clientFactory spacesyncproto.ClientFactory, + syncCtx context.Context, log *zap.Logger) StreamChecker { return &streamChecker{ spaceId: spaceId, @@ -32,10 +34,11 @@ func NewStreamChecker( streamPool: streamPool, clientFactory: clientFactory, log: log, + syncCtx: syncCtx, } } -func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) { +func (s *streamChecker) CheckResponsiblePeers() { var ( activeNodeIds []string configuration = s.connector.Configuration() @@ -47,14 +50,14 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) { continue } } - newPeers, err := s.connector.DialInactiveResponsiblePeers(ctx, s.spaceId, activeNodeIds) + newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds) if err != nil { s.log.Error("failed to dial peers", zap.Error(err)) return } for _, p := range newPeers { - stream, err := s.clientFactory.Client(p).Stream(ctx) + stream, err := s.clientFactory.Client(p).Stream(s.syncCtx) if err != nil { err = rpcerr.Unwrap(err) s.log.Error("failed to open stream", zap.Error(err)) diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 578d3b59..0a715352 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -265,6 +265,7 @@ Loop: select { case <-limiter: case <-stream.Context().Done(): + log.Debug("stream context done") break Loop } go func() { diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 18d613c9..986fa5cc 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -34,6 +34,9 @@ type syncService struct { checker StreamChecker periodicSync periodicsync.PeriodicSync objectGetter objectgetter.ObjectGetter + + syncCtx context.Context + cancelSync context.CancelFunc } func NewSyncService( @@ -45,18 +48,25 @@ func NewSyncService( }) clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) syncLog := log.Desugar().With(zap.String("id", spaceId)) + syncCtx, cancel := context.WithCancel(context.Background()) checker := NewStreamChecker( spaceId, confConnector, streamPool, clientFactory, + syncCtx, syncLog) - periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog) + periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, 0, func(ctx context.Context) error { + checker.CheckResponsiblePeers() + return nil + }, syncLog) syncService = newSyncService( spaceId, streamPool, periodicSync, - checker) + checker, + syncCtx, + cancel) return } @@ -65,12 +75,16 @@ func newSyncService( streamPool StreamPool, periodicSync periodicsync.PeriodicSync, checker StreamChecker, + syncCtx context.Context, + cancel context.CancelFunc, ) *syncService { return &syncService{ periodicSync: periodicSync, streamPool: streamPool, spaceId: spaceId, checker: checker, + syncCtx: syncCtx, + cancelSync: cancel, } } @@ -81,6 +95,7 @@ func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) { func (s *syncService) Close() (err error) { s.periodicSync.Close() + s.cancelSync() return s.streamPool.Close() } diff --git a/common/commonspace/synctree/syncclient.go b/common/commonspace/synctree/syncclient.go index 54eed4de..4e1ad1e0 100644 --- a/common/commonspace/synctree/syncclient.go +++ b/common/commonspace/synctree/syncclient.go @@ -2,7 +2,6 @@ package synctree import ( - "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" @@ -45,7 +44,7 @@ func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (e if err != nil { return } - s.checker.CheckResponsiblePeers(context.Background()) + s.checker.CheckResponsiblePeers() return s.StreamPool.BroadcastAsync(objMsg) } @@ -63,7 +62,7 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr return } if s.configuration.IsResponsible(s.spaceId) { - s.checker.CheckResponsiblePeers(context.Background()) + s.checker.CheckResponsiblePeers() return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg) } return s.BroadcastAsync(message) diff --git a/common/util/periodicsync/periodicsync.go b/common/util/periodicsync/periodicsync.go index 52db0935..d3d5e3e5 100644 --- a/common/util/periodicsync/periodicsync.go +++ b/common/util/periodicsync/periodicsync.go @@ -14,7 +14,7 @@ type PeriodicSync interface { type SyncerFunc func(ctx context.Context) error -func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) PeriodicSync { +func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync { ctx, cancel := context.WithCancel(context.Background()) return &periodicSync{ syncer: syncer, @@ -23,6 +23,7 @@ func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) Period syncCancel: cancel, syncLoopDone: make(chan struct{}), periodSeconds: periodSeconds, + timeout: timeout, } } @@ -33,6 +34,7 @@ type periodicSync struct { syncCancel context.CancelFunc syncLoopDone chan struct{} periodSeconds int + timeout time.Duration } func (p *periodicSync) Run() { @@ -43,8 +45,12 @@ func (p *periodicSync) syncLoop(periodSeconds int) { period := time.Duration(periodSeconds) * time.Second defer close(p.syncLoopDone) doSync := func() { - ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute) - defer cancel() + ctx := p.syncCtx + if p.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(p.syncCtx, p.timeout) + defer cancel() + } if err := p.syncer(ctx); err != nil { p.log.Warn("periodic sync error", zap.Error(err)) } diff --git a/common/util/periodicsync/periodicsync_test.go b/common/util/periodicsync/periodicsync_test.go index c4463e41..c7802323 100644 --- a/common/util/periodicsync/periodicsync_test.go +++ b/common/util/periodicsync/periodicsync_test.go @@ -23,7 +23,7 @@ func TestPeriodicSync_Run(t *testing.T) { times += 1 return nil } - pSync := NewPeriodicSync(secs, diffSyncer, l) + pSync := NewPeriodicSync(secs, 0, diffSyncer, l) pSync.Run() pSync.Close() @@ -38,7 +38,7 @@ func TestPeriodicSync_Run(t *testing.T) { times += 1 return nil } - pSync := NewPeriodicSync(secs, diffSyncer, l) + pSync := NewPeriodicSync(secs, 0, diffSyncer, l) pSync.Run() time.Sleep(time.Second * time.Duration(secs))