Few fixes to syncservice and streamchecker

This commit is contained in:
mcrakhman 2022-12-09 21:53:24 +01:00
parent 5eaa3bfe44
commit d7cce3c9fb
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
7 changed files with 40 additions and 15 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
"go.uber.org/zap"
"strings"
"time"
)
type TreeHeads struct {
@ -54,7 +55,7 @@ func NewDiffService(
l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, syncer.Sync, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
return &diffService{
spaceId: spaceId,

View File

@ -9,7 +9,7 @@ import (
)
type StreamChecker interface {
CheckResponsiblePeers(ctx context.Context) error
CheckResponsiblePeers()
}
type streamChecker struct {
@ -18,6 +18,7 @@ type streamChecker struct {
streamPool StreamPool
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
syncCtx context.Context
}
func NewStreamChecker(
@ -25,6 +26,7 @@ func NewStreamChecker(
connector nodeconf.ConfConnector,
streamPool StreamPool,
clientFactory spacesyncproto.ClientFactory,
syncCtx context.Context,
log *zap.Logger) StreamChecker {
return &streamChecker{
spaceId: spaceId,
@ -32,10 +34,11 @@ func NewStreamChecker(
streamPool: streamPool,
clientFactory: clientFactory,
log: log,
syncCtx: syncCtx,
}
}
func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
func (s *streamChecker) CheckResponsiblePeers() {
var (
activeNodeIds []string
configuration = s.connector.Configuration()
@ -47,14 +50,14 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
continue
}
}
newPeers, err := s.connector.DialInactiveResponsiblePeers(ctx, s.spaceId, activeNodeIds)
newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds)
if err != nil {
s.log.Error("failed to dial peers", zap.Error(err))
return
}
for _, p := range newPeers {
stream, err := s.clientFactory.Client(p).Stream(ctx)
stream, err := s.clientFactory.Client(p).Stream(s.syncCtx)
if err != nil {
err = rpcerr.Unwrap(err)
s.log.Error("failed to open stream", zap.Error(err))

View File

@ -265,6 +265,7 @@ Loop:
select {
case <-limiter:
case <-stream.Context().Done():
log.Debug("stream context done")
break Loop
}
go func() {

View File

@ -34,6 +34,9 @@ type syncService struct {
checker StreamChecker
periodicSync periodicsync.PeriodicSync
objectGetter objectgetter.ObjectGetter
syncCtx context.Context
cancelSync context.CancelFunc
}
func NewSyncService(
@ -45,18 +48,25 @@ func NewSyncService(
})
clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
syncLog := log.Desugar().With(zap.String("id", spaceId))
syncCtx, cancel := context.WithCancel(context.Background())
checker := NewStreamChecker(
spaceId,
confConnector,
streamPool,
clientFactory,
syncCtx,
syncLog)
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog)
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, 0, func(ctx context.Context) error {
checker.CheckResponsiblePeers()
return nil
}, syncLog)
syncService = newSyncService(
spaceId,
streamPool,
periodicSync,
checker)
checker,
syncCtx,
cancel)
return
}
@ -65,12 +75,16 @@ func newSyncService(
streamPool StreamPool,
periodicSync periodicsync.PeriodicSync,
checker StreamChecker,
syncCtx context.Context,
cancel context.CancelFunc,
) *syncService {
return &syncService{
periodicSync: periodicSync,
streamPool: streamPool,
spaceId: spaceId,
checker: checker,
syncCtx: syncCtx,
cancelSync: cancel,
}
}
@ -81,6 +95,7 @@ func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) {
func (s *syncService) Close() (err error) {
s.periodicSync.Close()
s.cancelSync()
return s.streamPool.Close()
}

View File

@ -2,7 +2,6 @@
package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
@ -45,7 +44,7 @@ func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (e
if err != nil {
return
}
s.checker.CheckResponsiblePeers(context.Background())
s.checker.CheckResponsiblePeers()
return s.StreamPool.BroadcastAsync(objMsg)
}
@ -63,7 +62,7 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr
return
}
if s.configuration.IsResponsible(s.spaceId) {
s.checker.CheckResponsiblePeers(context.Background())
s.checker.CheckResponsiblePeers()
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
}
return s.BroadcastAsync(message)

View File

@ -14,7 +14,7 @@ type PeriodicSync interface {
type SyncerFunc func(ctx context.Context) error
func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
ctx, cancel := context.WithCancel(context.Background())
return &periodicSync{
syncer: syncer,
@ -23,6 +23,7 @@ func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) Period
syncCancel: cancel,
syncLoopDone: make(chan struct{}),
periodSeconds: periodSeconds,
timeout: timeout,
}
}
@ -33,6 +34,7 @@ type periodicSync struct {
syncCancel context.CancelFunc
syncLoopDone chan struct{}
periodSeconds int
timeout time.Duration
}
func (p *periodicSync) Run() {
@ -43,8 +45,12 @@ func (p *periodicSync) syncLoop(periodSeconds int) {
period := time.Duration(periodSeconds) * time.Second
defer close(p.syncLoopDone)
doSync := func() {
ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute)
defer cancel()
ctx := p.syncCtx
if p.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(p.syncCtx, p.timeout)
defer cancel()
}
if err := p.syncer(ctx); err != nil {
p.log.Warn("periodic sync error", zap.Error(err))
}

View File

@ -23,7 +23,7 @@ func TestPeriodicSync_Run(t *testing.T) {
times += 1
return nil
}
pSync := NewPeriodicSync(secs, diffSyncer, l)
pSync := NewPeriodicSync(secs, 0, diffSyncer, l)
pSync.Run()
pSync.Close()
@ -38,7 +38,7 @@ func TestPeriodicSync_Run(t *testing.T) {
times += 1
return nil
}
pSync := NewPeriodicSync(secs, diffSyncer, l)
pSync := NewPeriodicSync(secs, 0, diffSyncer, l)
pSync.Run()
time.Sleep(time.Second * time.Duration(secs))