Merge pull request #6 from anytypeio/async-stream-dial
This commit is contained in:
commit
48e55d4827
@ -1,85 +0,0 @@
|
|||||||
//go:generate mockgen -destination mock_confconnector/mock_confconnector.go github.com/anytypeio/any-sync/commonspace/confconnector ConfConnector
|
|
||||||
package confconnector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/anytypeio/any-sync/net/peer"
|
|
||||||
"github.com/anytypeio/any-sync/net/pool"
|
|
||||||
"github.com/anytypeio/any-sync/nodeconf"
|
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ConfConnector interface {
|
|
||||||
Configuration() nodeconf.Configuration
|
|
||||||
Pool() pool.Pool
|
|
||||||
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
|
|
||||||
DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type confConnector struct {
|
|
||||||
conf nodeconf.Configuration
|
|
||||||
pool pool.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConfConnector(conf nodeconf.Configuration, pool pool.Pool) ConfConnector {
|
|
||||||
return &confConnector{conf: conf, pool: pool}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *confConnector) Configuration() nodeconf.Configuration {
|
|
||||||
return s.conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *confConnector) Pool() pool.Pool {
|
|
||||||
return s.pool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) {
|
|
||||||
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
|
|
||||||
return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *confConnector) connectOneOrMany(
|
|
||||||
ctx context.Context,
|
|
||||||
spaceId string,
|
|
||||||
activeNodeIds []string,
|
|
||||||
connectOne func(context.Context, string) (peer.Peer, error)) (peers []peer.Peer, err error) {
|
|
||||||
var (
|
|
||||||
inactiveNodeIds []string
|
|
||||||
allNodes = s.conf.NodeIds(spaceId)
|
|
||||||
)
|
|
||||||
for _, id := range allNodes {
|
|
||||||
if !slices.Contains(activeNodeIds, id) {
|
|
||||||
inactiveNodeIds = append(inactiveNodeIds, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.conf.IsResponsible(spaceId) {
|
|
||||||
for _, id := range inactiveNodeIds {
|
|
||||||
var p peer.Peer
|
|
||||||
p, err = connectOne(ctx, id)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
peers = append(peers, p)
|
|
||||||
}
|
|
||||||
} else if len(activeNodeIds) == 0 {
|
|
||||||
// that means that all connected ids
|
|
||||||
var p peer.Peer
|
|
||||||
p, err = s.pool.GetOneOf(ctx, allNodes)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we are dialling someone, we want to dial to the same peer which we cached
|
|
||||||
// thus communication through streams and through diff will go to the same node
|
|
||||||
p, err = connectOne(ctx, p.Id())
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
peers = []peer.Peer{p}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
@ -1,96 +0,0 @@
|
|||||||
// Code generated by MockGen. DO NOT EDIT.
|
|
||||||
// Source: github.com/anytypeio/any-sync/commonspace/confconnector (interfaces: ConfConnector)
|
|
||||||
|
|
||||||
// Package mock_confconnector is a generated GoMock package.
|
|
||||||
package mock_confconnector
|
|
||||||
|
|
||||||
import (
|
|
||||||
context "context"
|
|
||||||
reflect "reflect"
|
|
||||||
|
|
||||||
peer "github.com/anytypeio/any-sync/net/peer"
|
|
||||||
pool "github.com/anytypeio/any-sync/net/pool"
|
|
||||||
nodeconf "github.com/anytypeio/any-sync/nodeconf"
|
|
||||||
gomock "github.com/golang/mock/gomock"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MockConfConnector is a mock of ConfConnector interface.
|
|
||||||
type MockConfConnector struct {
|
|
||||||
ctrl *gomock.Controller
|
|
||||||
recorder *MockConfConnectorMockRecorder
|
|
||||||
}
|
|
||||||
|
|
||||||
// MockConfConnectorMockRecorder is the mock recorder for MockConfConnector.
|
|
||||||
type MockConfConnectorMockRecorder struct {
|
|
||||||
mock *MockConfConnector
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewMockConfConnector creates a new mock instance.
|
|
||||||
func NewMockConfConnector(ctrl *gomock.Controller) *MockConfConnector {
|
|
||||||
mock := &MockConfConnector{ctrl: ctrl}
|
|
||||||
mock.recorder = &MockConfConnectorMockRecorder{mock}
|
|
||||||
return mock
|
|
||||||
}
|
|
||||||
|
|
||||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
|
||||||
func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder {
|
|
||||||
return m.recorder
|
|
||||||
}
|
|
||||||
|
|
||||||
// Configuration mocks base method.
|
|
||||||
func (m *MockConfConnector) Configuration() nodeconf.Configuration {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "Configuration")
|
|
||||||
ret0, _ := ret[0].(nodeconf.Configuration)
|
|
||||||
return ret0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Configuration indicates an expected call of Configuration.
|
|
||||||
func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration))
|
|
||||||
}
|
|
||||||
|
|
||||||
// DialInactiveResponsiblePeers mocks base method.
|
|
||||||
func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2)
|
|
||||||
ret0, _ := ret[0].([]peer.Peer)
|
|
||||||
ret1, _ := ret[1].(error)
|
|
||||||
return ret0, ret1
|
|
||||||
}
|
|
||||||
|
|
||||||
// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers.
|
|
||||||
func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetResponsiblePeers mocks base method.
|
|
||||||
func (m *MockConfConnector) GetResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0, arg1)
|
|
||||||
ret0, _ := ret[0].([]peer.Peer)
|
|
||||||
ret1, _ := ret[1].(error)
|
|
||||||
return ret0, ret1
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetResponsiblePeers indicates an expected call of GetResponsiblePeers.
|
|
||||||
func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pool mocks base method.
|
|
||||||
func (m *MockConfConnector) Pool() pool.Pool {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "Pool")
|
|
||||||
ret0, _ := ret[0].(pool.Pool)
|
|
||||||
return ret0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pool indicates an expected call of Pool.
|
|
||||||
func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool))
|
|
||||||
}
|
|
||||||
@ -5,9 +5,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anytypeio/any-sync/app/ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
@ -28,7 +28,7 @@ type DiffSyncer interface {
|
|||||||
func newDiffSyncer(
|
func newDiffSyncer(
|
||||||
spaceId string,
|
spaceId string,
|
||||||
diff ldiff.Diff,
|
diff ldiff.Diff,
|
||||||
confConnector confconnector.ConfConnector,
|
peerManager peermanager.PeerManager,
|
||||||
cache treegetter.TreeGetter,
|
cache treegetter.TreeGetter,
|
||||||
storage spacestorage.SpaceStorage,
|
storage spacestorage.SpaceStorage,
|
||||||
clientFactory spacesyncproto.ClientFactory,
|
clientFactory spacesyncproto.ClientFactory,
|
||||||
@ -39,7 +39,7 @@ func newDiffSyncer(
|
|||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
confConnector: confConnector,
|
peerManager: peerManager,
|
||||||
clientFactory: clientFactory,
|
clientFactory: clientFactory,
|
||||||
log: log,
|
log: log,
|
||||||
syncStatus: syncStatus,
|
syncStatus: syncStatus,
|
||||||
@ -49,7 +49,7 @@ func newDiffSyncer(
|
|||||||
type diffSyncer struct {
|
type diffSyncer struct {
|
||||||
spaceId string
|
spaceId string
|
||||||
diff ldiff.Diff
|
diff ldiff.Diff
|
||||||
confConnector confconnector.ConfConnector
|
peerManager peermanager.PeerManager
|
||||||
cache treegetter.TreeGetter
|
cache treegetter.TreeGetter
|
||||||
storage spacestorage.SpaceStorage
|
storage spacestorage.SpaceStorage
|
||||||
clientFactory spacesyncproto.ClientFactory
|
clientFactory spacesyncproto.ClientFactory
|
||||||
@ -88,7 +88,7 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) {
|
|||||||
func (d *diffSyncer) Sync(ctx context.Context) error {
|
func (d *diffSyncer) Sync(ctx context.Context) error {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
// diffing with responsible peers according to configuration
|
// diffing with responsible peers according to configuration
|
||||||
peers, err := d.confConnector.GetResponsiblePeers(ctx, d.spaceId)
|
peers, err := d.peerManager.GetResponsiblePeers(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,12 +6,12 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/app/ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff"
|
||||||
"github.com/anytypeio/any-sync/app/ldiff/mock_ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff/mock_ldiff"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
|
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
|
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
|
mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager/mock_peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate"
|
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage"
|
"github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
@ -99,7 +99,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
defer ctrl.Finish()
|
defer ctrl.Finish()
|
||||||
|
|
||||||
diffMock := mock_ldiff.NewMockDiff(ctrl)
|
diffMock := mock_ldiff.NewMockDiff(ctrl)
|
||||||
connectorMock := mock_confconnector.NewMockConfConnector(ctrl)
|
peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl)
|
||||||
cacheMock := mock_treegetter.NewMockTreeGetter(ctrl)
|
cacheMock := mock_treegetter.NewMockTreeGetter(ctrl)
|
||||||
stMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
stMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||||
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
|
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
|
||||||
@ -110,13 +110,13 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
spaceId := "spaceId"
|
spaceId := "spaceId"
|
||||||
aclRootId := "aclRootId"
|
aclRootId := "aclRootId"
|
||||||
l := logger.NewNamed(spaceId)
|
l := logger.NewNamed(spaceId)
|
||||||
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l)
|
diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l)
|
||||||
delState.EXPECT().AddObserver(gomock.Any())
|
delState.EXPECT().AddObserver(gomock.Any())
|
||||||
diffSyncer.Init(delState)
|
diffSyncer.Init(delState)
|
||||||
|
|
||||||
t.Run("diff syncer sync", func(t *testing.T) {
|
t.Run("diff syncer sync", func(t *testing.T) {
|
||||||
connectorMock.EXPECT().
|
peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any(), spaceId).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
Return([]peer.Peer{mockPeer{}}, nil)
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
diffMock.EXPECT().
|
diffMock.EXPECT().
|
||||||
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
||||||
@ -131,8 +131,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("diff syncer sync conf error", func(t *testing.T) {
|
t.Run("diff syncer sync conf error", func(t *testing.T) {
|
||||||
connectorMock.EXPECT().
|
peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any(), spaceId).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
Return(nil, fmt.Errorf("some error"))
|
Return(nil, fmt.Errorf("some error"))
|
||||||
|
|
||||||
require.Error(t, diffSyncer.Sync(ctx))
|
require.Error(t, diffSyncer.Sync(ctx))
|
||||||
@ -173,8 +173,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
|
spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
|
||||||
spaceSettingsId := "spaceSettingsId"
|
spaceSettingsId := "spaceSettingsId"
|
||||||
|
|
||||||
connectorMock.EXPECT().
|
peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any(), spaceId).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
Return([]peer.Peer{mockPeer{}}, nil)
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
diffMock.EXPECT().
|
diffMock.EXPECT().
|
||||||
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
||||||
@ -197,8 +197,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("diff syncer sync other error", func(t *testing.T) {
|
t.Run("diff syncer sync other error", func(t *testing.T) {
|
||||||
connectorMock.EXPECT().
|
peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any(), spaceId).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
Return([]peer.Peer{mockPeer{}}, nil)
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
diffMock.EXPECT().
|
diffMock.EXPECT().
|
||||||
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
||||||
|
|||||||
@ -5,8 +5,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"github.com/anytypeio/any-sync/app/ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
@ -49,7 +49,7 @@ func NewHeadSync(
|
|||||||
spaceId string,
|
spaceId string,
|
||||||
syncPeriod int,
|
syncPeriod int,
|
||||||
storage spacestorage.SpaceStorage,
|
storage spacestorage.SpaceStorage,
|
||||||
confConnector confconnector.ConfConnector,
|
peerManager peermanager.PeerManager,
|
||||||
cache treegetter.TreeGetter,
|
cache treegetter.TreeGetter,
|
||||||
syncStatus syncstatus.StatusUpdater,
|
syncStatus syncstatus.StatusUpdater,
|
||||||
log logger.CtxLogger) HeadSync {
|
log logger.CtxLogger) HeadSync {
|
||||||
@ -57,7 +57,7 @@ func NewHeadSync(
|
|||||||
diff := ldiff.New(16, 16)
|
diff := ldiff.New(16, 16)
|
||||||
l := log.With(zap.String("spaceId", spaceId))
|
l := log.With(zap.String("spaceId", spaceId))
|
||||||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||||
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l)
|
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
|
||||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
|
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
|
||||||
|
|
||||||
return &headSync{
|
return &headSync{
|
||||||
|
|||||||
@ -3,7 +3,6 @@ package synctree
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
@ -21,7 +20,6 @@ type syncClient struct {
|
|||||||
objectsync.MessagePool
|
objectsync.MessagePool
|
||||||
RequestFactory
|
RequestFactory
|
||||||
spaceId string
|
spaceId string
|
||||||
connector confconnector.ConfConnector
|
|
||||||
configuration nodeconf.Configuration
|
configuration nodeconf.Configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anytypeio/any-sync/app/ocache"
|
"github.com/anytypeio/any-sync/app/ocache"
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -14,17 +15,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StreamManager interface {
|
|
||||||
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
|
||||||
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
|
||||||
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// MessagePool can be made generic to work with different streams
|
// MessagePool can be made generic to work with different streams
|
||||||
type MessagePool interface {
|
type MessagePool interface {
|
||||||
ocache.ObjectLastUsage
|
ocache.ObjectLastUsage
|
||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
StreamManager
|
peermanager.PeerManager
|
||||||
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,7 +31,7 @@ type responseWaiter struct {
|
|||||||
|
|
||||||
type messagePool struct {
|
type messagePool struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
StreamManager
|
peermanager.PeerManager
|
||||||
messageHandler MessageHandler
|
messageHandler MessageHandler
|
||||||
waiters map[string]responseWaiter
|
waiters map[string]responseWaiter
|
||||||
waitersMx sync.Mutex
|
waitersMx sync.Mutex
|
||||||
@ -44,9 +39,9 @@ type messagePool struct {
|
|||||||
lastUsage atomic.Int64
|
lastUsage atomic.Int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool {
|
func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageHandler) MessagePool {
|
||||||
s := &messagePool{
|
s := &messagePool{
|
||||||
StreamManager: streamManager,
|
PeerManager: peerManager,
|
||||||
messageHandler: messageHandler,
|
messageHandler: messageHandler,
|
||||||
waiters: make(map[string]responseWaiter),
|
waiters: make(map[string]responseWaiter),
|
||||||
}
|
}
|
||||||
@ -88,16 +83,16 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||||||
|
|
||||||
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
return s.StreamManager.SendPeer(ctx, peerId, msg)
|
return s.PeerManager.SendPeer(ctx, peerId, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
return s.StreamManager.SendResponsible(ctx, msg)
|
return s.PeerManager.SendResponsible(ctx, msg)
|
||||||
}
|
}
|
||||||
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
return s.StreamManager.Broadcast(ctx, msg)
|
return s.PeerManager.Broadcast(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/app/ocache"
|
"github.com/anytypeio/any-sync/app/ocache"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"time"
|
"time"
|
||||||
@ -34,7 +35,7 @@ type objectSync struct {
|
|||||||
|
|
||||||
func NewObjectSync(
|
func NewObjectSync(
|
||||||
spaceId string,
|
spaceId string,
|
||||||
streamManager StreamManager,
|
peerManager peermanager.PeerManager,
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync {
|
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync {
|
||||||
syncCtx, cancel := context.WithCancel(context.Background())
|
syncCtx, cancel := context.WithCancel(context.Background())
|
||||||
os := newObjectSync(
|
os := newObjectSync(
|
||||||
@ -42,7 +43,7 @@ func NewObjectSync(
|
|||||||
objectGetter,
|
objectGetter,
|
||||||
syncCtx,
|
syncCtx,
|
||||||
cancel)
|
cancel)
|
||||||
msgPool := newMessagePool(streamManager, os.handleMessage)
|
msgPool := newMessagePool(peerManager, os.handleMessage)
|
||||||
os.messagePool = msgPool
|
os.messagePool = msgPool
|
||||||
return os
|
return os
|
||||||
}
|
}
|
||||||
|
|||||||
94
commonspace/peermanager/mock_peermanager/mock_peermanager.go
Normal file
94
commonspace/peermanager/mock_peermanager/mock_peermanager.go
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
|
// Source: github.com/anytypeio/any-sync/commonspace/peermanager (interfaces: PeerManager)
|
||||||
|
|
||||||
|
// Package mock_peermanager is a generated GoMock package.
|
||||||
|
package mock_peermanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
reflect "reflect"
|
||||||
|
|
||||||
|
spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
|
peer "github.com/anytypeio/any-sync/net/peer"
|
||||||
|
gomock "github.com/golang/mock/gomock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MockPeerManager is a mock of PeerManager interface.
|
||||||
|
type MockPeerManager struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockPeerManagerMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager.
|
||||||
|
type MockPeerManagerMockRecorder struct {
|
||||||
|
mock *MockPeerManager
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockPeerManager creates a new mock instance.
|
||||||
|
func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager {
|
||||||
|
mock := &MockPeerManager{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockPeerManagerMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast mocks base method.
|
||||||
|
func (m *MockPeerManager) Broadcast(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast indicates an expected call of Broadcast.
|
||||||
|
func (mr *MockPeerManagerMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetResponsiblePeers mocks base method.
|
||||||
|
func (m *MockPeerManager) GetResponsiblePeers(arg0 context.Context) ([]peer.Peer, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0)
|
||||||
|
ret0, _ := ret[0].([]peer.Peer)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetResponsiblePeers indicates an expected call of GetResponsiblePeers.
|
||||||
|
func (mr *MockPeerManagerMockRecorder) GetResponsiblePeers(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockPeerManager)(nil).GetResponsiblePeers), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendPeer mocks base method.
|
||||||
|
func (m *MockPeerManager) SendPeer(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "SendPeer", arg0, arg1, arg2)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendPeer indicates an expected call of SendPeer.
|
||||||
|
func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendResponsible mocks base method.
|
||||||
|
func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendResponsible indicates an expected call of SendResponsible.
|
||||||
|
func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1)
|
||||||
|
}
|
||||||
27
commonspace/peermanager/peermanager.go
Normal file
27
commonspace/peermanager/peermanager.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
//go:generate mockgen -destination mock_peermanager/mock_peermanager.go github.com/anytypeio/any-sync/commonspace/peermanager PeerManager
|
||||||
|
package peermanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/app"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const CName = "common.commonspace.peermanager"
|
||||||
|
|
||||||
|
type PeerManager interface {
|
||||||
|
// SendPeer sends a message to a stream by peerId
|
||||||
|
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
||||||
|
// SendResponsible sends a message to responsible peers streams
|
||||||
|
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
||||||
|
// Broadcast sends a message to all subscribed peers
|
||||||
|
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
||||||
|
// GetResponsiblePeers dials or gets from cache responsible peers to unary operations
|
||||||
|
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeerManagerProvider interface {
|
||||||
|
app.Component
|
||||||
|
NewPeerManager(ctx context.Context, spaceId string) (sm PeerManager, err error)
|
||||||
|
}
|
||||||
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
|
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
|
||||||
"github.com/anytypeio/any-sync/util/multiqueue"
|
"github.com/anytypeio/any-sync/util/multiqueue"
|
||||||
"github.com/anytypeio/any-sync/util/slice"
|
"github.com/anytypeio/any-sync/util/slice"
|
||||||
|
"github.com/cheggaaa/mb/v3"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -359,7 +360,13 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
|
|||||||
_ = s.handleQueue.CloseThread(threadId)
|
_ = s.handleQueue.CloseThread(threadId)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
return s.handleQueue.Add(ctx, threadId, hm)
|
err = s.handleQueue.Add(ctx, threadId, hm)
|
||||||
|
if err == mb.ErrOverflowed {
|
||||||
|
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))
|
||||||
|
// skip overflowed error
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *space) handleMessage(msg HandleMessage) {
|
func (s *space) handleMessage(msg HandleMessage) {
|
||||||
|
|||||||
@ -5,15 +5,14 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/accountservice"
|
"github.com/anytypeio/any-sync/accountservice"
|
||||||
"github.com/anytypeio/any-sync/app"
|
"github.com/anytypeio/any-sync/app"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/headsync"
|
"github.com/anytypeio/any-sync/commonspace/headsync"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
|
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
"github.com/anytypeio/any-sync/commonspace/streammanager"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
"github.com/anytypeio/any-sync/net/pool"
|
"github.com/anytypeio/any-sync/net/pool"
|
||||||
@ -40,13 +39,13 @@ type SpaceService interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type spaceService struct {
|
type spaceService struct {
|
||||||
config Config
|
config Config
|
||||||
account accountservice.Service
|
account accountservice.Service
|
||||||
configurationService nodeconf.Service
|
configurationService nodeconf.Service
|
||||||
storageProvider spacestorage.SpaceStorageProvider
|
storageProvider spacestorage.SpaceStorageProvider
|
||||||
streamManagerProvider streammanager.StreamManagerProvider
|
peermanagerProvider peermanager.PeerManagerProvider
|
||||||
treeGetter treegetter.TreeGetter
|
treeGetter treegetter.TreeGetter
|
||||||
pool pool.Pool
|
pool pool.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *spaceService) Init(a *app.App) (err error) {
|
func (s *spaceService) Init(a *app.App) (err error) {
|
||||||
@ -55,7 +54,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
|
|||||||
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
|
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
|
||||||
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
||||||
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
|
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
|
||||||
s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider)
|
s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
|
||||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -117,7 +116,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
lastConfiguration := s.configurationService.GetLast()
|
lastConfiguration := s.configurationService.GetLast()
|
||||||
confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool)
|
|
||||||
getter := newCommonGetter(st.Id(), s.treeGetter)
|
getter := newCommonGetter(st.Id(), s.treeGetter)
|
||||||
syncStatus := syncstatus.NewNoOpSyncStatus()
|
syncStatus := syncstatus.NewNoOpSyncStatus()
|
||||||
// this will work only for clients, not the best solution, but...
|
// this will work only for clients, not the best solution, but...
|
||||||
@ -126,14 +124,13 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
|
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
|
||||||
}
|
}
|
||||||
|
|
||||||
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log)
|
peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id)
|
||||||
|
|
||||||
streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
objectSync := objectsync.NewObjectSync(id, streamManager, getter)
|
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log)
|
||||||
|
objectSync := objectsync.NewObjectSync(id, peerManager, getter)
|
||||||
sp := &space{
|
sp := &space{
|
||||||
id: id,
|
id: id,
|
||||||
objectSync: objectSync,
|
objectSync: objectSync,
|
||||||
|
|||||||
@ -1,14 +0,0 @@
|
|||||||
package streammanager
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/anytypeio/any-sync/app"
|
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
|
||||||
)
|
|
||||||
|
|
||||||
const CName = "common.commonspace.streammanager"
|
|
||||||
|
|
||||||
type StreamManagerProvider interface {
|
|
||||||
app.Component
|
|
||||||
NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error)
|
|
||||||
}
|
|
||||||
@ -6,11 +6,11 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newStreamSender creates new sendPool
|
// newExecPool creates new execPool
|
||||||
// workers - how many processes will execute tasks
|
// workers - how many processes will execute tasks
|
||||||
// maxSize - limit for queue size
|
// maxSize - limit for queue size
|
||||||
func newStreamSender(workers, maxSize int) *sendPool {
|
func newExecPool(workers, maxSize int) *execPool {
|
||||||
ss := &sendPool{
|
ss := &execPool{
|
||||||
batch: mb.New[func()](maxSize),
|
batch: mb.New[func()](maxSize),
|
||||||
}
|
}
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
@ -19,16 +19,16 @@ func newStreamSender(workers, maxSize int) *sendPool {
|
|||||||
return ss
|
return ss
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendPool needed for parallel execution of the incoming send tasks
|
// execPool needed for parallel execution of the incoming send tasks
|
||||||
type sendPool struct {
|
type execPool struct {
|
||||||
batch *mb.MB[func()]
|
batch *mb.MB[func()]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) {
|
func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) {
|
||||||
return ss.batch.Add(ctx, f...)
|
return ss.batch.Add(ctx, f...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *sendPool) sendLoop() {
|
func (ss *execPool) sendLoop() {
|
||||||
for {
|
for {
|
||||||
f, err := ss.batch.WaitOne(context.Background())
|
f, err := ss.batch.WaitOne(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -39,6 +39,6 @@ func (ss *sendPool) sendLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *sendPool) Close() (err error) {
|
func (ss *execPool) Close() (err error) {
|
||||||
return ss.batch.Close()
|
return ss.batch.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,6 +21,9 @@ type StreamHandler interface {
|
|||||||
NewReadMessage() drpc.Message
|
NewReadMessage() drpc.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PeerGetter should dial or return cached peers
|
||||||
|
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
|
||||||
|
|
||||||
// StreamPool keeps and read streams
|
// StreamPool keeps and read streams
|
||||||
type StreamPool interface {
|
type StreamPool interface {
|
||||||
// AddStream adds new outgoing stream into the pool
|
// AddStream adds new outgoing stream into the pool
|
||||||
@ -28,7 +31,7 @@ type StreamPool interface {
|
|||||||
// ReadStream adds new incoming stream and synchronously read it
|
// ReadStream adds new incoming stream and synchronously read it
|
||||||
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error)
|
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error)
|
||||||
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
|
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
|
||||||
Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error)
|
Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
|
||||||
// SendById sends a message to given peerIds. Works only if stream exists
|
// SendById sends a message to given peerIds. Works only if stream exists
|
||||||
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
||||||
// Broadcast sends a message to all peers with given tags. Works async.
|
// Broadcast sends a message to all peers with given tags. Works async.
|
||||||
@ -47,8 +50,9 @@ type streamPool struct {
|
|||||||
streamIdsByTag map[string][]uint32
|
streamIdsByTag map[string][]uint32
|
||||||
streams map[uint32]*stream
|
streams map[uint32]*stream
|
||||||
opening map[string]*openingProcess
|
opening map[string]*openingProcess
|
||||||
exec *sendPool
|
exec *execPool
|
||||||
mu sync.RWMutex
|
dial *execPool
|
||||||
|
mu sync.Mutex
|
||||||
lastStreamId uint32
|
lastStreamId uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,11 +60,6 @@ type openingProcess struct {
|
|||||||
ch chan struct{}
|
ch chan struct{}
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
type handleMessage struct {
|
|
||||||
ctx context.Context
|
|
||||||
msg drpc.Message
|
|
||||||
peerId string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
|
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
|
||||||
st := s.addStream(peerId, drpcStream, tags...)
|
st := s.addStream(peerId, drpcStream, tags...)
|
||||||
@ -95,7 +94,7 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st
|
|||||||
return st
|
return st
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
|
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
|
||||||
var sendOneFunc = func(sp peer.Peer) func() {
|
var sendOneFunc = func(sp peer.Peer) func() {
|
||||||
return func() {
|
return func() {
|
||||||
if e := s.sendOne(ctx, sp, msg); e != nil {
|
if e := s.sendOne(ctx, sp, msg); e != nil {
|
||||||
@ -105,13 +104,17 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return s.dial.Add(ctx, func() {
|
||||||
for _, p := range peers {
|
peers, dialErr := peerGetter(ctx)
|
||||||
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
|
if dialErr != nil {
|
||||||
return
|
log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))
|
||||||
}
|
}
|
||||||
}
|
for _, p := range peers {
|
||||||
return
|
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
|
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
|
||||||
@ -329,6 +332,9 @@ func (s *streamPool) removeStream(streamId uint32) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) Close() (err error) {
|
func (s *streamPool) Close() (err error) {
|
||||||
|
if e := s.dial.Close(); e != nil {
|
||||||
|
log.Warn("dial queue close error", zap.Error(e))
|
||||||
|
}
|
||||||
return s.exec.Close()
|
return s.exec.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -66,7 +66,9 @@ func TestStreamPool_AddStream(t *testing.T) {
|
|||||||
defer s1.Close()
|
defer s1.Close()
|
||||||
fx.AddStream("p1", s1, "space1", "common")
|
fx.AddStream("p1", s1, "space1", "common")
|
||||||
|
|
||||||
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, p1))
|
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||||
|
return []peer.Peer{p1}, nil
|
||||||
|
}))
|
||||||
var msg *testservice.StreamMessage
|
var msg *testservice.StreamMessage
|
||||||
select {
|
select {
|
||||||
case msg = <-fx.tsh.receiveCh:
|
case msg = <-fx.tsh.receiveCh:
|
||||||
@ -85,7 +87,9 @@ func TestStreamPool_Send(t *testing.T) {
|
|||||||
p, err := fx.tp.Dial(ctx, "p1")
|
p, err := fx.tp.Dial(ctx, "p1")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p))
|
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||||
|
return []peer.Peer{p}, nil
|
||||||
|
}))
|
||||||
|
|
||||||
var msg *testservice.StreamMessage
|
var msg *testservice.StreamMessage
|
||||||
select {
|
select {
|
||||||
@ -107,7 +111,9 @@ func TestStreamPool_Send(t *testing.T) {
|
|||||||
var numMsgs = 5
|
var numMsgs = 5
|
||||||
|
|
||||||
for i := 0; i < numMsgs; i++ {
|
for i := 0; i < numMsgs; i++ {
|
||||||
go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p))
|
go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||||
|
return []peer.Peer{p}, nil
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
var msgs []*testservice.StreamMessage
|
var msgs []*testservice.StreamMessage
|
||||||
@ -194,7 +200,12 @@ func newFixture(t *testing.T) *fixture {
|
|||||||
require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh))
|
require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh))
|
||||||
fx.tp = rpctest.NewTestPool().WithServer(ts)
|
fx.tp = rpctest.NewTestPool().WithServer(ts)
|
||||||
fx.th = &testHandler{}
|
fx.th = &testHandler{}
|
||||||
fx.StreamPool = New().NewStreamPool(fx.th)
|
fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
|
||||||
|
SendQueueWorkers: 4,
|
||||||
|
SendQueueSize: 10,
|
||||||
|
DialQueueWorkers: 1,
|
||||||
|
DialQueueSize: 10,
|
||||||
|
})
|
||||||
return fx
|
return fx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -13,22 +13,34 @@ func New() Service {
|
|||||||
return new(service)
|
return new(service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StreamConfig struct {
|
||||||
|
// SendQueueWorkers how many workers will write message to streams
|
||||||
|
SendQueueWorkers int
|
||||||
|
// SendQueueSize size of the queue for write
|
||||||
|
SendQueueSize int
|
||||||
|
// DialQueueWorkers how many workers will dial to peers
|
||||||
|
DialQueueWorkers int
|
||||||
|
// DialQueueSize size of the dial queue
|
||||||
|
DialQueueSize int
|
||||||
|
}
|
||||||
|
|
||||||
type Service interface {
|
type Service interface {
|
||||||
NewStreamPool(h StreamHandler) StreamPool
|
NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
|
||||||
app.Component
|
app.Component
|
||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) NewStreamPool(h StreamHandler) StreamPool {
|
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
|
||||||
sp := &streamPool{
|
sp := &streamPool{
|
||||||
handler: h,
|
handler: h,
|
||||||
streamIdsByPeer: map[string][]uint32{},
|
streamIdsByPeer: map[string][]uint32{},
|
||||||
streamIdsByTag: map[string][]uint32{},
|
streamIdsByTag: map[string][]uint32{},
|
||||||
streams: map[uint32]*stream{},
|
streams: map[uint32]*stream{},
|
||||||
opening: map[string]*openingProcess{},
|
opening: map[string]*openingProcess{},
|
||||||
exec: newStreamSender(10, 100),
|
exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize),
|
||||||
|
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
|
||||||
}
|
}
|
||||||
return sp
|
return sp
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user