From f21f05585ca4e5ec3ffdbe434eb2a82b5b6cf0ee Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 20 Oct 2022 16:02:16 +0200 Subject: [PATCH] Fix bugs with storage --- client/document/textdocument/textdocument.go | 3 +- .../commonspace/spacesyncproto/spacesync.go | 20 ++++++++++++ common/commonspace/syncservice/streampool.go | 8 ++++- common/commonspace/syncservice/synchandler.go | 31 +++++++++++++++++-- common/commonspace/synctree/synctree.go | 5 +++ node/storage/treestorage.go | 2 +- 6 files changed, 64 insertions(+), 5 deletions(-) diff --git a/client/document/textdocument/textdocument.go b/client/document/textdocument/textdocument.go index b749a759..03f3c151 100644 --- a/client/document/textdocument/textdocument.go +++ b/client/document/textdocument/textdocument.go @@ -71,7 +71,8 @@ func (t *textDocument) AddText(text string) (err error) { if err != nil { return } - + t.objTree.Lock() + defer t.objTree.Unlock() _, err = t.objTree.AddContent(context.Background(), tree.SignableChangeContent{ Data: res, Key: t.account.Account().SignKey, diff --git a/common/commonspace/spacesyncproto/spacesync.go b/common/commonspace/spacesyncproto/spacesync.go index 75eb3bca..c9351714 100644 --- a/common/commonspace/spacesyncproto/spacesync.go +++ b/common/commonspace/spacesyncproto/spacesync.go @@ -2,6 +2,7 @@ package spacesyncproto import ( + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "storj.io/drpc" ) @@ -25,6 +26,7 @@ func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTre }, RootChange: rootChange, TreeId: treeId, + TrackingId: trackingId, } } @@ -35,6 +37,7 @@ func WrapFullRequest(request *ObjectFullSyncRequest, rootChange *treechangeproto }, RootChange: rootChange, TreeId: treeId, + TrackingId: trackingId, } } @@ -45,6 +48,7 @@ func WrapFullResponse(response *ObjectFullSyncResponse, rootChange *treechangepr }, RootChange: rootChange, TreeId: treeId, + TrackingId: trackingId, } } @@ -55,5 +59,21 @@ func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeI }, RootChange: rootChange, TreeId: treeId, + TrackingId: trackingId, } } + +func MessageDescription(msg *ObjectSyncMessage) string { + content := msg.GetContent() + switch { + case content.GetHeadUpdate() != nil: + return fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads) + case content.GetFullSyncRequest() != nil: + return fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads) + case content.GetFullSyncResponse() != nil: + return fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads) + case content.GetErrorResponse() != nil: + return fmt.Sprintf("error response/%v", content.GetErrorResponse().Error) + } + return "" +} diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 9651d756..1421839f 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -90,7 +90,7 @@ func (s *streamPool) SendSync( if err != nil { return } - + // TODO: limit wait time here and remove the waiter reply = <-waiter.ch return } @@ -112,6 +112,9 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn streams := getStreams() s.Unlock() + log.With("description", spacesyncproto.MessageDescription(message)). + With("treeId", message.TreeId). + Debugf("sending message to %d peers", len(streams)) for _, s := range streams { err = s.Send(message) } @@ -158,6 +161,9 @@ Loop: func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { streams := s.getAllStreams() + log.With("description", spacesyncproto.MessageDescription(message)). + With("treeId", message.TreeId). + Debugf("broadcasting message to %d peers", len(streams)) for _, stream := range streams { if err = stream.Send(message); err != nil { // TODO: add logging diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index deb0788a..16879b36 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -44,7 +44,10 @@ func (s *syncHandler) handleHeadUpdate( senderId string, update *spacesyncproto.ObjectHeadUpdate, msg *spacesyncproto.ObjectSyncMessage) (err error) { - + log.With("senderId", senderId). + With("heads", update.Heads). + With("treeId", msg.TreeId). + Debug("received head update message") var ( fullRequest *spacesyncproto.ObjectSyncMessage isEmptyUpdate = len(update.Changes) == 0 @@ -86,8 +89,16 @@ func (s *syncHandler) handleHeadUpdate( }() if fullRequest != nil { + log.With("senderId", senderId). + With("heads", update.Heads). + With("treeId", msg.TreeId). + Debug("sending full sync request") return s.syncClient.SendAsync([]string{senderId}, fullRequest) } + log.With("senderId", senderId). + With("heads", update.Heads). + With("treeId", msg.TreeId). + Debug("head update finished correctly") return } @@ -96,6 +107,11 @@ func (s *syncHandler) handleFullSyncRequest( senderId string, request *spacesyncproto.ObjectFullSyncRequest, msg *spacesyncproto.ObjectSyncMessage) (err error) { + log.With("senderId", senderId). + With("heads", request.Heads). + With("treeId", msg.TreeId). + With("trackingId", msg.TrackingId). + Debug("received full sync request message") var ( fullResponse *spacesyncproto.ObjectSyncMessage header = msg.RootChange @@ -141,11 +157,18 @@ func (s *syncHandler) handleFullSyncResponse( senderId string, response *spacesyncproto.ObjectFullSyncResponse, msg *spacesyncproto.ObjectSyncMessage) (err error) { + log.With("senderId", senderId). + With("heads", response.Heads). + With("treeId", msg.TreeId). + Debug("received full sync response message") objTree, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) if err != nil { + log.With("senderId", senderId). + With("heads", response.Heads). + With("treeId", msg.TreeId). + Debug("failed to find the tree in full sync response") return } - err = func() error { objTree.Lock() defer objTree.Unlock() @@ -157,6 +180,10 @@ func (s *syncHandler) handleFullSyncResponse( _, err = objTree.AddRawChanges(ctx, response.Changes...) return err }() + log.With("error", err != nil). + With("heads", response.Heads). + With("treeId", msg.TreeId). + Debug("finished full sync response") return } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 41f7c16a..4cd6e2ee 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -3,6 +3,7 @@ package synctree import ( "context" "errors" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" @@ -21,6 +22,8 @@ type SyncTree struct { isClosed bool } +var log = logger.NewNamed("commonspace.synctree").Sugar() + var createDerivedObjectTree = tree2.CreateDerivedObjectTree var createObjectTree = tree2.CreateObjectTree var buildObjectTree = tree2.BuildObjectTree @@ -138,8 +141,10 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot } func (s *SyncTree) Close() (err error) { + log.With("id", s.ID()).Debug("closing sync tree") s.Lock() defer s.Unlock() + log.With("id", s.ID()).Debug("taken lock on sync tree") if s.isClosed { err = ErrSyncTreeClosed return diff --git a/node/storage/treestorage.go b/node/storage/treestorage.go index 919f2140..7ad33aa1 100644 --- a/node/storage/treestorage.go +++ b/node/storage/treestorage.go @@ -99,7 +99,7 @@ func (t *treeStorage) Heads() (heads []string, err error) { if err != nil { return } - if heads == nil { + if headsBytes == nil { err = storage2.ErrUnknownTreeId return }