From 1c81926f35b5950cc9456d5be5c6b81295cf3ad3 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 5 Aug 2022 18:14:35 +0200 Subject: [PATCH] Fix sync bugs and add logs --- pkg/acl/acltree/acltree.go | 15 ++++++- pkg/acl/acltree/treebuilder.go | 44 ++++++++++++------- service/node/service.go | 6 ++- service/sync/document/service.go | 4 +- service/sync/message/service.go | 14 +++--- service/sync/requesthandler/requesthandler.go | 14 +++++- service/treecache/service.go | 2 +- 7 files changed, 70 insertions(+), 29 deletions(-) diff --git a/pkg/acl/acltree/acltree.go b/pkg/acl/acltree/acltree.go index 4697be1e..19c67822 100644 --- a/pkg/acl/acltree/acltree.go +++ b/pkg/acl/acltree/acltree.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" + "go.uber.org/zap" "sync" ) @@ -430,7 +431,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh // but no changes after some of the snapshots var ( - isNewDocument = len(theirPath) != 0 + isNewDocument = len(theirPath) == 0 ourPath = a.SnapshotPath() // by default returning everything we have commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage @@ -462,16 +463,26 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh return ch, nil } // we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes + log.With( + zap.Strings("heads", a.fullTree.Heads()), + zap.String("breakpoint", commonSnapshot), + zap.String("id", a.id)). + Debug("getting all changes from common snapshot") _, err = a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, load) if err != nil { return nil, err } if isNewDocument { + // adding snapshot to raw changes _, err = load(commonSnapshot) if err != nil { return nil, err } } + log.With( + zap.Int("len(changes)", len(rawChanges)), + zap.String("id", a.id)). + Debug("sending all changes after common snapshot") return rawChanges, nil } @@ -483,6 +494,8 @@ func (a *aclTree) DebugDump() (string, error) { func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) { var i int var j int + log.With(zap.Strings("our path", ourPath), zap.Strings("their path", theirPath)). + Debug("finding common snapshot for two paths") OuterLoop: // find starting point from the right for i = len(ourPath) - 1; i >= 0; i-- { diff --git a/pkg/acl/acltree/treebuilder.go b/pkg/acl/acltree/treebuilder.go index eb58dc93..9bb9a4b8 100644 --- a/pkg/acl/acltree/treebuilder.go +++ b/pkg/acl/acltree/treebuilder.go @@ -4,10 +4,8 @@ import ( "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread" - - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys" - //"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/logging" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" ) @@ -18,18 +16,18 @@ var ( type treeBuilder struct { cache map[string]*Change - identityKeys map[string]keys.SigningPubKey - signingPubKeyDecoder keys.SigningPubKeyDecoder + identityKeys map[string]signingkey.PubKey + signingPubKeyDecoder signingkey.PubKeyDecoder tree *Tree - thread thread.Thread + treeStorage treestorage.TreeStorage *changeLoader } -func newTreeBuilder(t thread.Thread, decoder keys.SigningPubKeyDecoder) *treeBuilder { +func newTreeBuilder(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder) *treeBuilder { return &treeBuilder{ signingPubKeyDecoder: decoder, - thread: t, + treeStorage: t, changeLoader: newChangeLoader( t, decoder, @@ -39,15 +37,23 @@ func newTreeBuilder(t thread.Thread, decoder keys.SigningPubKeyDecoder) *treeBui func (tb *treeBuilder) Init() { tb.cache = make(map[string]*Change) - tb.identityKeys = make(map[string]keys.SigningPubKey) + tb.identityKeys = make(map[string]signingkey.PubKey) tb.tree = &Tree{} tb.changeLoader.Init(tb.cache, tb.identityKeys) } func (tb *treeBuilder) Build(fromStart bool) (*Tree, error) { var headsAndOrphans []string - headsAndOrphans = append(headsAndOrphans, tb.thread.Orphans()...) - headsAndOrphans = append(headsAndOrphans, tb.thread.Heads()...) + orphans, err := tb.treeStorage.Orphans() + if err != nil { + return nil, err + } + heads, err := tb.treeStorage.Heads() + if err != nil { + return nil, err + } + headsAndOrphans = append(headsAndOrphans, orphans...) + headsAndOrphans = append(headsAndOrphans, heads...) if fromStart { if err := tb.buildTreeFromStart(headsAndOrphans); err != nil { @@ -109,7 +115,10 @@ func (tb *treeBuilder) dfsFromStart(heads []string) (buf []*Change, root *Change possibleRoots = append(possibleRoots, ch) } } - header := tb.thread.Header() + header, err := tb.treeStorage.Header() + if err != nil { + return nil, nil, err + } for _, r := range possibleRoots { if r.Id == header.FirstChangeId { return buf, r, nil @@ -125,13 +134,16 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error) return } tb.tree.AddFast(ch) - changes, err := tb.dfs(heads, breakpoint) + changes, err := tb.dfs(heads, breakpoint, tb.loadChange) tb.tree.AddFast(changes...) return } -func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, err error) { +func (tb *treeBuilder) dfs( + heads []string, + breakpoint string, + load func(string) (*Change, error)) (buf []*Change, err error) { stack := make([]string, len(heads), len(heads)*2) copy(stack, heads) @@ -144,7 +156,7 @@ func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, er continue } - ch, err := tb.loadChange(id) + ch, err := load(id) if err != nil { continue } diff --git a/service/node/service.go b/service/node/service.go index 788624b1..f3b82ba0 100644 --- a/service/node/service.go +++ b/service/node/service.go @@ -3,14 +3,18 @@ package node import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey" + "go.uber.org/zap" ) const CName = "NodesService" +var log = logger.NewNamed("nodesservice") + type Node struct { Address string PeerId string @@ -46,7 +50,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) { if err != nil { return err } - + log.With(zap.String("node", node.PeerId)).Debug("adding peer to known nodes") filteredNodes = append(filteredNodes, node) } s.nodes = filteredNodes diff --git a/service/sync/document/service.go b/service/sync/document/service.go index b1d39689..7a22e590 100644 --- a/service/sync/document/service.go +++ b/service/sync/document/service.go @@ -117,8 +117,6 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e snapshotPath []string heads []string ) - log.With(zap.String("id", id), zap.String("text", text)). - Debug("creating document") err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error { err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin) @@ -154,6 +152,8 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e if err != nil { return "", err } + log.With(zap.String("id", id), zap.String("text", text)). + Debug("creating document") err = s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, diff --git a/service/sync/message/service.go b/service/sync/message/service.go index 2d9f7027..12c3a844 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -4,8 +4,8 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/gogo/protobuf/proto" @@ -19,7 +19,7 @@ var log = logger.NewNamed("messageservice") const CName = "MessageService" type service struct { - nodes []config.Node + nodes []*node.Node requestHandler requesthandler.RequestHandler pool pool.Pool sync.RWMutex @@ -36,7 +36,7 @@ type Service interface { func (s *service) Init(ctx context.Context, a *app.App) (err error) { s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) - s.nodes = a.MustComponent(config.CName).(*config.Config).Nodes + s.nodes = a.MustComponent(node.CName).(node.Service).Nodes() s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage) return nil @@ -106,10 +106,6 @@ func (s *service) SendMessage(ctx context.Context, peerId string, msg *syncproto } func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error { - log.With( - zap.String("message", msgType(msg))). - Debug("sending message to all") - // dial manually to all peers for _, rp := range s.nodes { if er := s.pool.DialAndAddPeer(context.Background(), rp.PeerId); er != nil { @@ -126,6 +122,10 @@ func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncprot // TODO: use Broadcast method here when it is ready for _, n := range s.nodes { + log.With( + zap.String("peerId", n.PeerId), + zap.String("message", msgType(msg))). + Debug("sending message to peer") err := s.pool.SendAndWait(ctx, n.PeerId, &syncproto.Message{ Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, Data: marshalled, diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index d26eb2db..0c1bad55 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -3,6 +3,7 @@ package requesthandler import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" @@ -11,6 +12,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "go.uber.org/zap" ) type requestHandler struct { @@ -19,6 +21,8 @@ type requestHandler struct { messageService MessageSender } +var log = logger.NewNamed("requesthandler") + func New() app.Component { return &requestHandler{} } @@ -72,6 +76,8 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, snapshotPath []string result acltree.AddResult ) + log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). + Debug("received head update message") err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -122,6 +128,8 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str snapshotPath []string result acltree.AddResult ) + log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). + Debug("received full sync request message") err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -164,6 +172,8 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st snapshotPath []string result acltree.AddResult ) + log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). + Debug("received full sync response message") err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes @@ -227,10 +237,12 @@ func (r *requestHandler) prepareFullSyncResponse( // filtering our changes, so we will not send the same changes back var final []*aclpb.RawChange for _, ch := range ourChanges { - if _, exists := theirMap[ch.Id]; exists { + if _, exists := theirMap[ch.Id]; !exists { final = append(final, ch) } } + log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)). + Debug("sending changes for tree") return &syncproto.SyncFullResponse{ Heads: tree.Heads(), diff --git a/service/treecache/service.go b/service/treecache/service.go index 78985a68..14a011a9 100644 --- a/service/treecache/service.go +++ b/service/treecache/service.go @@ -69,7 +69,7 @@ func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error { func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error { log. - With(zap.String("treeId", treeId)). + With(zap.String("treeId", treeId), zap.Int("len(changes)", len(changes))). Debug("adding tree with changes") _, err := s.treeProvider.CreateTreeStorage(treeId, header, changes)