Removed sending under a lock in sync tree handler
This commit is contained in:
parent
38e055ed12
commit
c36fe14272
@ -205,12 +205,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
syncTree.SyncHandler = syncHandler
|
syncTree.SyncHandler = syncHandler
|
||||||
t = syncTree
|
t = syncTree
|
||||||
syncTree.Lock()
|
syncTree.Lock()
|
||||||
defer syncTree.Unlock()
|
|
||||||
syncTree.afterBuild()
|
syncTree.afterBuild()
|
||||||
|
syncTree.Unlock()
|
||||||
|
|
||||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
|
||||||
// here we will have different behaviour based on who is sending this update
|
|
||||||
if isFirstBuild {
|
if isFirstBuild {
|
||||||
|
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||||
// send to everybody, because everybody should know that the node or client got new tree
|
// send to everybody, because everybody should know that the node or client got new tree
|
||||||
err = syncTree.syncClient.BroadcastAsync(headUpdate)
|
err = syncTree.syncClient.BroadcastAsync(headUpdate)
|
||||||
}
|
}
|
||||||
@ -242,6 +241,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
|
|||||||
if s.notifiable != nil {
|
if s.notifiable != nil {
|
||||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||||
}
|
}
|
||||||
|
// it is more or less safe to send head updates when creating content (under lock)
|
||||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||||
return
|
return
|
||||||
@ -269,8 +269,8 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawCha
|
|||||||
if s.notifiable != nil {
|
if s.notifiable != nil {
|
||||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||||
}
|
}
|
||||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
// we removed the sending head updates from here, because this method can be called under a lock
|
||||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
// thus this can block access to the tree
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -52,8 +52,10 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
Debug("received head update message")
|
Debug("received head update message")
|
||||||
var (
|
var (
|
||||||
fullRequest *treechangeproto.TreeSyncMessage
|
fullRequest *treechangeproto.TreeSyncMessage
|
||||||
|
headUpdate *treechangeproto.TreeSyncMessage
|
||||||
isEmptyUpdate = len(update.Changes) == 0
|
isEmptyUpdate = len(update.Changes) == 0
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
|
addResult tree.AddResult
|
||||||
)
|
)
|
||||||
|
|
||||||
err = func() error {
|
err = func() error {
|
||||||
@ -75,13 +77,16 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
||||||
NewHeads: update.Heads,
|
NewHeads: update.Heads,
|
||||||
RawChanges: update.Changes,
|
RawChanges: update.Changes,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if addResult.Mode != tree.Nothing {
|
||||||
|
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
|
||||||
|
}
|
||||||
|
|
||||||
if s.alreadyHasHeads(objTree, update.Heads) {
|
if s.alreadyHasHeads(objTree, update.Heads) {
|
||||||
return nil
|
return nil
|
||||||
@ -91,6 +96,10 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
return err
|
return err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if headUpdate != nil {
|
||||||
|
s.syncClient.BroadcastAsync(headUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
if fullRequest != nil {
|
if fullRequest != nil {
|
||||||
log.With("senderId", senderId).
|
log.With("senderId", senderId).
|
||||||
With("heads", objTree.Heads()).
|
With("heads", objTree.Heads()).
|
||||||
@ -117,6 +126,8 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
Debug("received full sync request message")
|
Debug("received full sync request message")
|
||||||
var (
|
var (
|
||||||
fullResponse *treechangeproto.TreeSyncMessage
|
fullResponse *treechangeproto.TreeSyncMessage
|
||||||
|
headUpdate *treechangeproto.TreeSyncMessage
|
||||||
|
addResult tree.AddResult
|
||||||
header = s.objTree.Header()
|
header = s.objTree.Header()
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
)
|
)
|
||||||
@ -131,18 +142,23 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
defer objTree.Unlock()
|
defer objTree.Unlock()
|
||||||
|
|
||||||
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
|
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
|
||||||
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
||||||
NewHeads: request.Heads,
|
NewHeads: request.Heads,
|
||||||
RawChanges: request.Changes,
|
RawChanges: request.Changes,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if addResult.Mode != tree.Nothing {
|
||||||
|
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
|
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
|
||||||
return err
|
return err
|
||||||
}()
|
}()
|
||||||
|
if headUpdate != nil {
|
||||||
|
s.syncClient.BroadcastAsync(headUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -158,14 +174,11 @@ func (s *syncTreeHandler) handleFullSyncResponse(
|
|||||||
With("heads", response.Heads).
|
With("heads", response.Heads).
|
||||||
With("treeId", s.objTree.ID()).
|
With("treeId", s.objTree.ID()).
|
||||||
Debug("received full sync response message")
|
Debug("received full sync response message")
|
||||||
objTree := s.objTree
|
var (
|
||||||
if err != nil {
|
objTree = s.objTree
|
||||||
log.With("senderId", senderId).
|
addResult tree.AddResult
|
||||||
With("heads", response.Heads).
|
headUpdate *treechangeproto.TreeSyncMessage
|
||||||
With("treeId", s.objTree.ID()).
|
)
|
||||||
Debug("failed to find the tree in full sync response")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = func() error {
|
err = func() error {
|
||||||
objTree.Lock()
|
objTree.Lock()
|
||||||
defer objTree.Unlock()
|
defer objTree.Unlock()
|
||||||
@ -174,12 +187,22 @@ func (s *syncTreeHandler) handleFullSyncResponse(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
addResult, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
|
||||||
NewHeads: response.Heads,
|
NewHeads: response.Heads,
|
||||||
RawChanges: response.Changes,
|
RawChanges: response.Changes,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if addResult.Mode != tree.Nothing {
|
||||||
|
headUpdate = s.syncClient.CreateHeadUpdate(objTree, addResult.Added)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}()
|
}()
|
||||||
|
if headUpdate != nil {
|
||||||
|
s.syncClient.BroadcastAsync(headUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
log.With("error", err != nil).
|
log.With("error", err != nil).
|
||||||
With("heads", response.Heads).
|
With("heads", response.Heads).
|
||||||
With("treeId", s.objTree.ID()).
|
With("treeId", s.objTree.ID()).
|
||||||
|
|||||||
@ -6,9 +6,9 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
|
||||||
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn"
|
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
|
||||||
"github.com/libp2p/go-libp2p/core/sec"
|
"github.com/libp2p/go-libp2p/core/sec"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"net"
|
"net"
|
||||||
|
|||||||
@ -2,7 +2,7 @@ package secure
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -2,8 +2,8 @@ package secure
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
timeoutconn "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/conn"
|
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/timeoutconn"
|
||||||
"github.com/libp2p/go-libp2p/core/crypto"
|
"github.com/libp2p/go-libp2p/core/crypto"
|
||||||
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
|
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
|
||||||
"net"
|
"net"
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
package conn
|
package timeoutconn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
@ -31,6 +31,7 @@ func (c *Conn) Write(p []byte) (n int, err error) {
|
|||||||
c.Conn.SetWriteDeadline(time.Time{})
|
c.Conn.SetWriteDeadline(time.Time{})
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// if the connection is timed out and we should close it
|
||||||
c.Conn.Close()
|
c.Conn.Close()
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
Loading…
x
Reference in New Issue
Block a user