Separate diff service into components
This commit is contained in:
parent
77aa5a65d9
commit
a775006e2b
@ -6,13 +6,10 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
|
||||
"go.uber.org/zap"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DiffService interface {
|
||||
@ -26,11 +23,9 @@ type DiffService interface {
|
||||
|
||||
type diffService struct {
|
||||
spaceId string
|
||||
periodicSync *periodicSync
|
||||
periodicSync PeriodicSync
|
||||
storage storage.SpaceStorage
|
||||
nconf nodeconf.Configuration
|
||||
diff ldiff.Diff
|
||||
cache cache.TreeCache
|
||||
log *zap.Logger
|
||||
|
||||
syncPeriod int
|
||||
@ -40,23 +35,27 @@ func NewDiffService(
|
||||
spaceId string,
|
||||
syncPeriod int,
|
||||
storage storage.SpaceStorage,
|
||||
nconf nodeconf.Configuration,
|
||||
conf nodeconf.Configuration,
|
||||
cache cache.TreeCache,
|
||||
log *zap.Logger) DiffService {
|
||||
diff := ldiff.New(16, 16)
|
||||
l := log.With(zap.String("spaceId", spaceId))
|
||||
syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, l)
|
||||
periodicSync := newPeriodicSync(syncPeriod, syncer, l)
|
||||
|
||||
return &diffService{
|
||||
spaceId: spaceId,
|
||||
storage: storage,
|
||||
nconf: nconf,
|
||||
cache: cache,
|
||||
log: log,
|
||||
syncPeriod: syncPeriod,
|
||||
spaceId: spaceId,
|
||||
storage: storage,
|
||||
periodicSync: periodicSync,
|
||||
diff: diff,
|
||||
log: log,
|
||||
syncPeriod: syncPeriod,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *diffService) Init(objectIds []string) {
|
||||
d.periodicSync = newPeriodicSync(d.syncPeriod, d.sync, d.log.With(zap.String("spaceId", d.spaceId)))
|
||||
d.diff = ldiff.New(16, 16)
|
||||
d.fillDiff(objectIds)
|
||||
d.periodicSync.Run()
|
||||
}
|
||||
|
||||
func (d *diffService) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
||||
@ -80,49 +79,6 @@ func (d *diffService) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *diffService) sync(ctx context.Context) error {
|
||||
st := time.Now()
|
||||
// diffing with responsible peers according to configuration
|
||||
peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range peers {
|
||||
if err := d.syncWithPeer(ctx, p); err != nil {
|
||||
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *diffService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
cl := spacesyncproto.NewDRPCSpaceClient(p)
|
||||
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
|
||||
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
||||
err = rpcerr.Unwrap(err)
|
||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||
return err
|
||||
}
|
||||
if err == spacesyncproto.ErrSpaceMissing {
|
||||
return d.sendPushSpaceRequest(ctx, cl)
|
||||
}
|
||||
|
||||
d.pingTreesInCache(ctx, newIds)
|
||||
d.pingTreesInCache(ctx, changedIds)
|
||||
|
||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||
zap.Int("changedIds", len(changedIds)),
|
||||
zap.Int("removedIds", len(removedIds)))
|
||||
return
|
||||
}
|
||||
|
||||
func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) {
|
||||
for _, tId := range trees {
|
||||
_, _ = d.cache.GetTree(ctx, d.spaceId, tId)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *diffService) fillDiff(objectIds []string) {
|
||||
var els = make([]ldiff.Element, 0, len(objectIds))
|
||||
for _, id := range objectIds {
|
||||
@ -142,30 +98,6 @@ func (d *diffService) fillDiff(objectIds []string) {
|
||||
d.diff.Set(els...)
|
||||
}
|
||||
|
||||
func (d *diffService) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto.DRPCSpaceClient) (err error) {
|
||||
aclStorage, err := d.storage.ACLStorage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
root, err := aclStorage.Root()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
header, err := d.storage.SpaceHeader()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = cl.PushSpace(ctx, &spacesyncproto.PushSpaceRequest{
|
||||
SpaceId: d.spaceId,
|
||||
SpaceHeader: header,
|
||||
AclRoot: root,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func concatStrings(strs []string) string {
|
||||
var (
|
||||
b strings.Builder
|
||||
|
||||
112
common/commonspace/diffservice/diffsyncer.go
Normal file
112
common/commonspace/diffservice/diffsyncer.go
Normal file
@ -0,0 +1,112 @@
|
||||
package diffservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DiffSyncer interface {
|
||||
Sync(ctx context.Context) error
|
||||
}
|
||||
|
||||
func newDiffSyncer(
|
||||
spaceId string,
|
||||
diff ldiff.Diff,
|
||||
nconf nodeconf.Configuration,
|
||||
cache cache.TreeCache,
|
||||
storage storage.SpaceStorage,
|
||||
log *zap.Logger) DiffSyncer {
|
||||
return &diffSyncer{
|
||||
diff: diff,
|
||||
nconf: nconf,
|
||||
spaceId: spaceId,
|
||||
cache: cache,
|
||||
storage: storage,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
type diffSyncer struct {
|
||||
diff ldiff.Diff
|
||||
nconf nodeconf.Configuration
|
||||
spaceId string
|
||||
cache cache.TreeCache
|
||||
storage storage.SpaceStorage
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
func (d *diffSyncer) Sync(ctx context.Context) error {
|
||||
st := time.Now()
|
||||
// diffing with responsible peers according to configuration
|
||||
peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range peers {
|
||||
if err := d.syncWithPeer(ctx, p); err != nil {
|
||||
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
cl := spacesyncproto.NewDRPCSpaceClient(p)
|
||||
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
|
||||
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
||||
err = rpcerr.Unwrap(err)
|
||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||
return err
|
||||
}
|
||||
if err == spacesyncproto.ErrSpaceMissing {
|
||||
return d.sendPushSpaceRequest(ctx, cl)
|
||||
}
|
||||
|
||||
d.pingTreesInCache(ctx, newIds)
|
||||
d.pingTreesInCache(ctx, changedIds)
|
||||
|
||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||
zap.Int("changedIds", len(changedIds)),
|
||||
zap.Int("removedIds", len(removedIds)))
|
||||
return
|
||||
}
|
||||
|
||||
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
||||
for _, tId := range trees {
|
||||
_, _ = d.cache.GetTree(ctx, d.spaceId, tId)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto.DRPCSpaceClient) (err error) {
|
||||
aclStorage, err := d.storage.ACLStorage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
root, err := aclStorage.Root()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
header, err := d.storage.SpaceHeader()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = cl.PushSpace(ctx, &spacesyncproto.PushSpaceRequest{
|
||||
SpaceId: d.spaceId,
|
||||
SpaceHeader: header,
|
||||
AclRoot: root,
|
||||
})
|
||||
return
|
||||
}
|
||||
@ -6,25 +6,34 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func newPeriodicSync(periodSeconds int, sync func(ctx context.Context) error, l *zap.Logger) *periodicSync {
|
||||
type PeriodicSync interface {
|
||||
Run()
|
||||
Close()
|
||||
}
|
||||
|
||||
func newPeriodicSync(periodSeconds int, syncer DiffSyncer, l *zap.Logger) *periodicSync {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ps := &periodicSync{
|
||||
log: l,
|
||||
sync: sync,
|
||||
syncCtx: ctx,
|
||||
syncCancel: cancel,
|
||||
syncLoopDone: make(chan struct{}),
|
||||
return &periodicSync{
|
||||
syncer: syncer,
|
||||
log: l,
|
||||
syncCtx: ctx,
|
||||
syncCancel: cancel,
|
||||
syncLoopDone: make(chan struct{}),
|
||||
periodSeconds: periodSeconds,
|
||||
}
|
||||
go ps.syncLoop(periodSeconds)
|
||||
return ps
|
||||
}
|
||||
|
||||
type periodicSync struct {
|
||||
log *zap.Logger
|
||||
sync func(ctx context.Context) error
|
||||
syncCtx context.Context
|
||||
syncCancel context.CancelFunc
|
||||
syncLoopDone chan struct{}
|
||||
log *zap.Logger
|
||||
syncer DiffSyncer
|
||||
syncCtx context.Context
|
||||
syncCancel context.CancelFunc
|
||||
syncLoopDone chan struct{}
|
||||
periodSeconds int
|
||||
}
|
||||
|
||||
func (p *periodicSync) Run() {
|
||||
go p.syncLoop(p.periodSeconds)
|
||||
}
|
||||
|
||||
func (p *periodicSync) syncLoop(periodSeconds int) {
|
||||
@ -33,7 +42,7 @@ func (p *periodicSync) syncLoop(periodSeconds int) {
|
||||
doSync := func() {
|
||||
ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute)
|
||||
defer cancel()
|
||||
if err := p.sync(ctx); err != nil {
|
||||
if err := p.syncer.Sync(ctx); err != nil {
|
||||
p.log.Warn("periodic sync error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,6 +103,8 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
|
||||
cl := spacesyncproto.NewDRPCSpaceClient(peer)
|
||||
stream, err := cl.Stream(ctx)
|
||||
if err != nil {
|
||||
err = rpcerr.Unwrap(err)
|
||||
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)
|
||||
// so here probably the request is failed because there is no such space,
|
||||
// but diffService should handle such cases by sending pushSpace
|
||||
continue
|
||||
@ -111,7 +113,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
|
||||
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
||||
if err != nil {
|
||||
err = rpcerr.Unwrap(err)
|
||||
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)
|
||||
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
|
||||
continue
|
||||
}
|
||||
s.streamPool.AddAndReadStreamAsync(stream)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user