diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 2813d997..9a656ee7 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -218,7 +218,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay deps := synctree.CreateDeps{ SpaceId: s.id, Payload: payload, - StreamPool: s.syncService.StreamPool(), + SyncService: s.syncService, Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, @@ -234,8 +234,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay deps := synctree.CreateDeps{ SpaceId: s.id, Payload: payload, - StreamPool: s.syncService.StreamPool(), - Checker: s.syncService.StreamChecker(), + SyncService: s.syncService, Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, @@ -250,8 +249,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene } deps := synctree.BuildDeps{ SpaceId: s.id, - StreamPool: s.syncService.StreamPool(), - Checker: s.syncService.StreamChecker(), + SyncService: s.syncService, Configuration: s.configuration, HeadNotifiable: s.diffService, Listener: listener, diff --git a/common/commonspace/syncservice/actionqueue.go b/common/commonspace/syncservice/actionqueue.go new file mode 100644 index 00000000..17a5400a --- /dev/null +++ b/common/commonspace/syncservice/actionqueue.go @@ -0,0 +1,87 @@ +package syncservice + +import ( + "context" + "github.com/cheggaaa/mb/v3" + "go.uber.org/zap" +) + +type ActionFunc func() error + +type ActionQueue interface { + Send(action ActionFunc) (err error) + Run() + Close() +} + +type actionQueue struct { + batcher *mb.MB[ActionFunc] + ctx context.Context + cancel context.CancelFunc + queueDone chan struct{} +} + +func NewActionQueue() ActionQueue { + return &actionQueue{ + batcher: mb.New[ActionFunc](0), + ctx: nil, + cancel: nil, + queueDone: make(chan struct{}), + } +} + +func (q *actionQueue) Send(action ActionFunc) (err error) { + log.Debug("adding action to batcher") + return q.batcher.Add(q.ctx, action) +} + +func (q *actionQueue) Run() { + log.Debug("running the queue") + q.ctx, q.cancel = context.WithCancel(context.Background()) + go q.read() +} + +func (q *actionQueue) read() { + limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) + for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + limiter <- struct{}{} + } + defer func() { + // wait until all operations are done + for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + <-limiter + } + close(q.queueDone) + }() + doSendActions := func() { + actions, err := q.batcher.Wait(q.ctx) + log.Debug("reading from batcher") + if err != nil { + log.With(zap.Error(err)).Error("queue finished") + return + } + for _, msg := range actions { + <-limiter + go func(action ActionFunc) { + err = action() + if err != nil { + log.With(zap.Error(err)).Debug("action errored out") + } + limiter <- struct{}{} + }(msg) + } + } + for { + select { + case <-q.ctx.Done(): + return + default: + doSendActions() + } + } +} + +func (q *actionQueue) Close() { + q.cancel() + <-q.queueDone +} diff --git a/common/commonspace/syncservice/streamchecker.go b/common/commonspace/syncservice/streamchecker.go index 35f9d3a6..85bd928d 100644 --- a/common/commonspace/syncservice/streamchecker.go +++ b/common/commonspace/syncservice/streamchecker.go @@ -5,7 +5,9 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "go.uber.org/atomic" "go.uber.org/zap" + "time" ) type StreamChecker interface { @@ -19,8 +21,11 @@ type streamChecker struct { clientFactory spacesyncproto.ClientFactory log *zap.Logger syncCtx context.Context + lastCheck *atomic.Time } +const streamCheckerInterval = time.Second * 10 + func NewStreamChecker( spaceId string, connector nodeconf.ConfConnector, @@ -35,10 +40,18 @@ func NewStreamChecker( clientFactory: clientFactory, log: log, syncCtx: syncCtx, + lastCheck: atomic.NewTime(time.Time{}), } } func (s *streamChecker) CheckResponsiblePeers() { + lastCheck := s.lastCheck.Load() + now := time.Now() + if lastCheck.Add(streamCheckerInterval).After(now) { + return + } + s.lastCheck.Store(now) + var ( activeNodeIds []string configuration = s.connector.Configuration() diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 3a68a109..300b6b9e 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -20,13 +20,12 @@ type SyncService interface { synchandler.SyncHandler StreamPool() StreamPool StreamChecker() StreamChecker + ActionQueue() ActionQueue Init(getter objectgetter.ObjectGetter) Close() (err error) } -const respPeersStreamCheckInterval = 3000 - type syncService struct { spaceId string @@ -34,6 +33,7 @@ type syncService struct { checker StreamChecker periodicSync periodicsync.PeriodicSync objectGetter objectgetter.ObjectGetter + actionQueue ActionQueue syncCtx context.Context cancelSync context.CancelFunc @@ -85,15 +85,18 @@ func newSyncService( checker: checker, syncCtx: syncCtx, cancelSync: cancel, + actionQueue: NewActionQueue(), } } func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) { s.objectGetter = objectGetter + s.actionQueue.Run() s.periodicSync.Run() } func (s *syncService) Close() (err error) { + s.actionQueue.Close() s.periodicSync.Close() s.cancelSync() return s.streamPool.Close() @@ -119,3 +122,7 @@ func (s *syncService) StreamPool() StreamPool { func (s *syncService) StreamChecker() StreamChecker { return s.checker } + +func (s *syncService) ActionQueue() ActionQueue { + return s.actionQueue +} diff --git a/common/commonspace/synctree/queuedclient.go b/common/commonspace/synctree/queuedclient.go new file mode 100644 index 00000000..a0fceff9 --- /dev/null +++ b/common/commonspace/synctree/queuedclient.go @@ -0,0 +1,36 @@ +package synctree + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" +) + +type queuedClient struct { + SyncClient + queue syncservice.ActionQueue +} + +func newQueuedClient(client SyncClient, queue syncservice.ActionQueue) SyncClient { + return &queuedClient{ + SyncClient: client, + queue: queue, + } +} + +func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { + return q.queue.Send(func() error { + return q.SyncClient.BroadcastAsync(message) + }) +} + +func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { + return q.queue.Send(func() error { + return q.SyncClient.SendAsync(peerId, message, replyId) + }) +} + +func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { + return q.queue.Send(func() error { + return q.SyncClient.BroadcastAsyncOrSendResponsible(message) + }) +} diff --git a/common/commonspace/synctree/queue.go b/common/commonspace/synctree/receivequeue.go similarity index 100% rename from common/commonspace/synctree/queue.go rename to common/commonspace/synctree/receivequeue.go diff --git a/common/commonspace/synctree/syncclient.go b/common/commonspace/synctree/syncclient.go index 4e1ad1e0..0d7df9fc 100644 --- a/common/commonspace/synctree/syncclient.go +++ b/common/commonspace/synctree/syncclient.go @@ -21,7 +21,8 @@ type syncClient struct { spaceId string connector nodeconf.ConfConnector configuration nodeconf.Configuration - checker syncservice.StreamChecker + + checker syncservice.StreamChecker } func newSyncClient( diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 4524cd6a..64c9f809 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -52,22 +52,20 @@ var log = logger.NewNamed("commonspace.synctree").Sugar() var createDerivedObjectTree = tree.CreateDerivedObjectTree var createObjectTree = tree.CreateObjectTree var buildObjectTree = tree.BuildObjectTree -var createSyncClient = newSyncClient +var createSyncClient = newWrappedSyncClient type CreateDeps struct { SpaceId string Payload tree.ObjectTreeCreatePayload Configuration nodeconf.Configuration - StreamPool syncservice.StreamPool - Checker syncservice.StreamChecker + SyncService syncservice.SyncService AclList list.ACLList SpaceStorage spacestorage.SpaceStorage } type BuildDeps struct { SpaceId string - StreamPool syncservice.StreamPool - Checker syncservice.StreamChecker + SyncService syncservice.SyncService Configuration nodeconf.Configuration HeadNotifiable HeadNotifiable Listener updatelistener.UpdateListener @@ -77,6 +75,11 @@ type BuildDeps struct { TreeUsage *atomic.Int32 } +func newWrappedSyncClient(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient { + syncClient := newSyncClient(spaceId, syncService.StreamPool(), factory, configuration, syncService.StreamChecker()) + return newQueuedClient(syncClient, syncService.ActionQueue()) +} + func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) { objTree, err := createDerivedObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage) if err != nil { @@ -85,10 +88,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) syncClient := createSyncClient( deps.SpaceId, - deps.StreamPool, sharedFactory, - deps.Configuration, - deps.Checker) + deps.SyncService, + deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) syncClient.BroadcastAsync(headUpdate) @@ -103,10 +105,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) } syncClient := createSyncClient( deps.SpaceId, - deps.StreamPool, - GetRequestFactory(), - deps.Configuration, - deps.Checker) + sharedFactory, + deps.SyncService, + deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) syncClient.BroadcastAsync(headUpdate) @@ -126,7 +127,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t return } - resp, err := deps.StreamPool.SendSync(peerId, objMsg) + resp, err := deps.SyncService.StreamPool().SendSync(peerId, objMsg) if err != nil { return } @@ -190,10 +191,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy } syncClient := createSyncClient( deps.SpaceId, - deps.StreamPool, - GetRequestFactory(), - deps.Configuration, - deps.Checker) + sharedFactory, + deps.SyncService, + deps.Configuration) syncTree := &syncTree{ ObjectTree: objTree, syncClient: syncClient, @@ -241,7 +241,6 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } - // it is more or less safe to send head updates when creating content (under lock) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) err = s.syncClient.BroadcastAsync(headUpdate) return @@ -269,8 +268,8 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawCha if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } - // we removed the sending head updates from here, because this method can be called under a lock - // thus this can block access to the tree + headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) + err = s.syncClient.BroadcastAsync(headUpdate) } return } diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 77f17bc1..f0bfe1fd 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -34,8 +34,6 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand } } -type sendFunc = func() error - func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) @@ -48,21 +46,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms return } - actions, err := s.handleMessage(ctx, senderId) - if err != nil { - log.With(zap.Error(err)).Debug("handling message finished with error") - } - for _, action := range actions { - err := action() - if err != nil { - log.With(zap.Error(err)).Debug("error while sending action") - } - } - - return + return s.handleMessage(ctx, senderId) } -func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (actions []sendFunc, err error) { +func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) { s.objTree.Lock() defer s.objTree.Unlock() msg, err := s.queue.GetMessage(senderId) @@ -88,28 +75,27 @@ func (s *syncTreeHandler) handleHeadUpdate( ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate, - replyId string) (actions []sendFunc, err error) { - log.With("senderId", senderId). - With("heads", update.Heads). - With("treeId", s.objTree.ID()). - Debug("received head update message") + replyId string) (err error) { var ( fullRequest *treechangeproto.TreeSyncMessage - headUpdate *treechangeproto.TreeSyncMessage isEmptyUpdate = len(update.Changes) == 0 objTree = s.objTree - addResult tree.AddResult ) + + log := log.With("senderId", senderId). + With("heads", objTree.Heads()). + With("treeId", objTree.ID()) + log.Debug("received head update message") + defer func() { - if headUpdate != nil { - actions = append(actions, func() error { - return s.syncClient.BroadcastAsync(headUpdate) - }) - } - if fullRequest != nil { - actions = append(actions, func() error { - return s.syncClient.SendAsync(senderId, fullRequest, replyId) - }) + if err != nil { + log.With(zap.Error(err)).Debug("head update finished with error") + } else if fullRequest != nil { + log.Debug("sending full sync request") + } else { + if !isEmptyUpdate { + log.Debug("head update finished correctly") + } } }() @@ -121,115 +107,99 @@ func (s *syncTreeHandler) handleHeadUpdate( } // we need to sync in any case fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - return + if err != nil { + return + } + + return s.syncClient.SendAsync(senderId, fullRequest, replyId) } if s.alreadyHasHeads(objTree, update.Heads) { return } - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: update.Heads, RawChanges: update.Changes, }) if err != nil { return } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } if s.alreadyHasHeads(objTree, update.Heads) { return } fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - - if fullRequest != nil { - log.With("senderId", senderId). - With("heads", objTree.Heads()). - With("treeId", objTree.ID()). - Debug("sending full sync request") - } else { - log.With("senderId", senderId). - With("heads", update.Heads). - With("treeId", objTree.ID()). - Debug("head update finished correctly") + if err != nil { + return } - return + + return s.syncClient.SendAsync(senderId, fullRequest, replyId) } func (s *syncTreeHandler) handleFullSyncRequest( ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest, - replyId string) (actions []sendFunc, err error) { - log.With("senderId", senderId). - With("heads", request.Heads). - With("treeId", s.objTree.ID()). - With("trackingId", replyId). - Debug("received full sync request message") + replyId string) (err error) { var ( fullResponse *treechangeproto.TreeSyncMessage - headUpdate *treechangeproto.TreeSyncMessage - addResult tree.AddResult header = s.objTree.Header() objTree = s.objTree ) + + log := log.With("senderId", senderId). + With("heads", request.Heads). + With("treeId", s.objTree.ID()). + With("replyId", replyId) + log.Debug("received full sync request message") + defer func() { if err != nil { - actions = append(actions, func() error { - return s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) - }) - return - } + log.With(zap.Error(err)).Debug("full sync request finished with error") - if headUpdate != nil { - actions = append(actions, func() error { - return s.syncClient.BroadcastAsync(headUpdate) - }) - } - if fullResponse != nil { - actions = append(actions, func() error { - return s.syncClient.SendAsync(senderId, fullResponse, replyId) - }) + s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) + return + } else if fullResponse != nil { + log.Debug("full sync response sent") } }() if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: request.Heads, RawChanges: request.Changes, }) if err != nil { return } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } } fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) - return + if err != nil { + return + } + + return s.syncClient.SendAsync(senderId, fullResponse, replyId) } func (s *syncTreeHandler) handleFullSyncResponse( ctx context.Context, senderId string, - response *treechangeproto.TreeFullSyncResponse) (actions []sendFunc, err error) { - log.With("senderId", senderId). - With("heads", response.Heads). - With("treeId", s.objTree.ID()). - Debug("received full sync response message") + response *treechangeproto.TreeFullSyncResponse) (err error) { var ( - objTree = s.objTree - addResult tree.AddResult - headUpdate *treechangeproto.TreeSyncMessage + objTree = s.objTree ) + log := log.With("senderId", senderId). + With("heads", response.Heads). + With("treeId", s.objTree.ID()) + log.Debug("received full sync response message") + defer func() { - if headUpdate != nil { - actions = append(actions, func() error { - return s.syncClient.BroadcastAsync(headUpdate) - }) + if err != nil { + log.With(zap.Error(err)).Debug("full sync response failed") + } else { + log.Debug("full sync response succeeded") } }() @@ -237,21 +207,10 @@ func (s *syncTreeHandler) handleFullSyncResponse( return } - addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: response.Heads, RawChanges: response.Changes, }) - if err != nil { - return - } - if addResult.Mode != tree.Nothing { - headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) - } - - log.With("error", err != nil). - With("heads", response.Heads). - With("treeId", s.objTree.ID()). - Debug("finished full sync response") return } diff --git a/common/go.mod b/common/go.mod index 2393aa38..f8522376 100644 --- a/common/go.mod +++ b/common/go.mod @@ -28,6 +28,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/fogleman/gg v1.3.0 // indirect diff --git a/common/go.sum b/common/go.sum index 61def952..3359eaee 100644 --- a/common/go.sum +++ b/common/go.sum @@ -56,6 +56,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c h1:+bD75daSbsxyTzkKpNplC4xls+7/tGwty+zruzOnOmk= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=