Few fixes to syncservice and streamchecker
This commit is contained in:
parent
30a8194638
commit
ed3d7a7720
@ -13,6 +13,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
|
||||
"go.uber.org/zap"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TreeHeads struct {
|
||||
@ -54,7 +55,7 @@ func NewDiffService(
|
||||
l := log.With(zap.String("spaceId", spaceId))
|
||||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
|
||||
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l)
|
||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, syncer.Sync, l)
|
||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
|
||||
|
||||
return &diffService{
|
||||
spaceId: spaceId,
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
type StreamChecker interface {
|
||||
CheckResponsiblePeers(ctx context.Context) error
|
||||
CheckResponsiblePeers()
|
||||
}
|
||||
|
||||
type streamChecker struct {
|
||||
@ -18,6 +18,7 @@ type streamChecker struct {
|
||||
streamPool StreamPool
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
log *zap.Logger
|
||||
syncCtx context.Context
|
||||
}
|
||||
|
||||
func NewStreamChecker(
|
||||
@ -25,6 +26,7 @@ func NewStreamChecker(
|
||||
connector nodeconf.ConfConnector,
|
||||
streamPool StreamPool,
|
||||
clientFactory spacesyncproto.ClientFactory,
|
||||
syncCtx context.Context,
|
||||
log *zap.Logger) StreamChecker {
|
||||
return &streamChecker{
|
||||
spaceId: spaceId,
|
||||
@ -32,10 +34,11 @@ func NewStreamChecker(
|
||||
streamPool: streamPool,
|
||||
clientFactory: clientFactory,
|
||||
log: log,
|
||||
syncCtx: syncCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
|
||||
func (s *streamChecker) CheckResponsiblePeers() {
|
||||
var (
|
||||
activeNodeIds []string
|
||||
configuration = s.connector.Configuration()
|
||||
@ -47,14 +50,14 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
newPeers, err := s.connector.DialInactiveResponsiblePeers(ctx, s.spaceId, activeNodeIds)
|
||||
newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds)
|
||||
if err != nil {
|
||||
s.log.Error("failed to dial peers", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, p := range newPeers {
|
||||
stream, err := s.clientFactory.Client(p).Stream(ctx)
|
||||
stream, err := s.clientFactory.Client(p).Stream(s.syncCtx)
|
||||
if err != nil {
|
||||
err = rpcerr.Unwrap(err)
|
||||
s.log.Error("failed to open stream", zap.Error(err))
|
||||
|
||||
@ -265,6 +265,7 @@ Loop:
|
||||
select {
|
||||
case <-limiter:
|
||||
case <-stream.Context().Done():
|
||||
log.Debug("stream context done")
|
||||
break Loop
|
||||
}
|
||||
go func() {
|
||||
|
||||
@ -34,6 +34,9 @@ type syncService struct {
|
||||
checker StreamChecker
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
objectGetter objectgetter.ObjectGetter
|
||||
|
||||
syncCtx context.Context
|
||||
cancelSync context.CancelFunc
|
||||
}
|
||||
|
||||
func NewSyncService(
|
||||
@ -45,18 +48,25 @@ func NewSyncService(
|
||||
})
|
||||
clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
|
||||
syncLog := log.Desugar().With(zap.String("id", spaceId))
|
||||
syncCtx, cancel := context.WithCancel(context.Background())
|
||||
checker := NewStreamChecker(
|
||||
spaceId,
|
||||
confConnector,
|
||||
streamPool,
|
||||
clientFactory,
|
||||
syncCtx,
|
||||
syncLog)
|
||||
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog)
|
||||
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, 0, func(ctx context.Context) error {
|
||||
checker.CheckResponsiblePeers()
|
||||
return nil
|
||||
}, syncLog)
|
||||
syncService = newSyncService(
|
||||
spaceId,
|
||||
streamPool,
|
||||
periodicSync,
|
||||
checker)
|
||||
checker,
|
||||
syncCtx,
|
||||
cancel)
|
||||
return
|
||||
}
|
||||
|
||||
@ -65,12 +75,16 @@ func newSyncService(
|
||||
streamPool StreamPool,
|
||||
periodicSync periodicsync.PeriodicSync,
|
||||
checker StreamChecker,
|
||||
syncCtx context.Context,
|
||||
cancel context.CancelFunc,
|
||||
) *syncService {
|
||||
return &syncService{
|
||||
periodicSync: periodicSync,
|
||||
streamPool: streamPool,
|
||||
spaceId: spaceId,
|
||||
checker: checker,
|
||||
syncCtx: syncCtx,
|
||||
cancelSync: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,6 +95,7 @@ func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) {
|
||||
|
||||
func (s *syncService) Close() (err error) {
|
||||
s.periodicSync.Close()
|
||||
s.cancelSync()
|
||||
return s.streamPool.Close()
|
||||
}
|
||||
|
||||
|
||||
@ -2,7 +2,6 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
@ -45,7 +44,7 @@ func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (e
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.checker.CheckResponsiblePeers(context.Background())
|
||||
s.checker.CheckResponsiblePeers()
|
||||
return s.StreamPool.BroadcastAsync(objMsg)
|
||||
}
|
||||
|
||||
@ -63,7 +62,7 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr
|
||||
return
|
||||
}
|
||||
if s.configuration.IsResponsible(s.spaceId) {
|
||||
s.checker.CheckResponsiblePeers(context.Background())
|
||||
s.checker.CheckResponsiblePeers()
|
||||
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
|
||||
}
|
||||
return s.BroadcastAsync(message)
|
||||
|
||||
@ -14,7 +14,7 @@ type PeriodicSync interface {
|
||||
|
||||
type SyncerFunc func(ctx context.Context) error
|
||||
|
||||
func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
|
||||
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &periodicSync{
|
||||
syncer: syncer,
|
||||
@ -23,6 +23,7 @@ func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) Period
|
||||
syncCancel: cancel,
|
||||
syncLoopDone: make(chan struct{}),
|
||||
periodSeconds: periodSeconds,
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,6 +34,7 @@ type periodicSync struct {
|
||||
syncCancel context.CancelFunc
|
||||
syncLoopDone chan struct{}
|
||||
periodSeconds int
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func (p *periodicSync) Run() {
|
||||
@ -43,8 +45,12 @@ func (p *periodicSync) syncLoop(periodSeconds int) {
|
||||
period := time.Duration(periodSeconds) * time.Second
|
||||
defer close(p.syncLoopDone)
|
||||
doSync := func() {
|
||||
ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute)
|
||||
defer cancel()
|
||||
ctx := p.syncCtx
|
||||
if p.timeout != 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(p.syncCtx, p.timeout)
|
||||
defer cancel()
|
||||
}
|
||||
if err := p.syncer(ctx); err != nil {
|
||||
p.log.Warn("periodic sync error", zap.Error(err))
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ func TestPeriodicSync_Run(t *testing.T) {
|
||||
times += 1
|
||||
return nil
|
||||
}
|
||||
pSync := NewPeriodicSync(secs, diffSyncer, l)
|
||||
pSync := NewPeriodicSync(secs, 0, diffSyncer, l)
|
||||
|
||||
pSync.Run()
|
||||
pSync.Close()
|
||||
@ -38,7 +38,7 @@ func TestPeriodicSync_Run(t *testing.T) {
|
||||
times += 1
|
||||
return nil
|
||||
}
|
||||
pSync := NewPeriodicSync(secs, diffSyncer, l)
|
||||
pSync := NewPeriodicSync(secs, 0, diffSyncer, l)
|
||||
|
||||
pSync.Run()
|
||||
time.Sleep(time.Second * time.Duration(secs))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user