Change proto and generalise logic

This commit is contained in:
mcrakhman 2022-10-22 16:05:06 +02:00 committed by Mikhail Iudin
parent 69211dfe66
commit 6518a0f971
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
14 changed files with 2159 additions and 1958 deletions

View File

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
@ -15,7 +14,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
aclstorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
@ -167,51 +165,6 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
err = ErrSpaceClosed
return
}
getTreeRemote := func() (*spacesyncproto.ObjectSyncMessage, error) {
// TODO: add empty context handling (when this is not happening due to head update)
peerId, err := syncservice.GetPeerIdFromStreamContext(ctx)
if err != nil {
return nil, err
}
return s.syncService.StreamPool().SendSync(
peerId,
synctree.GetRequestFactory().CreateNewTreeRequest(id),
)
}
store, err := s.storage.TreeStorage(id)
if err != nil && err != aclstorage.ErrUnknownTreeId {
return
}
isFirstBuild := false
if err == aclstorage.ErrUnknownTreeId {
isFirstBuild = true
var resp *spacesyncproto.ObjectSyncMessage
resp, err = getTreeRemote()
if err != nil {
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := aclstorage.TreeStorageCreatePayload{
TreeId: resp.TreeId,
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with inmemory storage and validating that it was without errors
err = tree.ValidateRawTree(payload, s.aclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
store, err = s.storage.CreateTreeStorage(payload)
if err != nil {
return
}
}
deps := synctree.BuildDeps{
SpaceId: s.id,
StreamPool: s.syncService.StreamPool(),
@ -219,9 +172,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
Storage: store,
SpaceStorage: s.storage,
}
return synctree.BuildSyncTree(ctx, isFirstBuild, deps)
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}
func (s *space) Close() error {

View File

@ -2,7 +2,6 @@ syntax = "proto3";
package anySpace;
option go_package = "commonspace/spacesyncproto";
import "pkg/acl/treechangeproto/protos/treechange.proto";
import "pkg/acl/aclrecordproto/protos/aclrecord.proto";
enum ErrCodes {
@ -55,51 +54,13 @@ message HeadSyncResponse {
// ObjectSyncMessage is a message sent on object sync
message ObjectSyncMessage {
string spaceId = 1;
ObjectSyncContentValue content = 2;
treechange.RawTreeChangeWithId rootChange = 3;
string treeId = 4;
string trackingId = 5;
string replyId = 2;
bytes payload = 3;
string objectId = 4;
// string identity = 5;
// string peerSignature = 6;
}
// ObjectSyncContentValue provides different types for object sync
message ObjectSyncContentValue {
oneof value {
ObjectHeadUpdate headUpdate = 1;
ObjectFullSyncRequest fullSyncRequest = 2;
ObjectFullSyncResponse fullSyncResponse = 3;
ObjectErrorResponse errorResponse = 4;
}
}
// ObjectHeadUpdate is a message sent on document head update
message ObjectHeadUpdate {
repeated string heads = 1;
repeated treechange.RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// ObjectHeadUpdate is a message sent when document needs full sync
message ObjectFullSyncRequest {
repeated string heads = 1;
repeated treechange.RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// ObjectFullSyncResponse is a message sent as a response for a specific full sync
message ObjectFullSyncResponse {
repeated string heads = 1;
repeated treechange.RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// ObjectErrorResponse is an error sent as a response for a full sync request
message ObjectErrorResponse {
string error = 1;
}
// PushSpaceRequest is a request to add space on a node containing only one acl record
message PushSpaceRequest {
RawSpaceHeaderWithId spaceHeader = 2;

View File

@ -2,8 +2,6 @@
package spacesyncproto
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"storj.io/drpc"
)
@ -18,63 +16,3 @@ func (c ClientFactoryFunc) Client(cc drpc.Conn) DRPCSpaceClient {
type ClientFactory interface {
Client(cc drpc.Conn) DRPCSpaceClient
}
func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update},
},
RootChange: rootChange,
TreeId: treeId,
TrackingId: trackingId,
}
}
func WrapFullRequest(request *ObjectFullSyncRequest, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request},
},
RootChange: rootChange,
TreeId: treeId,
TrackingId: trackingId,
}
}
func WrapFullResponse(response *ObjectFullSyncResponse, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response},
},
RootChange: rootChange,
TreeId: treeId,
TrackingId: trackingId,
}
}
func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_ErrorResponse{ErrorResponse: &ObjectErrorResponse{Error: err.Error()}},
},
RootChange: rootChange,
TreeId: treeId,
TrackingId: trackingId,
}
}
func MessageDescription(msg *ObjectSyncMessage) (res string) {
content := msg.GetContent()
switch {
case content.GetHeadUpdate() != nil:
res = fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads)
case content.GetFullSyncRequest() != nil:
res = fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads)
case content.GetFullSyncResponse() != nil:
res = fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads)
case content.GetErrorResponse() != nil:
res = fmt.Sprintf("error response/%v", content.GetErrorResponse().Error)
}
res = fmt.Sprintf("%s/tracking=[%s]", res, msg.TrackingId)
return res
}

File diff suppressed because it is too large Load Diff

View File

@ -78,13 +78,13 @@ func (s *streamPool) SendSync(
peerId string,
msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
newCounter := s.counter.Add(1)
msg.TrackingId = genStreamPoolKey(peerId, msg.TreeId, newCounter)
msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter)
s.waitersMx.Lock()
waiter := responseWaiter{
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
}
s.waiters[msg.TrackingId] = waiter
s.waiters[msg.ReplyId] = waiter
s.waitersMx.Unlock()
err = s.SendAsync([]string{peerId}, msg)
@ -95,10 +95,10 @@ func (s *streamPool) SendSync(
select {
case <-delay.C:
s.waitersMx.Lock()
delete(s.waiters, msg.TrackingId)
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
log.With("trackingId", msg.TrackingId).Error("time elapsed when waiting")
log.With("trackingId", msg.ReplyId).Error("time elapsed when waiting")
err = ErrSyncTimeout
case reply = <-waiter.ch:
if !delay.Stop() {
@ -125,8 +125,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn
streams := getStreams()
s.Unlock()
log.With("description", spacesyncproto.MessageDescription(message)).
With("treeId", message.TreeId).
log.With("objectId", message.ObjectId).
Debugf("sending message to %d peers", len(streams))
for _, s := range streams {
err = s.Send(message)
@ -174,8 +173,7 @@ Loop:
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
streams := s.getAllStreams()
log.With("description", spacesyncproto.MessageDescription(message)).
With("treeId", message.TreeId).
log.With("objectId", message.ObjectId).
Debugf("broadcasting message to %d peers", len(streams))
for _, stream := range streams {
if err = stream.Send(message); err != nil {
@ -224,23 +222,23 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
process := func(msg *spacesyncproto.ObjectSyncMessage) {
s.lastUsage.Store(time.Now().Unix())
if msg.TrackingId == "" {
if msg.ReplyId == "" {
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With("trackingId", msg.TrackingId).Debug("getting message with tracking id")
log.With("trackingId", msg.ReplyId).Debug("getting message with tracking id")
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.TrackingId]
waiter, exists := s.waiters[msg.ReplyId]
if !exists {
log.With("trackingId", msg.TrackingId).Debug("tracking id not exists")
log.With("trackingId", msg.ReplyId).Debug("tracking id not exists")
s.waitersMx.Unlock()
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With("trackingId", msg.TrackingId).Debug("tracking id exists")
log.With("trackingId", msg.ReplyId).Debug("tracking id exists")
delete(s.waiters, msg.TrackingId)
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
}

View File

@ -84,7 +84,7 @@ func (s *syncService) LastUsage() time.Time {
}
func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
obj, err := s.objectGetter.GetObject(ctx, message.TreeId)
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
if err != nil {
return
}

View File

@ -2,17 +2,16 @@ package synctree
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
)
type RequestFactory interface {
CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage)
CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage)
CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error)
CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage)
CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage)
CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string) (req *treechangeproto.TreeSyncMessage, err error)
CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string) (*treechangeproto.TreeSyncMessage, error)
}
var factory = &requestFactory{}
@ -23,20 +22,20 @@ func GetRequestFactory() RequestFactory {
type requestFactory struct{}
func (r *requestFactory) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapHeadUpdate(&spacesyncproto.ObjectHeadUpdate{
func (r *requestFactory) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage) {
return treechangeproto.WrapHeadUpdate(&treechangeproto.TreeHeadUpdate{
Heads: t.Heads(),
Changes: added,
SnapshotPath: t.SnapshotPath(),
}, t.Header(), t.ID(), "")
}, t.Header())
}
func (r *requestFactory) CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, "")
func (r *requestFactory) CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage) {
return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{}, nil)
}
func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
req := &spacesyncproto.ObjectFullSyncRequest{}
func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) {
req := &treechangeproto.TreeFullSyncRequest{}
if t == nil {
return nil, fmt.Errorf("tree should not be empty")
}
@ -51,17 +50,17 @@ func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, th
}
req.Changes = changesAfterSnapshot
msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId)
msg = treechangeproto.WrapFullRequest(req, t.Header())
return
}
func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
resp := &spacesyncproto.ObjectFullSyncResponse{
func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) {
resp := &treechangeproto.TreeFullSyncResponse{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}
if slice.UnsortedEquals(theirHeads, t.Heads()) {
msg = spacesyncproto.WrapFullResponse(resp, t.Header(), t.ID(), trackingId)
msg = treechangeproto.WrapFullResponse(resp, t.Header())
return
}
@ -70,6 +69,6 @@ func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, t
return
}
resp.Changes = ourChanges
msg = spacesyncproto.WrapFullResponse(resp, t.Header(), t.ID(), trackingId)
msg = treechangeproto.WrapFullResponse(resp, t.Header())
return
}

View File

@ -5,15 +5,17 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"time"
)
type SyncClient interface {
syncservice.StreamPool
RequestFactory
ocache.ObjectLastUsage
BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error)
BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error)
SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error)
}
type syncClient struct {
@ -43,21 +45,51 @@ func (s *syncClient) LastUsage() time.Time {
return s.StreamPool.LastUsage()
}
func (s *syncClient) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
s.notifyIfNeeded(message)
return s.StreamPool.BroadcastAsync(message)
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
if err != nil {
return
}
return s.StreamPool.BroadcastAsync(objMsg)
}
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) {
func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId)
if err != nil {
return
}
return s.StreamPool.SendAsync([]string{peerId}, objMsg)
}
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
s.notifyIfNeeded(message)
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
if err != nil {
return
}
if s.configuration.IsResponsible(s.spaceId) {
return s.SendAsync(s.configuration.NodeIds(s.spaceId), message)
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
}
return s.BroadcastAsync(message)
}
func (s *syncClient) notifyIfNeeded(message *spacesyncproto.ObjectSyncMessage) {
func (s *syncClient) notifyIfNeeded(message *treechangeproto.TreeSyncMessage) {
if message.GetContent().GetHeadUpdate() != nil {
update := message.GetContent().GetHeadUpdate()
s.notifiable.UpdateHeads(message.TreeId, update.Heads)
s.notifiable.UpdateHeads(message.RootChange.Id, update.Heads)
}
}
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {
return
}
objMsg = &spacesyncproto.ObjectSyncMessage{
ReplyId: replyId,
Payload: payload,
ObjectId: id,
}
return
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
@ -13,6 +14,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/gogo/protobuf/proto"
)
var ErrSyncTreeClosed = errors.New("sync tree is closed")
@ -50,7 +52,8 @@ type BuildDeps struct {
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
Storage storage.TreeStorage
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
}
func DeriveSyncTree(
@ -107,12 +110,72 @@ func CreateSyncTree(
return
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t tree.ObjectTree, err error) {
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
// TODO: add empty context handling (when this is not happening due to head update)
peerId, err := syncservice.GetPeerIdFromStreamContext(ctx)
if err != nil {
return
}
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")
if err != nil {
return
}
resp, err := deps.StreamPool.SendSync(peerId, objMsg)
if resp != nil {
return
}
msg = &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(resp.Payload, msg)
return
}
store, err := deps.SpaceStorage.TreeStorage(id)
if err != nil && err != storage.ErrUnknownTreeId {
return
}
isFirstBuild := false
if err == storage.ErrUnknownTreeId {
isFirstBuild = true
var resp *treechangeproto.TreeSyncMessage
resp, err = getTreeRemote()
if err != nil {
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := storage.TreeStorageCreatePayload{
TreeId: id,
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with inmemory storage and validating that it was without errors
err = tree.ValidateRawTree(payload, deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
store, err = deps.SpaceStorage.CreateTreeStorage(payload)
if err != nil {
return
}
}
deps.TreeStorage = store
return BuildSyncTree(ctx, isFirstBuild, deps)
}
func BuildSyncTree(
ctx context.Context,
isFirstBuild bool,
deps BuildDeps) (t tree.ObjectTree, err error) {
t, err = buildObjectTree(deps.Storage, deps.AclList)
t, err = buildObjectTree(deps.TreeStorage, deps.AclList)
if err != nil {
return
}

View File

@ -5,7 +5,9 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
"github.com/gogo/protobuf/proto"
)
type syncTreeHandler struct {
@ -20,15 +22,21 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand
}
}
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error {
content := msg.GetContent()
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
if err != nil {
return
}
content := unmarshalled.GetContent()
switch {
case content.GetFullSyncRequest() != nil:
return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg)
case content.GetFullSyncResponse() != nil:
return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse(), msg)
case content.GetHeadUpdate() != nil:
return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg)
return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg.ReplyId)
case content.GetFullSyncRequest() != nil:
return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg.ReplyId)
case content.GetFullSyncResponse() != nil:
return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
}
return nil
}
@ -36,14 +44,14 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
func (s *syncTreeHandler) handleHeadUpdate(
ctx context.Context,
senderId string,
update *spacesyncproto.ObjectHeadUpdate,
msg *spacesyncproto.ObjectSyncMessage) (err error) {
update *treechangeproto.TreeHeadUpdate,
replyId string) (err error) {
log.With("senderId", senderId).
With("heads", update.Heads).
With("treeId", msg.TreeId).
With("treeId", s.objTree.ID()).
Debug("received head update message")
var (
fullRequest *spacesyncproto.ObjectSyncMessage
fullRequest *treechangeproto.TreeSyncMessage
isEmptyUpdate = len(update.Changes) == 0
objTree = s.objTree
)
@ -54,12 +62,12 @@ func (s *syncTreeHandler) handleHeadUpdate(
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
log.With("treeId", msg.TreeId).Debug("is empty update")
log.With("treeId", objTree.ID()).Debug("is empty update")
if slice.UnsortedEquals(objTree.Heads(), update.Heads) {
return nil
}
// we need to sync in any case
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
return err
}
@ -76,20 +84,20 @@ func (s *syncTreeHandler) handleHeadUpdate(
return nil
}
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
return err
}()
if fullRequest != nil {
log.With("senderId", senderId).
With("heads", fullRequest.GetContent().GetFullSyncRequest().Heads).
With("treeId", msg.TreeId).
With("treeId", objTree.ID()).
Debug("sending full sync request")
return s.syncClient.SendAsync([]string{senderId}, fullRequest)
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
}
log.With("senderId", senderId).
With("heads", update.Heads).
With("treeId", msg.TreeId).
With("treeId", objTree.ID()).
Debug("head update finished correctly")
return
}
@ -97,21 +105,21 @@ func (s *syncTreeHandler) handleHeadUpdate(
func (s *syncTreeHandler) handleFullSyncRequest(
ctx context.Context,
senderId string,
request *spacesyncproto.ObjectFullSyncRequest,
msg *spacesyncproto.ObjectSyncMessage) (err error) {
request *treechangeproto.TreeFullSyncRequest,
replyId string) (err error) {
log.With("senderId", senderId).
With("heads", request.Heads).
With("treeId", msg.TreeId).
With("trackingId", msg.TrackingId).
With("treeId", s.objTree.ID()).
With("trackingId", replyId).
Debug("received full sync request message")
var (
fullResponse *spacesyncproto.ObjectSyncMessage
header = msg.RootChange
fullResponse *treechangeproto.TreeSyncMessage
header = s.objTree.Header()
objTree = s.objTree
)
defer func() {
if err != nil {
s.syncClient.SendAsync([]string{senderId}, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId))
s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId)
}
}()
@ -130,30 +138,29 @@ func (s *syncTreeHandler) handleFullSyncRequest(
}
}
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId)
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
return err
}()
if err != nil {
return
}
return s.syncClient.SendAsync([]string{senderId}, fullResponse)
return s.syncClient.SendAsync(senderId, fullResponse, replyId)
}
func (s *syncTreeHandler) handleFullSyncResponse(
ctx context.Context,
senderId string,
response *spacesyncproto.ObjectFullSyncResponse,
msg *spacesyncproto.ObjectSyncMessage) (err error) {
response *treechangeproto.TreeFullSyncResponse) (err error) {
log.With("senderId", senderId).
With("heads", response.Heads).
With("treeId", msg.TreeId).
With("treeId", s.objTree.ID()).
Debug("received full sync response message")
objTree := s.objTree
if err != nil {
log.With("senderId", senderId).
With("heads", response.Heads).
With("treeId", msg.TreeId).
With("treeId", s.objTree.ID()).
Debug("failed to find the tree in full sync response")
return
}
@ -170,7 +177,7 @@ func (s *syncTreeHandler) handleFullSyncResponse(
}()
log.With("error", err != nil).
With("heads", response.Heads).
With("treeId", msg.TreeId).
With("treeId", s.objTree.ID()).
Debug("finished full sync response")
return

View File

@ -271,7 +271,7 @@ func (mr *MockObjectTreeMockRecorder) SnapshotPath() *gomock.Call {
// Storage mocks base method.
func (m *MockObjectTree) Storage() storage.TreeStorage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Storage")
ret := m.ctrl.Call(m, "SpaceStorage")
ret0, _ := ret[0].(storage.TreeStorage)
return ret0
}
@ -279,7 +279,7 @@ func (m *MockObjectTree) Storage() storage.TreeStorage {
// Storage indicates an expected call of Storage.
func (mr *MockObjectTreeMockRecorder) Storage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockObjectTree)(nil).Storage))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpaceStorage", reflect.TypeOf((*MockObjectTree)(nil).Storage))
}
// Unlock mocks base method.

View File

@ -53,3 +53,44 @@ message RawTreeChangeWithId {
// Id is a cid made from rawChange payload
string id = 2;
}
message TreeSyncMessage {
TreeSyncContentValue content = 1;
RawTreeChangeWithId rootChange = 2;
}
// TreeSyncContentValue provides different types for tree sync
message TreeSyncContentValue {
oneof value {
TreeHeadUpdate headUpdate = 1;
TreeFullSyncRequest fullSyncRequest = 2;
TreeFullSyncResponse fullSyncResponse = 3;
TreeErrorResponse errorResponse = 4;
}
}
// TreeHeadUpdate is a message sent on document head update
message TreeHeadUpdate {
repeated string heads = 1;
repeated RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// TreeHeadUpdate is a message sent when document needs full sync
message TreeFullSyncRequest {
repeated string heads = 1;
repeated RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// TreeFullSyncResponse is a message sent as a response for a specific full sync
message TreeFullSyncResponse {
repeated string heads = 1;
repeated RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3;
}
// TreeErrorResponse is an error sent as a response for a full sync request
message TreeErrorResponse {
string error = 1;
}

View File

@ -0,0 +1,37 @@
package treechangeproto
func WrapHeadUpdate(update *TreeHeadUpdate, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
return &TreeSyncMessage{
Content: &TreeSyncContentValue{
Value: &TreeSyncContentValue_HeadUpdate{HeadUpdate: update},
},
RootChange: rootChange,
}
}
func WrapFullRequest(request *TreeFullSyncRequest, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
return &TreeSyncMessage{
Content: &TreeSyncContentValue{
Value: &TreeSyncContentValue_FullSyncRequest{FullSyncRequest: request},
},
RootChange: rootChange,
}
}
func WrapFullResponse(response *TreeFullSyncResponse, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
return &TreeSyncMessage{
Content: &TreeSyncContentValue{
Value: &TreeSyncContentValue_FullSyncResponse{FullSyncResponse: response},
},
RootChange: rootChange,
}
}
func WrapError(err error, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
return &TreeSyncMessage{
Content: &TreeSyncContentValue{
Value: &TreeSyncContentValue_ErrorResponse{ErrorResponse: &TreeErrorResponse{Error: err.Error()}},
},
RootChange: rootChange,
}
}

File diff suppressed because it is too large Load Diff