From ff12e4ad1e66531f5201c77f506cf24ebda741c2 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 14 Dec 2022 21:37:36 +0100 Subject: [PATCH] Removed sending under a lock in sync tree handler --- common/commonspace/synctree/synctree.go | 10 ++-- .../commonspace/synctree/synctreehandler.go | 47 ++++++++++++++----- common/net/dialer/dialer.go | 2 +- common/net/secure/basiclistener.go | 2 +- common/net/secure/tlslistener.go | 2 +- common/net/{conn => timeoutconn}/conn.go | 3 +- 6 files changed, 45 insertions(+), 21 deletions(-) rename common/net/{conn => timeoutconn}/conn.go (89%) diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index bf991efe..76da81b0 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -205,12 +205,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy syncTree.SyncHandler = syncHandler t = syncTree syncTree.Lock() - defer syncTree.Unlock() syncTree.afterBuild() + syncTree.Unlock() - headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) - // here we will have different behaviour based on who is sending this update if isFirstBuild { + headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree err = syncTree.syncClient.BroadcastAsync(headUpdate) } @@ -242,6 +241,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } + // it is more or less safe to send head updates when creating content (under lock) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) err = s.syncClient.BroadcastAsync(headUpdate) return @@ -269,8 +269,8 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawCha if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } - headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - err = s.syncClient.BroadcastAsync(headUpdate) + // we removed the sending head updates from here, because this method can be called under a lock + // thus this can block access to the tree } return } diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 5939a16a..cb0f5d98 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -52,8 +52,10 @@ func (s *syncTreeHandler) handleHeadUpdate( Debug("received head update message") var ( fullRequest *treechangeproto.TreeSyncMessage + headUpdate *treechangeproto.TreeSyncMessage isEmptyUpdate = len(update.Changes) == 0 objTree = s.objTree + addResult tree.AddResult ) err = func() error { @@ -75,13 +77,16 @@ func (s *syncTreeHandler) handleHeadUpdate( return nil } - _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: update.Heads, RawChanges: update.Changes, }) if err != nil { return err } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) + } if s.alreadyHasHeads(objTree, update.Heads) { return nil @@ -91,6 +96,10 @@ func (s *syncTreeHandler) handleHeadUpdate( return err }() + if headUpdate != nil { + s.syncClient.BroadcastAsync(headUpdate) + } + if fullRequest != nil { log.With("senderId", senderId). With("heads", objTree.Heads()). @@ -117,6 +126,8 @@ func (s *syncTreeHandler) handleFullSyncRequest( Debug("received full sync request message") var ( fullResponse *treechangeproto.TreeSyncMessage + headUpdate *treechangeproto.TreeSyncMessage + addResult tree.AddResult header = s.objTree.Header() objTree = s.objTree ) @@ -131,18 +142,23 @@ func (s *syncTreeHandler) handleFullSyncRequest( defer objTree.Unlock() if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { - _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: request.Heads, RawChanges: request.Changes, }) if err != nil { return err } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) + } } - fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) return err }() + if headUpdate != nil { + s.syncClient.BroadcastAsync(headUpdate) + } if err != nil { return @@ -158,14 +174,11 @@ func (s *syncTreeHandler) handleFullSyncResponse( With("heads", response.Heads). With("treeId", s.objTree.ID()). Debug("received full sync response message") - objTree := s.objTree - if err != nil { - log.With("senderId", senderId). - With("heads", response.Heads). - With("treeId", s.objTree.ID()). - Debug("failed to find the tree in full sync response") - return - } + var ( + objTree = s.objTree + addResult tree.AddResult + headUpdate *treechangeproto.TreeSyncMessage + ) err = func() error { objTree.Lock() defer objTree.Unlock() @@ -174,12 +187,22 @@ func (s *syncTreeHandler) handleFullSyncResponse( return nil } - _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ + addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ NewHeads: response.Heads, RawChanges: response.Changes, }) + if err != nil { + return err + } + if addResult.Mode != tree.Nothing { + headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added) + } return err }() + if headUpdate != nil { + s.syncClient.BroadcastAsync(headUpdate) + } + log.With("error", err != nil). With("heads", response.Heads). With("treeId", s.objTree.ID()). diff --git a/common/net/dialer/dialer.go b/common/net/dialer/dialer.go index 3d680c59..44b23b54 100644 --- a/common/net/dialer/dialer.go +++ b/common/net/dialer/dialer.go @@ -6,9 +6,9 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" - timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn" "github.com/libp2p/go-libp2p/core/sec" "go.uber.org/zap" "net" diff --git a/common/net/secure/basiclistener.go b/common/net/secure/basiclistener.go index 05a0cb1a..927965af 100644 --- a/common/net/secure/basiclistener.go +++ b/common/net/secure/basiclistener.go @@ -2,7 +2,7 @@ package secure import ( "context" - timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn" "net" "time" ) diff --git a/common/net/secure/tlslistener.go b/common/net/secure/tlslistener.go index dd657a73..120c6f01 100644 --- a/common/net/secure/tlslistener.go +++ b/common/net/secure/tlslistener.go @@ -2,8 +2,8 @@ package secure import ( "context" - timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn" "github.com/libp2p/go-libp2p/core/crypto" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "net" diff --git a/common/net/conn/conn.go b/common/net/timeoutconn/conn.go similarity index 89% rename from common/net/conn/conn.go rename to common/net/timeoutconn/conn.go index 6504ac2a..77f53069 100644 --- a/common/net/conn/conn.go +++ b/common/net/timeoutconn/conn.go @@ -1,4 +1,4 @@ -package conn +package timeoutconn import ( "errors" @@ -31,6 +31,7 @@ func (c *Conn) Write(p []byte) (n int, err error) { c.Conn.SetWriteDeadline(time.Time{}) } if err != nil { + // if the connection is timed out and we should close it c.Conn.Close() } return n, err