Add queue to sync client

This commit is contained in:
mcrakhman 2022-12-16 16:40:23 +01:00
parent 367bd1978d
commit 0c85336e82
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
11 changed files with 230 additions and 127 deletions

View File

@ -218,7 +218,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
SyncService: s.syncService,
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
@ -234,8 +234,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Checker: s.syncService.StreamChecker(),
SyncService: s.syncService,
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
@ -250,8 +249,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
}
deps := synctree.BuildDeps{
SpaceId: s.id,
StreamPool: s.syncService.StreamPool(),
Checker: s.syncService.StreamChecker(),
SyncService: s.syncService,
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,

View File

@ -0,0 +1,87 @@
package syncservice
import (
"context"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
)
type ActionFunc func() error
type ActionQueue interface {
Send(action ActionFunc) (err error)
Run()
Close()
}
type actionQueue struct {
batcher *mb.MB[ActionFunc]
ctx context.Context
cancel context.CancelFunc
queueDone chan struct{}
}
func NewActionQueue() ActionQueue {
return &actionQueue{
batcher: mb.New[ActionFunc](0),
ctx: nil,
cancel: nil,
queueDone: make(chan struct{}),
}
}
func (q *actionQueue) Send(action ActionFunc) (err error) {
log.Debug("adding action to batcher")
return q.batcher.Add(q.ctx, action)
}
func (q *actionQueue) Run() {
log.Debug("running the queue")
q.ctx, q.cancel = context.WithCancel(context.Background())
go q.read()
}
func (q *actionQueue) read() {
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{}
}
defer func() {
// wait until all operations are done
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
<-limiter
}
close(q.queueDone)
}()
doSendActions := func() {
actions, err := q.batcher.Wait(q.ctx)
log.Debug("reading from batcher")
if err != nil {
log.With(zap.Error(err)).Error("queue finished")
return
}
for _, msg := range actions {
<-limiter
go func(action ActionFunc) {
err = action()
if err != nil {
log.With(zap.Error(err)).Debug("action errored out")
}
limiter <- struct{}{}
}(msg)
}
}
for {
select {
case <-q.ctx.Done():
return
default:
doSendActions()
}
}
}
func (q *actionQueue) Close() {
q.cancel()
<-q.queueDone
}

View File

@ -5,7 +5,9 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"go.uber.org/atomic"
"go.uber.org/zap"
"time"
)
type StreamChecker interface {
@ -19,8 +21,11 @@ type streamChecker struct {
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
syncCtx context.Context
lastCheck *atomic.Time
}
const streamCheckerInterval = time.Second * 10
func NewStreamChecker(
spaceId string,
connector nodeconf.ConfConnector,
@ -35,10 +40,18 @@ func NewStreamChecker(
clientFactory: clientFactory,
log: log,
syncCtx: syncCtx,
lastCheck: atomic.NewTime(time.Time{}),
}
}
func (s *streamChecker) CheckResponsiblePeers() {
lastCheck := s.lastCheck.Load()
now := time.Now()
if lastCheck.Add(streamCheckerInterval).After(now) {
return
}
s.lastCheck.Store(now)
var (
activeNodeIds []string
configuration = s.connector.Configuration()

View File

@ -20,13 +20,12 @@ type SyncService interface {
synchandler.SyncHandler
StreamPool() StreamPool
StreamChecker() StreamChecker
ActionQueue() ActionQueue
Init(getter objectgetter.ObjectGetter)
Close() (err error)
}
const respPeersStreamCheckInterval = 3000
type syncService struct {
spaceId string
@ -34,6 +33,7 @@ type syncService struct {
checker StreamChecker
periodicSync periodicsync.PeriodicSync
objectGetter objectgetter.ObjectGetter
actionQueue ActionQueue
syncCtx context.Context
cancelSync context.CancelFunc
@ -85,15 +85,18 @@ func newSyncService(
checker: checker,
syncCtx: syncCtx,
cancelSync: cancel,
actionQueue: NewActionQueue(),
}
}
func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) {
s.objectGetter = objectGetter
s.actionQueue.Run()
s.periodicSync.Run()
}
func (s *syncService) Close() (err error) {
s.actionQueue.Close()
s.periodicSync.Close()
s.cancelSync()
return s.streamPool.Close()
@ -119,3 +122,7 @@ func (s *syncService) StreamPool() StreamPool {
func (s *syncService) StreamChecker() StreamChecker {
return s.checker
}
func (s *syncService) ActionQueue() ActionQueue {
return s.actionQueue
}

View File

@ -0,0 +1,36 @@
package synctree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
)
type queuedClient struct {
SyncClient
queue syncservice.ActionQueue
}
func newQueuedClient(client SyncClient, queue syncservice.ActionQueue) SyncClient {
return &queuedClient{
SyncClient: client,
queue: queue,
}
}
func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
return q.queue.Send(func() error {
return q.SyncClient.BroadcastAsync(message)
})
}
func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
return q.queue.Send(func() error {
return q.SyncClient.SendAsync(peerId, message, replyId)
})
}
func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
return q.queue.Send(func() error {
return q.SyncClient.BroadcastAsyncOrSendResponsible(message)
})
}

View File

@ -21,7 +21,8 @@ type syncClient struct {
spaceId string
connector nodeconf.ConfConnector
configuration nodeconf.Configuration
checker syncservice.StreamChecker
checker syncservice.StreamChecker
}
func newSyncClient(

View File

@ -52,22 +52,20 @@ var log = logger.NewNamed("commonspace.synctree").Sugar()
var createDerivedObjectTree = tree.CreateDerivedObjectTree
var createObjectTree = tree.CreateObjectTree
var buildObjectTree = tree.BuildObjectTree
var createSyncClient = newSyncClient
var createSyncClient = newWrappedSyncClient
type CreateDeps struct {
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
StreamPool syncservice.StreamPool
Checker syncservice.StreamChecker
SyncService syncservice.SyncService
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
}
type BuildDeps struct {
SpaceId string
StreamPool syncservice.StreamPool
Checker syncservice.StreamChecker
SyncService syncservice.SyncService
Configuration nodeconf.Configuration
HeadNotifiable HeadNotifiable
Listener updatelistener.UpdateListener
@ -77,6 +75,11 @@ type BuildDeps struct {
TreeUsage *atomic.Int32
}
func newWrappedSyncClient(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient {
syncClient := newSyncClient(spaceId, syncService.StreamPool(), factory, configuration, syncService.StreamChecker())
return newQueuedClient(syncClient, syncService.ActionQueue())
}
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) {
objTree, err := createDerivedObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage)
if err != nil {
@ -85,10 +88,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
syncClient := createSyncClient(
deps.SpaceId,
deps.StreamPool,
sharedFactory,
deps.Configuration,
deps.Checker)
deps.SyncService,
deps.Configuration)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
syncClient.BroadcastAsync(headUpdate)
@ -103,10 +105,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
}
syncClient := createSyncClient(
deps.SpaceId,
deps.StreamPool,
GetRequestFactory(),
deps.Configuration,
deps.Checker)
sharedFactory,
deps.SyncService,
deps.Configuration)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
syncClient.BroadcastAsync(headUpdate)
@ -126,7 +127,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
return
}
resp, err := deps.StreamPool.SendSync(peerId, objMsg)
resp, err := deps.SyncService.StreamPool().SendSync(peerId, objMsg)
if err != nil {
return
}
@ -190,10 +191,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
}
syncClient := createSyncClient(
deps.SpaceId,
deps.StreamPool,
GetRequestFactory(),
deps.Configuration,
deps.Checker)
sharedFactory,
deps.SyncService,
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
@ -241,7 +241,6 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads)
}
// it is more or less safe to send head updates when creating content (under lock)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
return
@ -269,8 +268,8 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawCha
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads)
}
// we removed the sending head updates from here, because this method can be called under a lock
// thus this can block access to the tree
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
}
return
}

View File

@ -34,8 +34,6 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand
}
}
type sendFunc = func() error
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
@ -48,21 +46,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
return
}
actions, err := s.handleMessage(ctx, senderId)
if err != nil {
log.With(zap.Error(err)).Debug("handling message finished with error")
}
for _, action := range actions {
err := action()
if err != nil {
log.With(zap.Error(err)).Debug("error while sending action")
}
}
return
return s.handleMessage(ctx, senderId)
}
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (actions []sendFunc, err error) {
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) {
s.objTree.Lock()
defer s.objTree.Unlock()
msg, err := s.queue.GetMessage(senderId)
@ -88,28 +75,27 @@ func (s *syncTreeHandler) handleHeadUpdate(
ctx context.Context,
senderId string,
update *treechangeproto.TreeHeadUpdate,
replyId string) (actions []sendFunc, err error) {
log.With("senderId", senderId).
With("heads", update.Heads).
With("treeId", s.objTree.ID()).
Debug("received head update message")
replyId string) (err error) {
var (
fullRequest *treechangeproto.TreeSyncMessage
headUpdate *treechangeproto.TreeSyncMessage
isEmptyUpdate = len(update.Changes) == 0
objTree = s.objTree
addResult tree.AddResult
)
log := log.With("senderId", senderId).
With("heads", objTree.Heads()).
With("treeId", objTree.ID())
log.Debug("received head update message")
defer func() {
if headUpdate != nil {
actions = append(actions, func() error {
return s.syncClient.BroadcastAsync(headUpdate)
})
}
if fullRequest != nil {
actions = append(actions, func() error {
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
})
if err != nil {
log.With(zap.Error(err)).Debug("head update finished with error")
} else if fullRequest != nil {
log.Debug("sending full sync request")
} else {
if !isEmptyUpdate {
log.Debug("head update finished correctly")
}
}
}()
@ -121,115 +107,99 @@ func (s *syncTreeHandler) handleHeadUpdate(
}
// we need to sync in any case
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
return
if err != nil {
return
}
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
}
if s.alreadyHasHeads(objTree, update.Heads) {
return
}
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: update.Heads,
RawChanges: update.Changes,
})
if err != nil {
return
}
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
if s.alreadyHasHeads(objTree, update.Heads) {
return
}
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
if fullRequest != nil {
log.With("senderId", senderId).
With("heads", objTree.Heads()).
With("treeId", objTree.ID()).
Debug("sending full sync request")
} else {
log.With("senderId", senderId).
With("heads", update.Heads).
With("treeId", objTree.ID()).
Debug("head update finished correctly")
if err != nil {
return
}
return
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
}
func (s *syncTreeHandler) handleFullSyncRequest(
ctx context.Context,
senderId string,
request *treechangeproto.TreeFullSyncRequest,
replyId string) (actions []sendFunc, err error) {
log.With("senderId", senderId).
With("heads", request.Heads).
With("treeId", s.objTree.ID()).
With("trackingId", replyId).
Debug("received full sync request message")
replyId string) (err error) {
var (
fullResponse *treechangeproto.TreeSyncMessage
headUpdate *treechangeproto.TreeSyncMessage
addResult tree.AddResult
header = s.objTree.Header()
objTree = s.objTree
)
log := log.With("senderId", senderId).
With("heads", request.Heads).
With("treeId", s.objTree.ID()).
With("replyId", replyId)
log.Debug("received full sync request message")
defer func() {
if err != nil {
actions = append(actions, func() error {
return s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId)
})
return
}
log.With(zap.Error(err)).Debug("full sync request finished with error")
if headUpdate != nil {
actions = append(actions, func() error {
return s.syncClient.BroadcastAsync(headUpdate)
})
}
if fullResponse != nil {
actions = append(actions, func() error {
return s.syncClient.SendAsync(senderId, fullResponse, replyId)
})
s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId)
return
} else if fullResponse != nil {
log.Debug("full sync response sent")
}
}()
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
})
if err != nil {
return
}
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
}
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
return
if err != nil {
return
}
return s.syncClient.SendAsync(senderId, fullResponse, replyId)
}
func (s *syncTreeHandler) handleFullSyncResponse(
ctx context.Context,
senderId string,
response *treechangeproto.TreeFullSyncResponse) (actions []sendFunc, err error) {
log.With("senderId", senderId).
With("heads", response.Heads).
With("treeId", s.objTree.ID()).
Debug("received full sync response message")
response *treechangeproto.TreeFullSyncResponse) (err error) {
var (
objTree = s.objTree
addResult tree.AddResult
headUpdate *treechangeproto.TreeSyncMessage
objTree = s.objTree
)
log := log.With("senderId", senderId).
With("heads", response.Heads).
With("treeId", s.objTree.ID())
log.Debug("received full sync response message")
defer func() {
if headUpdate != nil {
actions = append(actions, func() error {
return s.syncClient.BroadcastAsync(headUpdate)
})
if err != nil {
log.With(zap.Error(err)).Debug("full sync response failed")
} else {
log.Debug("full sync response succeeded")
}
}()
@ -237,21 +207,10 @@ func (s *syncTreeHandler) handleFullSyncResponse(
return
}
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: response.Heads,
RawChanges: response.Changes,
})
if err != nil {
return
}
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
log.With("error", err != nil).
With("heads", response.Heads).
With("treeId", s.objTree.ID()).
Debug("finished full sync response")
return
}

View File

@ -28,6 +28,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/fogleman/gg v1.3.0 // indirect

View File

@ -56,6 +56,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c h1:+bD75daSbsxyTzkKpNplC4xls+7/tGwty+zruzOnOmk=
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=