Add preliminary interfaces for syncstatus

This commit is contained in:
mcrakhman 2022-12-17 16:55:15 +01:00
parent 4eba413631
commit 5df4f13938
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
6 changed files with 60 additions and 13 deletions

View File

@ -5,6 +5,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
@ -51,6 +52,7 @@ type diffSyncer struct {
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
deletionState deletionstate.DeletionState
statusService statusservice.StatusService
}
func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) {
@ -91,8 +93,14 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
}
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
cl := d.clientFactory.Client(p)
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
var (
cl = d.clientFactory.Client(p)
rdiff = remotediff.NewRemoteDiff(d.spaceId, cl)
stateCounter uint64 = 0
)
if d.statusService != nil {
stateCounter = d.statusService.StateCounter()
}
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
err = rpcerr.Unwrap(err)
if err != nil && err != spacesyncproto.ErrSpaceMissing {
@ -105,6 +113,10 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
// not syncing ids which were removed through settings document
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
if d.statusService != nil {
d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
}
ctx = peer.CtxWithPeerId(ctx, p.Id())
d.pingTreesInCache(ctx, filteredIds)

View File

@ -0,0 +1,10 @@
package statusservice
type StatusService interface {
HeadsChange(treeId string, heads []string)
HeadsReceive(senderId, treeId string, heads []string)
Watch(treeId string, ch chan struct{})
Unwatch(treeId string)
StateCounter() uint64
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
@ -39,12 +40,13 @@ type SyncTree interface {
type syncTree struct {
tree.ObjectTree
synchandler.SyncHandler
syncClient SyncClient
notifiable HeadNotifiable
listener updatelistener.UpdateListener
treeUsage *atomic.Int32
isClosed bool
isDeleted bool
syncClient SyncClient
statusService statusservice.StatusService
notifiable HeadNotifiable
listener updatelistener.UpdateListener
treeUsage *atomic.Int32
isClosed bool
isDeleted bool
}
var log = logger.NewNamed("commonspace.synctree").Sugar()
@ -241,6 +243,9 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads)
}
if s.statusService != nil {
s.statusService.HeadsChange(s.ID(), res.Heads)
}
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
return

View File

@ -3,6 +3,7 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
@ -13,10 +14,11 @@ import (
)
type syncTreeHandler struct {
objTree tree.ObjectTree
syncClient SyncClient
handlerLock sync.Mutex
queue ReceiveQueue
objTree tree.ObjectTree
syncClient SyncClient
statusService statusservice.StatusService
handlerLock sync.Mutex
queue ReceiveQueue
}
const maxQueueSize = 5
@ -37,6 +39,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
return
}
if s.statusService != nil {
s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
}
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId)
if queueFull {
return

View File

@ -35,3 +35,17 @@ func WrapError(err error, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
RootChange: rootChange,
}
}
func GetHeads(msg *TreeSyncMessage) (heads []string) {
content := msg.GetContent()
switch {
case content.GetHeadUpdate() != nil:
return content.GetHeadUpdate().Heads
case content.GetFullSyncRequest() != nil:
return content.GetFullSyncRequest().Heads
case content.GetFullSyncResponse() != nil:
return content.GetFullSyncResponse().Heads
default:
return nil
}
}

View File

@ -48,7 +48,7 @@ func (s *service) registerScripts() {
SpaceId: space,
DocumentId: document,
Text: args[0],
IsSnapshot: rand.Int()%2 == 0,
IsSnapshot: rand.Int()%10 == 0,
})
if err != nil {
mError.Add(err)