streammanger -> peermanager, remove confconnector

This commit is contained in:
Sergey Cherepanov 2023-01-30 19:18:53 +03:00 committed by Mikhail Iudin
parent 854d832c81
commit 6cb9fc5092
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
12 changed files with 165 additions and 249 deletions

View File

@ -1,85 +0,0 @@
//go:generate mockgen -destination mock_confconnector/mock_confconnector.go github.com/anytypeio/any-sync/commonspace/confconnector ConfConnector
package confconnector
import (
"context"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool"
"github.com/anytypeio/any-sync/nodeconf"
"golang.org/x/exp/slices"
)
type ConfConnector interface {
Configuration() nodeconf.Configuration
Pool() pool.Pool
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
}
type confConnector struct {
conf nodeconf.Configuration
pool pool.Pool
}
func NewConfConnector(conf nodeconf.Configuration, pool pool.Pool) ConfConnector {
return &confConnector{conf: conf, pool: pool}
}
func (s *confConnector) Configuration() nodeconf.Configuration {
return s.conf
}
func (s *confConnector) Pool() pool.Pool {
return s.pool
}
func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) {
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get)
}
func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial)
}
func (s *confConnector) connectOneOrMany(
ctx context.Context,
spaceId string,
activeNodeIds []string,
connectOne func(context.Context, string) (peer.Peer, error)) (peers []peer.Peer, err error) {
var (
inactiveNodeIds []string
allNodes = s.conf.NodeIds(spaceId)
)
for _, id := range allNodes {
if !slices.Contains(activeNodeIds, id) {
inactiveNodeIds = append(inactiveNodeIds, id)
}
}
if s.conf.IsResponsible(spaceId) {
for _, id := range inactiveNodeIds {
var p peer.Peer
p, err = connectOne(ctx, id)
if err != nil {
continue
}
peers = append(peers, p)
}
} else if len(activeNodeIds) == 0 {
// that means that all connected ids
var p peer.Peer
p, err = s.pool.GetOneOf(ctx, allNodes)
if err != nil {
return
}
// if we are dialling someone, we want to dial to the same peer which we cached
// thus communication through streams and through diff will go to the same node
p, err = connectOne(ctx, p.Id())
if err != nil {
return
}
peers = []peer.Peer{p}
}
return
}

View File

@ -1,96 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/any-sync/commonspace/confconnector (interfaces: ConfConnector)
// Package mock_confconnector is a generated GoMock package.
package mock_confconnector
import (
context "context"
reflect "reflect"
peer "github.com/anytypeio/any-sync/net/peer"
pool "github.com/anytypeio/any-sync/net/pool"
nodeconf "github.com/anytypeio/any-sync/nodeconf"
gomock "github.com/golang/mock/gomock"
)
// MockConfConnector is a mock of ConfConnector interface.
type MockConfConnector struct {
ctrl *gomock.Controller
recorder *MockConfConnectorMockRecorder
}
// MockConfConnectorMockRecorder is the mock recorder for MockConfConnector.
type MockConfConnectorMockRecorder struct {
mock *MockConfConnector
}
// NewMockConfConnector creates a new mock instance.
func NewMockConfConnector(ctrl *gomock.Controller) *MockConfConnector {
mock := &MockConfConnector{ctrl: ctrl}
mock.recorder = &MockConfConnectorMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder {
return m.recorder
}
// Configuration mocks base method.
func (m *MockConfConnector) Configuration() nodeconf.Configuration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Configuration")
ret0, _ := ret[0].(nodeconf.Configuration)
return ret0
}
// Configuration indicates an expected call of Configuration.
func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration))
}
// DialInactiveResponsiblePeers mocks base method.
func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers.
func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2)
}
// GetResponsiblePeers mocks base method.
func (m *MockConfConnector) GetResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0, arg1)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetResponsiblePeers indicates an expected call of GetResponsiblePeers.
func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1)
}
// Pool mocks base method.
func (m *MockConfConnector) Pool() pool.Pool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Pool")
ret0, _ := ret[0].(pool.Pool)
return ret0
}
// Pool indicates an expected call of Pool.
func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool))
}

View File

@ -5,9 +5,9 @@ import (
"fmt" "fmt"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -28,7 +28,7 @@ type DiffSyncer interface {
func newDiffSyncer( func newDiffSyncer(
spaceId string, spaceId string,
diff ldiff.Diff, diff ldiff.Diff,
confConnector confconnector.ConfConnector, peerManager peermanager.PeerManager,
cache treegetter.TreeGetter, cache treegetter.TreeGetter,
storage spacestorage.SpaceStorage, storage spacestorage.SpaceStorage,
clientFactory spacesyncproto.ClientFactory, clientFactory spacesyncproto.ClientFactory,
@ -39,7 +39,7 @@ func newDiffSyncer(
spaceId: spaceId, spaceId: spaceId,
cache: cache, cache: cache,
storage: storage, storage: storage,
confConnector: confConnector, peerManager: peerManager,
clientFactory: clientFactory, clientFactory: clientFactory,
log: log, log: log,
syncStatus: syncStatus, syncStatus: syncStatus,
@ -49,7 +49,7 @@ func newDiffSyncer(
type diffSyncer struct { type diffSyncer struct {
spaceId string spaceId string
diff ldiff.Diff diff ldiff.Diff
confConnector confconnector.ConfConnector peerManager peermanager.PeerManager
cache treegetter.TreeGetter cache treegetter.TreeGetter
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
@ -88,7 +88,7 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) {
func (d *diffSyncer) Sync(ctx context.Context) error { func (d *diffSyncer) Sync(ctx context.Context) error {
st := time.Now() st := time.Now()
// diffing with responsible peers according to configuration // diffing with responsible peers according to configuration
peers, err := d.confConnector.GetResponsiblePeers(ctx, d.spaceId) peers, err := d.peerManager.GetResponsiblePeers(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -6,12 +6,12 @@ import (
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/ldiff/mock_ldiff" "github.com/anytypeio/any-sync/app/ldiff/mock_ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage" mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
"github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter"
"github.com/anytypeio/any-sync/commonspace/peermanager/mock_peermanager"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate"
"github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -99,7 +99,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
defer ctrl.Finish() defer ctrl.Finish()
diffMock := mock_ldiff.NewMockDiff(ctrl) diffMock := mock_ldiff.NewMockDiff(ctrl)
connectorMock := mock_confconnector.NewMockConfConnector(ctrl) peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl)
cacheMock := mock_treegetter.NewMockTreeGetter(ctrl) cacheMock := mock_treegetter.NewMockTreeGetter(ctrl)
stMock := mock_spacestorage.NewMockSpaceStorage(ctrl) stMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
@ -110,13 +110,13 @@ func TestDiffSyncer_Sync(t *testing.T) {
spaceId := "spaceId" spaceId := "spaceId"
aclRootId := "aclRootId" aclRootId := "aclRootId"
l := logger.NewNamed(spaceId) l := logger.NewNamed(spaceId)
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l)
delState.EXPECT().AddObserver(gomock.Any()) delState.EXPECT().AddObserver(gomock.Any())
diffSyncer.Init(delState) diffSyncer.Init(delState)
t.Run("diff syncer sync", func(t *testing.T) { t.Run("diff syncer sync", func(t *testing.T) {
connectorMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT(). diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
@ -131,8 +131,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
}) })
t.Run("diff syncer sync conf error", func(t *testing.T) { t.Run("diff syncer sync conf error", func(t *testing.T) {
connectorMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId). GetResponsiblePeers(gomock.Any()).
Return(nil, fmt.Errorf("some error")) Return(nil, fmt.Errorf("some error"))
require.Error(t, diffSyncer.Sync(ctx)) require.Error(t, diffSyncer.Sync(ctx))
@ -173,8 +173,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
spaceSettingsId := "spaceSettingsId" spaceSettingsId := "spaceSettingsId"
connectorMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT(). diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
@ -197,8 +197,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
}) })
t.Run("diff syncer sync other error", func(t *testing.T) { t.Run("diff syncer sync other error", func(t *testing.T) {
connectorMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT(). diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).

View File

@ -5,8 +5,8 @@ import (
"context" "context"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -49,7 +49,7 @@ func NewHeadSync(
spaceId string, spaceId string,
syncPeriod int, syncPeriod int,
storage spacestorage.SpaceStorage, storage spacestorage.SpaceStorage,
confConnector confconnector.ConfConnector, peerManager peermanager.PeerManager,
cache treegetter.TreeGetter, cache treegetter.TreeGetter,
syncStatus syncstatus.StatusUpdater, syncStatus syncstatus.StatusUpdater,
log logger.CtxLogger) HeadSync { log logger.CtxLogger) HeadSync {
@ -57,7 +57,7 @@ func NewHeadSync(
diff := ldiff.New(16, 16) diff := ldiff.New(16, 16)
l := log.With(zap.String("spaceId", spaceId)) l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
return &headSync{ return &headSync{

View File

@ -3,7 +3,6 @@ package synctree
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -21,7 +20,6 @@ type syncClient struct {
objectsync.MessagePool objectsync.MessagePool
RequestFactory RequestFactory
spaceId string spaceId string
connector confconnector.ConfConnector
configuration nodeconf.Configuration configuration nodeconf.Configuration
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap" "go.uber.org/zap"
"strconv" "strconv"
@ -14,17 +15,11 @@ import (
"time" "time"
) )
type StreamManager interface {
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
// MessagePool can be made generic to work with different streams // MessagePool can be made generic to work with different streams
type MessagePool interface { type MessagePool interface {
ocache.ObjectLastUsage ocache.ObjectLastUsage
synchandler.SyncHandler synchandler.SyncHandler
StreamManager peermanager.PeerManager
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
} }
@ -36,7 +31,7 @@ type responseWaiter struct {
type messagePool struct { type messagePool struct {
sync.Mutex sync.Mutex
StreamManager peermanager.PeerManager
messageHandler MessageHandler messageHandler MessageHandler
waiters map[string]responseWaiter waiters map[string]responseWaiter
waitersMx sync.Mutex waitersMx sync.Mutex
@ -44,9 +39,9 @@ type messagePool struct {
lastUsage atomic.Int64 lastUsage atomic.Int64
} }
func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageHandler) MessagePool {
s := &messagePool{ s := &messagePool{
StreamManager: streamManager, PeerManager: peerManager,
messageHandler: messageHandler, messageHandler: messageHandler,
waiters: make(map[string]responseWaiter), waiters: make(map[string]responseWaiter),
} }
@ -88,16 +83,16 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage() s.updateLastUsage()
return s.StreamManager.SendPeer(ctx, peerId, msg) return s.PeerManager.SendPeer(ctx, peerId, msg)
} }
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage() s.updateLastUsage()
return s.StreamManager.SendResponsible(ctx, msg) return s.PeerManager.SendResponsible(ctx, msg)
} }
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage() s.updateLastUsage()
return s.StreamManager.Broadcast(ctx, msg) return s.PeerManager.Broadcast(ctx, msg)
} }
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {

View File

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap" "go.uber.org/zap"
"time" "time"
@ -34,7 +35,7 @@ type objectSync struct {
func NewObjectSync( func NewObjectSync(
spaceId string, spaceId string,
streamManager StreamManager, peerManager peermanager.PeerManager,
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync { objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync {
syncCtx, cancel := context.WithCancel(context.Background()) syncCtx, cancel := context.WithCancel(context.Background())
os := newObjectSync( os := newObjectSync(
@ -42,7 +43,7 @@ func NewObjectSync(
objectGetter, objectGetter,
syncCtx, syncCtx,
cancel) cancel)
msgPool := newMessagePool(streamManager, os.handleMessage) msgPool := newMessagePool(peerManager, os.handleMessage)
os.messagePool = msgPool os.messagePool = msgPool
return os return os
} }

View File

@ -0,0 +1,94 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/any-sync/commonspace/peermanager (interfaces: PeerManager)
// Package mock_peermanager is a generated GoMock package.
package mock_peermanager
import (
context "context"
reflect "reflect"
spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
peer "github.com/anytypeio/any-sync/net/peer"
gomock "github.com/golang/mock/gomock"
)
// MockPeerManager is a mock of PeerManager interface.
type MockPeerManager struct {
ctrl *gomock.Controller
recorder *MockPeerManagerMockRecorder
}
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager.
type MockPeerManagerMockRecorder struct {
mock *MockPeerManager
}
// NewMockPeerManager creates a new mock instance.
func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager {
mock := &MockPeerManager{ctrl: ctrl}
mock.recorder = &MockPeerManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
return m.recorder
}
// Broadcast mocks base method.
func (m *MockPeerManager) Broadcast(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Broadcast indicates an expected call of Broadcast.
func (mr *MockPeerManagerMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0, arg1)
}
// GetResponsiblePeers mocks base method.
func (m *MockPeerManager) GetResponsiblePeers(arg0 context.Context) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetResponsiblePeers indicates an expected call of GetResponsiblePeers.
func (mr *MockPeerManagerMockRecorder) GetResponsiblePeers(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockPeerManager)(nil).GetResponsiblePeers), arg0)
}
// SendPeer mocks base method.
func (m *MockPeerManager) SendPeer(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendPeer", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SendPeer indicates an expected call of SendPeer.
func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2)
}
// SendResponsible mocks base method.
func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SendResponsible indicates an expected call of SendResponsible.
func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1)
}

View File

@ -0,0 +1,27 @@
//go:generate mockgen -destination mock_peermanager/mock_peermanager.go github.com/anytypeio/any-sync/commonspace/peermanager PeerManager
package peermanager
import (
"context"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/net/peer"
)
const CName = "common.commonspace.peermanager"
type PeerManager interface {
// SendPeer sends a message to a stream by peerId
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
// SendResponsible sends a message to responsible peers streams
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
// Broadcast sends a message to all subscribed peers
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
// GetResponsiblePeers dials or gets from cache responsible peers to unary operations
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
}
type PeerManagerProvider interface {
app.Component
NewPeerManager(ctx context.Context, spaceId string) (sm PeerManager, err error)
}

View File

@ -5,15 +5,14 @@ import (
"github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/headsync"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/streammanager"
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
@ -40,13 +39,13 @@ type SpaceService interface {
} }
type spaceService struct { type spaceService struct {
config Config config Config
account accountservice.Service account accountservice.Service
configurationService nodeconf.Service configurationService nodeconf.Service
storageProvider spacestorage.SpaceStorageProvider storageProvider spacestorage.SpaceStorageProvider
streamManagerProvider streammanager.StreamManagerProvider peermanagerProvider peermanager.PeerManagerProvider
treeGetter treegetter.TreeGetter treeGetter treegetter.TreeGetter
pool pool.Pool pool pool.Pool
} }
func (s *spaceService) Init(a *app.App) (err error) { func (s *spaceService) Init(a *app.App) (err error) {
@ -55,7 +54,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider) s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil return nil
} }
@ -117,8 +116,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
} }
lastConfiguration := s.configurationService.GetLast() lastConfiguration := s.configurationService.GetLast()
confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) getter := newCommonGetter(st.Id(), s.treeGetter)
syncStatus := syncstatus.NewNoOpSyncStatus() syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but... // this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) { if !lastConfiguration.IsResponsible(st.Id()) {
@ -126,21 +124,19 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
} }
// TODO: [che] remove *5 peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id)
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log)
streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
objectSync := objectsync.NewObjectSync(streamManager, id) headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log)
objectSync := objectsync.NewObjectSync(id, peerManager, getter)
sp := &space{ sp := &space{
id: id, id: id,
objectSync: objectSync, objectSync: objectSync,
headSync: headSync, headSync: headSync,
syncStatus: syncStatus, syncStatus: syncStatus,
cache: s.treeGetter, cache: getter,
account: s.account, account: s.account,
configuration: lastConfiguration, configuration: lastConfiguration,
storage: st, storage: st,

View File

@ -1,14 +0,0 @@
package streammanager
import (
"context"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/objectsync"
)
const CName = "common.commonspace.streammanager"
type StreamManagerProvider interface {
app.Component
NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error)
}