diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index cb0f5d98..becf5779 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -8,44 +8,104 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "sync" ) type syncTreeHandler struct { - objTree tree.ObjectTree - syncClient SyncClient + objTree tree.ObjectTree + syncClient SyncClient + handlerLock sync.Mutex + handlerMap map[string][]treeMsg +} + +const maxQueueSize = 5 + +type treeMsg struct { + replyId string + syncMessage *treechangeproto.TreeSyncMessage } func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler { return &syncTreeHandler{ objTree: objTree, syncClient: syncClient, + handlerMap: map[string][]treeMsg{}, } } +type sendFunc = func() error + func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) if err != nil { return } + + s.handlerLock.Lock() + queue := s.handlerMap[senderId] + queueFull := len(queue) >= maxQueueSize + queue = append(queue, treeMsg{msg.ReplyId, unmarshalled}) + s.handlerMap[senderId] = queue + s.handlerLock.Unlock() + + if queueFull { + return + } + + actions, err := s.handleMessage(ctx, senderId) + if err != nil { + log.With(zap.Error(err)).Debug("handling message finished with error") + } + for _, action := range actions { + err := action() + if err != nil { + log.With(zap.Error(err)).Debug("error while sending action") + } + } + + return +} + +func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (actions []sendFunc, err error) { + s.objTree.Lock() + defer s.objTree.Unlock() + s.handlerLock.Lock() + treeMessage := s.handlerMap[senderId][0] + unmarshalled := treeMessage.syncMessage + replyId := treeMessage.replyId + s.handlerLock.Unlock() + + defer func() { + s.handlerLock.Lock() + defer s.handlerLock.Unlock() + queue := s.handlerMap[senderId] + excessLen := len(queue) - maxQueueSize + 1 + if excessLen <= 0 { + excessLen = 1 + } + queue = queue[excessLen:] + s.handlerMap[senderId] = queue + }() content := unmarshalled.GetContent() switch { case content.GetHeadUpdate() != nil: - return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg.ReplyId) + return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), replyId) case content.GetFullSyncRequest() != nil: - return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg.ReplyId) + return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), replyId) case content.GetFullSyncResponse() != nil: return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) } - return nil + return } func (s *syncTreeHandler) handleHeadUpdate( ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate, - replyId string) (err error) { + replyId string) (actions []sendFunc, err error) { log.With("senderId", senderId). With("heads", update.Heads). With("treeId", s.objTree.ID()). @@ -57,60 +117,62 @@ func (s *syncTreeHandler) handleHeadUpdate( objTree = s.objTree addResult tree.AddResult ) - - err = func() error { - objTree.Lock() - defer objTree.Unlock() - - // isEmptyUpdate is sent when the tree is brought up from cache - if isEmptyUpdate { - log.With("treeId", objTree.ID()).Debug("is empty update") - if slice.UnsortedEquals(objTree.Heads(), update.Heads) { - return nil - } - // we need to sync in any case - fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - return err + defer func() { + if headUpdate != nil { + actions = append(actions, func() error { + return s.syncClient.BroadcastAsync(headUpdate) + }) } - - if s.alreadyHasHeads(objTree, update.Heads) { - return nil + if fullRequest != nil { + actions = append(actions, func() error { + return s.syncClient.SendAsync(senderId, fullRequest, replyId) + }) } - - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ - NewHeads: update.Heads, - RawChanges: update.Changes, - }) - if err != nil { - return err - } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } - - if s.alreadyHasHeads(objTree, update.Heads) { - return nil - } - - fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - return err }() - if headUpdate != nil { - s.syncClient.BroadcastAsync(headUpdate) + // isEmptyUpdate is sent when the tree is brought up from cache + if isEmptyUpdate { + log.With("treeId", objTree.ID()).Debug("is empty update") + if slice.UnsortedEquals(objTree.Heads(), update.Heads) { + return + } + // we need to sync in any case + fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) + return } + if s.alreadyHasHeads(objTree, update.Heads) { + return + } + + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + NewHeads: update.Heads, + RawChanges: update.Changes, + }) + if err != nil { + return + } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) + } + + if s.alreadyHasHeads(objTree, update.Heads) { + return + } + + fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) + if fullRequest != nil { log.With("senderId", senderId). With("heads", objTree.Heads()). With("treeId", objTree.ID()). Debug("sending full sync request") - return s.syncClient.SendAsync(senderId, fullRequest, replyId) + } else { + log.With("senderId", senderId). + With("heads", update.Heads). + With("treeId", objTree.ID()). + Debug("head update finished correctly") } - log.With("senderId", senderId). - With("heads", update.Heads). - With("treeId", objTree.ID()). - Debug("head update finished correctly") return } @@ -118,7 +180,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest, - replyId string) (err error) { + replyId string) (actions []sendFunc, err error) { log.With("senderId", senderId). With("heads", request.Heads). With("treeId", s.objTree.ID()). @@ -133,43 +195,44 @@ func (s *syncTreeHandler) handleFullSyncRequest( ) defer func() { if err != nil { - s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) - } - }() - - err = func() error { - objTree.Lock() - defer objTree.Unlock() - - if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ - NewHeads: request.Heads, - RawChanges: request.Changes, + actions = append(actions, func() error { + return s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) }) - if err != nil { - return err - } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } + return } - fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) - return err - }() - if headUpdate != nil { - s.syncClient.BroadcastAsync(headUpdate) - } - if err != nil { - return + if headUpdate != nil { + actions = append(actions, func() error { + return s.syncClient.BroadcastAsync(headUpdate) + }) + } + if fullResponse != nil { + actions = append(actions, func() error { + return s.syncClient.SendAsync(senderId, fullResponse, replyId) + }) + } + }() + + if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + NewHeads: request.Heads, + RawChanges: request.Changes, + }) + if err != nil { + return + } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) + } } - return s.syncClient.SendAsync(senderId, fullResponse, replyId) + fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) + return } func (s *syncTreeHandler) handleFullSyncResponse( ctx context.Context, senderId string, - response *treechangeproto.TreeFullSyncResponse) (err error) { + response *treechangeproto.TreeFullSyncResponse) (actions []sendFunc, err error) { log.With("senderId", senderId). With("heads", response.Heads). With("treeId", s.objTree.ID()). @@ -179,35 +242,33 @@ func (s *syncTreeHandler) handleFullSyncResponse( addResult tree.AddResult headUpdate *treechangeproto.TreeSyncMessage ) - err = func() error { - objTree.Lock() - defer objTree.Unlock() - - if s.alreadyHasHeads(objTree, response.Heads) { - return nil + defer func() { + if headUpdate != nil { + actions = append(actions, func() error { + return s.syncClient.BroadcastAsync(headUpdate) + }) } - - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ - NewHeads: response.Heads, - RawChanges: response.Changes, - }) - if err != nil { - return err - } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } - return err }() - if headUpdate != nil { - s.syncClient.BroadcastAsync(headUpdate) + + if s.alreadyHasHeads(objTree, response.Heads) { + return + } + + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + NewHeads: response.Heads, + RawChanges: response.Changes, + }) + if err != nil { + return + } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) } log.With("error", err != nil). With("heads", response.Heads). With("treeId", s.objTree.ID()). Debug("finished full sync response") - return }