Update sync handler

This commit is contained in:
mcrakhman 2022-10-02 11:54:34 +02:00
parent edc9a14ef5
commit 20234269e9
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 80 additions and 58 deletions

View File

@ -0,0 +1,59 @@
package syncservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type RequestFactory interface {
FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error)
FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
}
func newRequestFactory() RequestFactory {
return &requestFactory{}
}
type requestFactory struct{}
func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
req := &spacesyncproto.ObjectFullSyncRequest{}
if t == nil {
msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId)
return
}
req.Heads = t.Heads()
req.SnapshotPath = t.SnapshotPath()
var changesAfterSnapshot []*treechangeproto.RawTreeChangeWithId
changesAfterSnapshot, err = t.ChangesAfterCommonSnapshot(theirSnapshotPath, theirHeads)
if err != nil {
return
}
req.Changes = changesAfterSnapshot
msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId)
return
}
func (r *requestFactory) FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
resp := &spacesyncproto.ObjectFullSyncResponse{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}
if slice.UnsortedEquals(theirHeads, t.Heads()) {
msg = spacesyncproto.WrapFullResponse(resp, t.Header(), t.ID(), trackingId)
return
}
ourChanges, err := t.ChangesAfterCommonSnapshot(theirSnapshotPath, theirHeads)
if err != nil {
return
}
resp.Changes = ourChanges
msg = spacesyncproto.WrapFullResponse(resp, t.Header(), t.ID(), trackingId)
return
}

View File

@ -6,7 +6,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
) )
@ -14,17 +13,19 @@ type syncHandler struct {
spaceId string spaceId string
treeCache cache.TreeCache treeCache cache.TreeCache
syncClient SyncClient syncClient SyncClient
factory RequestFactory
} }
type SyncHandler interface { type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
} }
func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient, factory RequestFactory) *syncHandler {
return &syncHandler{ return &syncHandler{
spaceId: spaceId, spaceId: spaceId,
treeCache: treeCache, treeCache: treeCache,
syncClient: syncClient, syncClient: syncClient,
factory: factory,
} }
} }
@ -47,10 +48,7 @@ func (s *syncHandler) handleHeadUpdate(
update *spacesyncproto.ObjectHeadUpdate, update *spacesyncproto.ObjectHeadUpdate,
msg *spacesyncproto.ObjectSyncMessage) (err error) { msg *spacesyncproto.ObjectSyncMessage) (err error) {
var ( var fullRequest *spacesyncproto.ObjectSyncMessage
fullRequest *spacesyncproto.ObjectFullSyncRequest
result tree.AddResult
)
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
if err != nil { if err != nil {
return return
@ -62,21 +60,20 @@ func (s *syncHandler) handleHeadUpdate(
defer res.Release() defer res.Release()
defer objTree.Unlock() defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) { if s.alreadyHaveHeads(objTree, update.Heads) {
return nil return nil
} }
result, err = objTree.AddRawChanges(ctx, update.Changes...) _, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil { if err != nil {
return err return err
} }
// if after the heads are equal, or we have them locally if s.alreadyHaveHeads(objTree, update.Heads) {
if slice.UnsortedEquals(update.Heads, result.Heads) || objTree.HasChanges(update.Heads...) {
return nil return nil
} }
fullRequest, err = s.prepareFullSyncRequest(objTree, update) fullRequest, err = s.factory.FullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
if err != nil { if err != nil {
return err return err
} }
@ -84,8 +81,7 @@ func (s *syncHandler) handleHeadUpdate(
}() }()
if fullRequest != nil { if fullRequest != nil {
return s.syncClient.SendAsync(senderId, return s.syncClient.SendAsync(senderId, fullRequest)
spacesyncproto.WrapFullRequest(fullRequest, msg.RootChange, msg.TreeId, msg.TrackingId))
} }
return return
} }
@ -96,7 +92,7 @@ func (s *syncHandler) handleFullSyncRequest(
request *spacesyncproto.ObjectFullSyncRequest, request *spacesyncproto.ObjectFullSyncRequest,
msg *spacesyncproto.ObjectSyncMessage) (err error) { msg *spacesyncproto.ObjectSyncMessage) (err error) {
var ( var (
fullResponse *spacesyncproto.ObjectFullSyncResponse fullResponse *spacesyncproto.ObjectSyncMessage
header = msg.RootChange header = msg.RootChange
) )
defer func() { defer func() {
@ -120,20 +116,21 @@ func (s *syncHandler) handleFullSyncRequest(
header = objTree.Header() header = objTree.Header()
} }
if !s.alreadyHaveHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, request.Changes...) _, err = objTree.AddRawChanges(ctx, request.Changes...)
if err != nil { if err != nil {
return err return err
} }
}
fullResponse, err = s.prepareFullSyncResponse(request.SnapshotPath, request.Heads, objTree) fullResponse, err = s.factory.FullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId)
return err return err
}() }()
if err != nil { if err != nil {
return return
} }
return s.syncClient.SendAsync(senderId, return s.syncClient.SendAsync(senderId, fullResponse)
spacesyncproto.WrapFullResponse(fullResponse, header, msg.TreeId, msg.TrackingId))
} }
func (s *syncHandler) handleFullSyncResponse( func (s *syncHandler) handleFullSyncResponse(
@ -152,8 +149,7 @@ func (s *syncHandler) handleFullSyncResponse(
defer res.Release() defer res.Release()
defer objTree.Unlock() defer objTree.Unlock()
// if we already have the heads for whatever reason if s.alreadyHaveHeads(objTree, response.Heads) {
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil return nil
} }
@ -164,39 +160,6 @@ func (s *syncHandler) handleFullSyncResponse(
return return
} }
func (s *syncHandler) prepareFullSyncRequest( func (s *syncHandler) alreadyHaveHeads(t tree.ObjectTree, heads []string) bool {
t tree.ObjectTree, return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...)
update *spacesyncproto.ObjectHeadUpdate) (req *spacesyncproto.ObjectFullSyncRequest, err error) {
req = &spacesyncproto.ObjectFullSyncRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}
if len(update.Changes) != 0 {
var changesAfterSnapshot []*treechangeproto.RawTreeChangeWithId
changesAfterSnapshot, err = t.ChangesAfterCommonSnapshot(update.SnapshotPath, update.Heads)
if err != nil {
return
}
req.Changes = changesAfterSnapshot
}
return &spacesyncproto.ObjectFullSyncRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (s *syncHandler) prepareFullSyncResponse(
theirPath,
theirHeads []string,
t tree.ObjectTree) (*spacesyncproto.ObjectFullSyncResponse, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
if err != nil {
return nil, err
}
return &spacesyncproto.ObjectFullSyncResponse{
Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(),
}, nil
} }

View File

@ -50,7 +50,7 @@ func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.T
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message) return syncHandler.HandleMessage(ctx, senderId, message)
}) })
syncHandler = newSyncHandler(spaceId, cache, streamPool) syncHandler = newSyncHandler(spaceId, cache, streamPool, newRequestFactory())
return newSyncService( return newSyncService(
spaceId, spaceId,
headNotifiable, headNotifiable,