Removed sending under a lock in sync tree handler

This commit is contained in:
mcrakhman 2022-12-14 21:37:36 +01:00 committed by Mikhail Iudin
parent 5e8bd53fb2
commit ff12e4ad1e
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
6 changed files with 45 additions and 21 deletions

View File

@ -205,12 +205,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
t = syncTree t = syncTree
syncTree.Lock() syncTree.Lock()
defer syncTree.Unlock()
syncTree.afterBuild() syncTree.afterBuild()
syncTree.Unlock()
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update
if isFirstBuild { if isFirstBuild {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree // send to everybody, because everybody should know that the node or client got new tree
err = syncTree.syncClient.BroadcastAsync(headUpdate) err = syncTree.syncClient.BroadcastAsync(headUpdate)
} }
@ -242,6 +241,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if s.notifiable != nil { if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads) s.notifiable.UpdateHeads(s.ID(), res.Heads)
} }
// it is more or less safe to send head updates when creating content (under lock)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate) err = s.syncClient.BroadcastAsync(headUpdate)
return return
@ -269,8 +269,8 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawCha
if s.notifiable != nil { if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads) s.notifiable.UpdateHeads(s.ID(), res.Heads)
} }
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) // we removed the sending head updates from here, because this method can be called under a lock
err = s.syncClient.BroadcastAsync(headUpdate) // thus this can block access to the tree
} }
return return
} }

View File

@ -52,8 +52,10 @@ func (s *syncTreeHandler) handleHeadUpdate(
Debug("received head update message") Debug("received head update message")
var ( var (
fullRequest *treechangeproto.TreeSyncMessage fullRequest *treechangeproto.TreeSyncMessage
headUpdate *treechangeproto.TreeSyncMessage
isEmptyUpdate = len(update.Changes) == 0 isEmptyUpdate = len(update.Changes) == 0
objTree = s.objTree objTree = s.objTree
addResult tree.AddResult
) )
err = func() error { err = func() error {
@ -75,13 +77,16 @@ func (s *syncTreeHandler) handleHeadUpdate(
return nil return nil
} }
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: update.Heads, NewHeads: update.Heads,
RawChanges: update.Changes, RawChanges: update.Changes,
}) })
if err != nil { if err != nil {
return err return err
} }
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
if s.alreadyHasHeads(objTree, update.Heads) { if s.alreadyHasHeads(objTree, update.Heads) {
return nil return nil
@ -91,6 +96,10 @@ func (s *syncTreeHandler) handleHeadUpdate(
return err return err
}() }()
if headUpdate != nil {
s.syncClient.BroadcastAsync(headUpdate)
}
if fullRequest != nil { if fullRequest != nil {
log.With("senderId", senderId). log.With("senderId", senderId).
With("heads", objTree.Heads()). With("heads", objTree.Heads()).
@ -117,6 +126,8 @@ func (s *syncTreeHandler) handleFullSyncRequest(
Debug("received full sync request message") Debug("received full sync request message")
var ( var (
fullResponse *treechangeproto.TreeSyncMessage fullResponse *treechangeproto.TreeSyncMessage
headUpdate *treechangeproto.TreeSyncMessage
addResult tree.AddResult
header = s.objTree.Header() header = s.objTree.Header()
objTree = s.objTree objTree = s.objTree
) )
@ -131,18 +142,23 @@ func (s *syncTreeHandler) handleFullSyncRequest(
defer objTree.Unlock() defer objTree.Unlock()
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: request.Heads, NewHeads: request.Heads,
RawChanges: request.Changes, RawChanges: request.Changes,
}) })
if err != nil { if err != nil {
return err return err
} }
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
} }
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
return err return err
}() }()
if headUpdate != nil {
s.syncClient.BroadcastAsync(headUpdate)
}
if err != nil { if err != nil {
return return
@ -158,14 +174,11 @@ func (s *syncTreeHandler) handleFullSyncResponse(
With("heads", response.Heads). With("heads", response.Heads).
With("treeId", s.objTree.ID()). With("treeId", s.objTree.ID()).
Debug("received full sync response message") Debug("received full sync response message")
objTree := s.objTree var (
if err != nil { objTree = s.objTree
log.With("senderId", senderId). addResult tree.AddResult
With("heads", response.Heads). headUpdate *treechangeproto.TreeSyncMessage
With("treeId", s.objTree.ID()). )
Debug("failed to find the tree in full sync response")
return
}
err = func() error { err = func() error {
objTree.Lock() objTree.Lock()
defer objTree.Unlock() defer objTree.Unlock()
@ -174,12 +187,22 @@ func (s *syncTreeHandler) handleFullSyncResponse(
return nil return nil
} }
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{ addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: response.Heads, NewHeads: response.Heads,
RawChanges: response.Changes, RawChanges: response.Changes,
}) })
if err != nil {
return err
}
if addResult.Mode != tree.Nothing {
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
}
return err return err
}() }()
if headUpdate != nil {
s.syncClient.BroadcastAsync(headUpdate)
}
log.With("error", err != nil). log.With("error", err != nil).
With("heads", response.Heads). With("heads", response.Heads).
With("treeId", s.objTree.ID()). With("treeId", s.objTree.ID()).

View File

@ -6,9 +6,9 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
"github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec"
"go.uber.org/zap" "go.uber.org/zap"
"net" "net"

View File

@ -2,7 +2,7 @@ package secure
import ( import (
"context" "context"
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
"net" "net"
"time" "time"
) )

View File

@ -2,8 +2,8 @@ package secure
import ( import (
"context" "context"
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"net" "net"

View File

@ -1,4 +1,4 @@
package conn package timeoutconn
import ( import (
"errors" "errors"
@ -31,6 +31,7 @@ func (c *Conn) Write(p []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Time{}) c.Conn.SetWriteDeadline(time.Time{})
} }
if err != nil { if err != nil {
// if the connection is timed out and we should close it
c.Conn.Close() c.Conn.Close()
} }
return n, err return n, err