From 84d1f0df0690ef5f096b1a7b9fdfea1ac791c2d7 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 6 Aug 2022 10:22:09 +0200 Subject: [PATCH] Fix sync logic, move acks after handling finished, made message service async --- pkg/acl/acltree/acltree.go | 2 +- service/net/pool/message.go | 40 ++++++++-- service/net/pool/pool.go | 32 ++++++-- service/sync/document/service.go | 4 +- service/sync/message/service.go | 77 ++++++++----------- service/sync/requesthandler/requesthandler.go | 29 ++++--- 6 files changed, 110 insertions(+), 74 deletions(-) diff --git a/pkg/acl/acltree/acltree.go b/pkg/acl/acltree/acltree.go index 19c67822..07acee6c 100644 --- a/pkg/acl/acltree/acltree.go +++ b/pkg/acl/acltree/acltree.go @@ -482,7 +482,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh log.With( zap.Int("len(changes)", len(rawChanges)), zap.String("id", a.id)). - Debug("sending all changes after common snapshot") + Debug("returning all changes after common snapshot") return rawChanges, nil } diff --git a/service/net/pool/message.go b/service/net/pool/message.go index e6cb38fe..8ee7bdd5 100644 --- a/service/net/pool/message.go +++ b/service/net/pool/message.go @@ -38,14 +38,26 @@ func (m *Message) Ack() (err error) { } rep := &syncproto.Message{ Header: &syncproto.Header{ - TraceId: m.GetHeader().TraceId, - ReplyId: m.GetHeader().RequestId, - Type: syncproto.MessageType_MessageTypeSystem, + TraceId: m.GetHeader().TraceId, + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + DebugInfo: "Ack", }, Data: data, } - log.With(zap.String("header", rep.Header.String())).Info("sending ack to peer") - return m.peer.Send(rep) + err = m.peer.Send(rep) + if err != nil { + log.With( + zap.String("peerId", m.peer.Id()), + zap.String("header", rep.GetHeader().String())). + Error("failed sending ack to peer", zap.Error(err)) + } else { + log.With( + zap.String("peerId", m.peer.Id()), + zap.String("header", rep.GetHeader().String())). + Debug("sent ack to peer") + } + return } func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) { @@ -63,11 +75,23 @@ func (m *Message) AckError(code syncproto.SystemErrorCode, description string) ( } rep := &syncproto.Message{ Header: &syncproto.Header{ - TraceId: []byte(bson.NewObjectId()), - ReplyId: m.GetHeader().RequestId, - Type: syncproto.MessageType_MessageTypeSystem, + TraceId: []byte(bson.NewObjectId()), + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + DebugInfo: "AckError", }, Data: data, } + if err != nil { + log.With( + zap.String("peerId", m.peer.Id()), + zap.String("header", rep.GetHeader().String())). + Error("failed sending ackError to peer", zap.Error(err)) + } else { + log.With( + zap.String("peerId", m.peer.Id()), + zap.String("header", rep.GetHeader().String())). + Debug("sent ackError to peer") + } return m.peer.Send(rep) } diff --git a/service/net/pool/pool.go b/service/net/pool/pool.go index f4aa266f..1516cffc 100644 --- a/service/net/pool/pool.go +++ b/service/net/pool/pool.go @@ -154,41 +154,59 @@ func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) { } func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) { + defer func() { + if err != nil { + log.With( + zap.String("peerId", peerId), + zap.String("header", msg.GetHeader().String())). + Error("failed sending message to peer", zap.Error(err)) + } else { + log.With( + zap.String("peerId", peerId), + zap.String("header", msg.GetHeader().String())). + Debug("sent message to peer") + } + }() + p.mu.RLock() peer := p.peersById[peerId] p.mu.RUnlock() if peer == nil { - return ErrPeerNotFound + err = ErrPeerNotFound + return } + repId := p.waiters.NewReplyId() msg.GetHeader().RequestId = repId ch := make(chan Reply, 1) + log.With(zap.Uint64("reply id", repId)).Debug("adding waiter for reply id") p.waiters.Add(repId, &waiter{ch: ch}) defer p.waiters.Remove(repId) + if err = peer.peer.Send(msg); err != nil { return } select { case rep := <-ch: if rep.Error != nil { - return rep.Error + err = rep.Error } - return nil case <-ctx.Done(): - log.Debug("context error happened in send and wait") - return ctx.Err() + log.Debug("context done in SendAndWait") + err = ctx.Err() } + return } func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) { - //TODO implement me panic("implement me") } func (p *pool) readPeerLoop(peer peer.Peer) (err error) { defer p.wg.Done() + limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) for i := 0; i < maxSimultaneousOperationsPerStream; i++ { limiter <- struct{}{} @@ -213,7 +231,7 @@ Loop: }() } if err = p.removePeer(peer.Id()); err != nil { - log.Error("remove peer error", zap.String("peerId", peer.Id())) + log.Error("remove peer error", zap.String("peerId", peer.Id()), zap.Error(err)) } return } diff --git a/service/sync/document/service.go b/service/sync/document/service.go index 7a22e590..b684e64b 100644 --- a/service/sync/document/service.go +++ b/service/sync/document/service.go @@ -100,7 +100,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro zap.String("header", header.String())). Debug("document updated in the database") - return s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ + return s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, TreeId: id, @@ -155,7 +155,7 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e log.With(zap.String("id", id), zap.String("text", text)). Debug("creating document") - err = s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ + err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, TreeId: id, diff --git a/service/sync/message/service.go b/service/sync/message/service.go index 44e7e68c..8f71f597 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -9,7 +9,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/gogo/protobuf/proto" - "go.uber.org/zap" "sync" "time" ) @@ -30,8 +29,8 @@ func New() app.Component { } type Service interface { - SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error - SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error + SendMessageAsync(peerId string, msg *syncproto.Sync) error + SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error } func (s *service) Init(ctx context.Context, a *app.App) (err error) { @@ -55,72 +54,60 @@ func (s *service) Close(ctx context.Context) (err error) { } func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) { - log.With( - zap.String("peerId", msg.Peer().Id())). - Debug("handling message from peer") + defer func() { + if err != nil { + msg.AckError(syncproto.SystemError_UNKNOWN, err.Error()) + } else { + msg.Ack() + } + }() - err = msg.Ack() - if err != nil { - log.With(zap.String("peerId", msg.Peer().Id()), zap.Error(err)). - Error("could not ack message") - } else { - log.With(zap.String("peerId", msg.Peer().Id()), zap.Int("type", int(msg.Header.Type))). - Debug("ack returned") - } syncMsg := &syncproto.Sync{} err = proto.Unmarshal(msg.Data, syncMsg) if err != nil { - return err + return } timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - return s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg) + err = s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg) + return } -func (s *service) SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error { - log.With( - zap.String("peerId", peerId), - zap.String("message", msgType(msg))). - Debug("sending message to peer") - - err := s.pool.DialAndAddPeer(context.Background(), peerId) +func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) { + err = s.pool.DialAndAddPeer(context.Background(), peerId) if err != nil { - return err + return } marshalled, err := proto.Marshal(msg) if err != nil { - return err + return } - err = s.pool.SendAndWait(ctx, peerId, &syncproto.Message{ - Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, - Data: marshalled, - }) - - if err != nil { - log.With( - zap.String("peerId", peerId), - zap.String("message", msgType(msg)), - zap.Error(err)). - Debug("failed to send message to peer") - } else { - log.With( - zap.String("peerId", peerId), - zap.String("message", msgType(msg))). - Debug("message send to peer completed") - } - return err + go s.sendAsync(peerId, msgType(msg), marshalled) + return } -func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error { +func (s *service) SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error { for _, rp := range s.nodes { - s.SendMessage(ctx, rp.PeerId, msg) + s.SendMessageAsync(rp.PeerId, msg) } return nil } +func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + return s.pool.SendAndWait(ctx, peerId, &syncproto.Message{ + Header: &syncproto.Header{ + Type: syncproto.MessageType_MessageTypeSync, + DebugInfo: msgTypeStr, + }, + Data: marshalled, + }) +} + func msgType(content *syncproto.Sync) string { msg := content.GetMessage() switch { diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index cce5ab0b..2eb70551 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -32,8 +32,8 @@ type RequestHandler interface { } type MessageSender interface { - SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error - SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error + SendMessageAsync(peerId string, msg *syncproto.Sync) error + SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error } const CName = "SyncRequestHandler" @@ -77,7 +77,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, result acltree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). - Debug("received head update message") + Debug("processing head update") err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -85,6 +85,8 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, if err != nil { return err } + log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())). + Debug("comparing heads after head update") shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) snapshotPath = tree.SnapshotPath() if shouldFullSync { @@ -105,7 +107,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, } // if we have incompatible heads, or we haven't seen the tree at all if fullRequest != nil { - return r.messageService.SendMessage(ctx, senderId, syncproto.WrapFullRequest(fullRequest)) + return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest)) } // if error or nothing has changed if err != nil || len(result.Added) == 0 { @@ -119,7 +121,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, TreeId: update.TreeId, TreeHeader: update.TreeHeader, } - return r.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) { @@ -129,7 +131,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str result acltree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). - Debug("received full sync request message") + Debug("processing full sync request") err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -150,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str if err != nil { return err } - err = r.messageService.SendMessage(ctx, senderId, syncproto.WrapFullResponse(fullResponse)) + err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse)) // if error or nothing has changed if err != nil || len(result.Added) == 0 { return err @@ -164,7 +166,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str TreeId: request.TreeId, TreeHeader: request.TreeHeader, } - return r.messageService.SendToSpace(ctx, "def", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) { @@ -173,7 +175,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st result acltree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). - Debug("received full sync response message") + Debug("processing full sync response") err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -194,6 +196,11 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st if err != nil { return err } + result = acltree.AddResult{ + OldHeads: []string{}, + Heads: response.Heads, + Added: response.Changes, + } } // sending heads update message newUpdate := &syncproto.SyncHeadUpdate{ @@ -202,7 +209,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st SnapshotPath: snapshotPath, TreeId: response.TreeId, } - return r.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) { @@ -242,7 +249,7 @@ func (r *requestHandler) prepareFullSyncResponse( } } log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)). - Debug("sending changes for tree") + Debug("preparing changes for tree") return &syncproto.SyncFullResponse{ Heads: tree.Heads(),