Add preliminary interfaces for syncstatus
This commit is contained in:
parent
a510fa21b7
commit
85321db4d7
@ -5,6 +5,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
|
||||
@ -51,6 +52,7 @@ type diffSyncer struct {
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
log *zap.Logger
|
||||
deletionState deletionstate.DeletionState
|
||||
statusService statusservice.StatusService
|
||||
}
|
||||
|
||||
func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) {
|
||||
@ -91,8 +93,14 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
cl := d.clientFactory.Client(p)
|
||||
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
|
||||
var (
|
||||
cl = d.clientFactory.Client(p)
|
||||
rdiff = remotediff.NewRemoteDiff(d.spaceId, cl)
|
||||
stateCounter uint64 = 0
|
||||
)
|
||||
if d.statusService != nil {
|
||||
stateCounter = d.statusService.StateCounter()
|
||||
}
|
||||
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
||||
err = rpcerr.Unwrap(err)
|
||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||
@ -105,6 +113,10 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
// not syncing ids which were removed through settings document
|
||||
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
|
||||
|
||||
if d.statusService != nil {
|
||||
d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
|
||||
}
|
||||
|
||||
ctx = peer.CtxWithPeerId(ctx, p.Id())
|
||||
d.pingTreesInCache(ctx, filteredIds)
|
||||
|
||||
|
||||
10
common/commonspace/statusservice/statusservice.go
Normal file
10
common/commonspace/statusservice/statusservice.go
Normal file
@ -0,0 +1,10 @@
|
||||
package statusservice
|
||||
|
||||
type StatusService interface {
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
Watch(treeId string, ch chan struct{})
|
||||
Unwatch(treeId string)
|
||||
StateCounter() uint64
|
||||
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
|
||||
}
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
|
||||
@ -40,6 +41,7 @@ type syncTree struct {
|
||||
tree.ObjectTree
|
||||
synchandler.SyncHandler
|
||||
syncClient SyncClient
|
||||
statusService statusservice.StatusService
|
||||
notifiable HeadNotifiable
|
||||
listener updatelistener.UpdateListener
|
||||
treeUsage *atomic.Int32
|
||||
@ -241,6 +243,9 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
|
||||
if s.notifiable != nil {
|
||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||
}
|
||||
if s.statusService != nil {
|
||||
s.statusService.HeadsChange(s.ID(), res.Heads)
|
||||
}
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
return
|
||||
|
||||
@ -3,6 +3,7 @@ package synctree
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
@ -15,6 +16,7 @@ import (
|
||||
type syncTreeHandler struct {
|
||||
objTree tree.ObjectTree
|
||||
syncClient SyncClient
|
||||
statusService statusservice.StatusService
|
||||
handlerLock sync.Mutex
|
||||
queue ReceiveQueue
|
||||
}
|
||||
@ -37,6 +39,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
|
||||
return
|
||||
}
|
||||
|
||||
if s.statusService != nil {
|
||||
s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
|
||||
}
|
||||
|
||||
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId)
|
||||
if queueFull {
|
||||
return
|
||||
|
||||
@ -35,3 +35,17 @@ func WrapError(err error, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
|
||||
RootChange: rootChange,
|
||||
}
|
||||
}
|
||||
|
||||
func GetHeads(msg *TreeSyncMessage) (heads []string) {
|
||||
content := msg.GetContent()
|
||||
switch {
|
||||
case content.GetHeadUpdate() != nil:
|
||||
return content.GetHeadUpdate().Heads
|
||||
case content.GetFullSyncRequest() != nil:
|
||||
return content.GetFullSyncRequest().Heads
|
||||
case content.GetFullSyncResponse() != nil:
|
||||
return content.GetFullSyncResponse().Heads
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ func (s *service) registerScripts() {
|
||||
SpaceId: space,
|
||||
DocumentId: document,
|
||||
Text: args[0],
|
||||
IsSnapshot: rand.Int()%2 == 0,
|
||||
IsSnapshot: rand.Int()%10 == 0,
|
||||
})
|
||||
if err != nil {
|
||||
mError.Add(err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user