Update synctree to connect with responsible node
This commit is contained in:
parent
18bc3314fe
commit
25a52f41b8
@ -79,10 +79,15 @@ func newWrappedSyncClient(
|
|||||||
|
|
||||||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||||
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
|
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||||
|
streamChecker := deps.ObjectSync.StreamChecker()
|
||||||
peerId, err := peer.CtxPeerId(ctx)
|
peerId, err := peer.CtxPeerId(ctx)
|
||||||
|
if err != nil {
|
||||||
|
streamChecker.CheckResponsiblePeers()
|
||||||
|
peerId, err = streamChecker.FirstResponsiblePeer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
||||||
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")
|
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")
|
||||||
|
|||||||
@ -16,6 +16,7 @@ import (
|
|||||||
type StreamChecker interface {
|
type StreamChecker interface {
|
||||||
CheckResponsiblePeers()
|
CheckResponsiblePeers()
|
||||||
CheckPeerConnection(peerId string) (err error)
|
CheckPeerConnection(peerId string) (err error)
|
||||||
|
FirstResponsiblePeer() (peerId string, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamChecker struct {
|
type streamChecker struct {
|
||||||
@ -28,7 +29,7 @@ type streamChecker struct {
|
|||||||
lastCheck *atomic.Time
|
lastCheck *atomic.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
const streamCheckerInterval = time.Second * 10
|
const streamCheckerInterval = time.Second * 5
|
||||||
|
|
||||||
func NewStreamChecker(
|
func NewStreamChecker(
|
||||||
spaceId string,
|
spaceId string,
|
||||||
@ -131,3 +132,15 @@ func (s *streamChecker) createStream(p peer.Peer) (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) {
|
||||||
|
nodeIds := s.connector.Configuration().NodeIds(s.spaceId)
|
||||||
|
for _, nodeId := range nodeIds {
|
||||||
|
if s.streamPool.HasActiveStream(nodeId) {
|
||||||
|
peerId = nodeId
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = fmt.Errorf("no responsible peers are connected")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
app "github.com/anytypeio/any-sync/app"
|
app "github.com/anytypeio/any-sync/app"
|
||||||
nodeconf "github.com/anytypeio/any-sync/nodeconf"
|
nodeconf "github.com/anytypeio/any-sync/nodeconf"
|
||||||
|
chash "github.com/anytypeio/go-chash"
|
||||||
gomock "github.com/golang/mock/gomock"
|
gomock "github.com/golang/mock/gomock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -128,6 +129,20 @@ func (mr *MockConfigurationMockRecorder) Addresses() *gomock.Call {
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addresses", reflect.TypeOf((*MockConfiguration)(nil).Addresses))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addresses", reflect.TypeOf((*MockConfiguration)(nil).Addresses))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CHash mocks base method.
|
||||||
|
func (m *MockConfiguration) CHash() chash.CHash {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "CHash")
|
||||||
|
ret0, _ := ret[0].(chash.CHash)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// CHash indicates an expected call of CHash.
|
||||||
|
func (mr *MockConfigurationMockRecorder) CHash() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CHash", reflect.TypeOf((*MockConfiguration)(nil).CHash))
|
||||||
|
}
|
||||||
|
|
||||||
// ConsensusPeers mocks base method.
|
// ConsensusPeers mocks base method.
|
||||||
func (m *MockConfiguration) ConsensusPeers() []string {
|
func (m *MockConfiguration) ConsensusPeers() []string {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user