Add conf connector and change configuration logic

This commit is contained in:
mcrakhman 2022-10-09 21:54:34 +02:00
parent afc9141a7c
commit 2999f2dcdd
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
11 changed files with 136 additions and 109 deletions

View File

@ -36,14 +36,14 @@ func NewDiffService(
spaceId string,
syncPeriod int,
storage storage.SpaceStorage,
conf nodeconf.Configuration,
confConnector nodeconf.ConfConnector,
cache cache.TreeCache,
log *zap.Logger) DiffService {
diff := ldiff.New(16, 16)
l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, factory, l)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l)
periodicSync := newPeriodicSync(syncPeriod, syncer, l)
return &diffService{

View File

@ -21,17 +21,17 @@ type DiffSyncer interface {
func newDiffSyncer(
spaceId string,
diff ldiff.Diff,
nconf nodeconf.Configuration,
confConnector nodeconf.ConfConnector,
cache cache.TreeCache,
storage storage.SpaceStorage,
clientFactory spacesyncproto.ClientFactory,
log *zap.Logger) DiffSyncer {
return &diffSyncer{
diff: diff,
nconf: nconf,
spaceId: spaceId,
cache: cache,
storage: storage,
confConnector: confConnector,
clientFactory: clientFactory,
log: log,
}
@ -40,7 +40,7 @@ func newDiffSyncer(
type diffSyncer struct {
spaceId string
diff ldiff.Diff
nconf nodeconf.Configuration
confConnector nodeconf.ConfConnector
cache cache.TreeCache
storage storage.SpaceStorage
clientFactory spacesyncproto.ClientFactory
@ -50,7 +50,7 @@ type diffSyncer struct {
func (d *diffSyncer) Sync(ctx context.Context) error {
st := time.Now()
// diffing with responsible peers according to configuration
peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId)
peers, err := d.confConnector.GetResponsiblePeers(ctx, d.spaceId)
if err != nil {
return err
}

View File

@ -88,7 +88,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
defer ctrl.Finish()
diffMock := mock_ldiff.NewMockDiff(ctrl)
nconfMock := mock_nodeconf.NewMockConfiguration(ctrl)
connectorMock := mock_nodeconf.NewMockConfConnector(ctrl)
cacheMock := mock_cache.NewMockTreeCache(ctrl)
stMock := mock_storage.NewMockSpaceStorage(ctrl)
clientMock := mock_spacesyncproto.NewMockDRPCSpaceClient(ctrl)
@ -97,11 +97,11 @@ func TestDiffSyncer_Sync(t *testing.T) {
})
spaceId := "spaceId"
l := logger.NewNamed(spaceId)
diffSyncer := newDiffSyncer(spaceId, diffMock, nconfMock, cacheMock, stMock, factory, l)
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, l)
t.Run("diff syncer sync simple", func(t *testing.T) {
nconfMock.EXPECT().
ResponsiblePeers(gomock.Any(), spaceId).
connectorMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).
@ -115,8 +115,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
})
t.Run("diff syncer sync conf error", func(t *testing.T) {
nconfMock.EXPECT().
ResponsiblePeers(gomock.Any(), spaceId).
connectorMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId).
Return(nil, fmt.Errorf("some error"))
require.Error(t, diffSyncer.Sync(ctx))
@ -127,8 +127,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
aclRoot := &aclrecordproto.RawACLRecordWithId{}
spaceHeader := &spacesyncproto.SpaceHeader{}
nconfMock.EXPECT().
ResponsiblePeers(gomock.Any(), spaceId).
connectorMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).
@ -150,8 +150,8 @@ func TestDiffSyncer_Sync(t *testing.T) {
})
t.Run("diff syncer sync other error", func(t *testing.T) {
nconfMock.EXPECT().
ResponsiblePeers(gomock.Any(), spaceId).
connectorMock.EXPECT().
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).

View File

@ -8,6 +8,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
)
@ -32,6 +33,7 @@ type service struct {
configurationService nodeconf.Service
storageProvider storage.SpaceStorageProvider
cache cache.TreeCache
pool pool.Pool
}
func (s *service) Init(a *app.App) (err error) {
@ -39,6 +41,7 @@ func (s *service) Init(a *app.App) (err error) {
s.storageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.cache = a.MustComponent(cache.CName).(cache.TreeCache)
s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil
}
@ -84,8 +87,9 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {
return nil, err
}
lastConfiguration := s.configurationService.GetLast()
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, lastConfiguration, s.cache, log)
syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration)
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.cache, log)
syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration, confConnector)
sp := &space{
id: id,
syncService: syncService,

View File

@ -30,37 +30,42 @@ type syncService struct {
spaceId string
syncClient SyncClient
configuration nodeconf.Configuration
clientFactory spacesyncproto.ClientFactory
streamLoopCtx context.Context
stopStreamLoop context.CancelFunc
connector nodeconf.ConfConnector
streamLoopDone chan struct{}
}
func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
func NewSyncService(
spaceId string,
headNotifiable HeadNotifiable,
cache cache.TreeCache,
configuration nodeconf.Configuration,
confConnector nodeconf.ConfConnector) SyncService {
var syncHandler SyncHandler
pool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message)
})
factory := newRequestFactory()
syncClient := newSyncClient(spaceId, pool, headNotifiable, factory, configuration)
syncClient := newSyncClient(spaceId, streamPool, headNotifiable, factory, configuration)
syncHandler = newSyncHandler(spaceId, cache, syncClient)
return newSyncService(
spaceId,
syncClient,
spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
configuration)
confConnector)
}
func newSyncService(
spaceId string,
syncClient SyncClient,
clientFactory spacesyncproto.ClientFactory,
configuration nodeconf.Configuration) *syncService {
connector nodeconf.ConfConnector) *syncService {
return &syncService{
syncClient: syncClient,
configuration: configuration,
connector: connector,
clientFactory: clientFactory,
spaceId: spaceId,
streamLoopDone: make(chan struct{}),
@ -81,7 +86,7 @@ func (s *syncService) Close() (err error) {
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
defer close(s.streamLoopDone)
checkResponsiblePeers := func() {
respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId)
respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId)
if err != nil {
return
}

View File

@ -0,0 +1,54 @@
package nodeconf
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
)
type ConfConnector interface {
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
DialResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
}
type confConnector struct {
conf Configuration
pool pool.Pool
}
func NewConfConnector(conf Configuration, pool pool.Pool) ConfConnector {
return &confConnector{conf: conf, pool: pool}
}
func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) {
return s.dialOrConnect(ctx, spaceId, s.pool.Get, s.pool.GetOneOf)
}
func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) {
return s.dialOrConnect(ctx, spaceId, s.pool.Dial, s.pool.DialOneOf)
}
func (s *confConnector) dialOrConnect(
ctx context.Context, spaceId string,
connectOne func(context.Context, string) (peer.Peer, error),
connectOneOf func(context.Context, []string) (peer.Peer, error)) (peers []peer.Peer, err error) {
allNodes := s.conf.NodeIds(spaceId)
if !s.conf.IsResponsible(spaceId) {
for _, id := range allNodes {
var p peer.Peer
p, err = connectOne(ctx, id)
if err != nil {
continue
}
peers = append(peers, p)
}
} else {
var p peer.Peer
p, err = connectOneOf(ctx, allNodes)
if err != nil {
return
}
peers = []peer.Peer{p}
}
return
}

View File

@ -1,11 +1,7 @@
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Configuration
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Configuration,ConfConnector
package nodeconf
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-chash"
)
@ -16,12 +12,6 @@ func New() Service {
type Configuration interface {
// Id returns current nodeconf id
Id() string
// AllPeers returns all peers by spaceId except current account
AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
// OnePeer returns one of peer for spaceId
OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error)
// ResponsiblePeers returns peers for the space id that are responsible for the space
ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
// NodeIds returns list of peerId for given spaceId
NodeIds(spaceId string) []string
// IsResponsible checks if current account responsible for given spaceId
@ -31,7 +21,6 @@ type Configuration interface {
type configuration struct {
id string
accountId string
pool pool.Pool
chash chash.CHash
}
@ -39,40 +28,6 @@ func (c *configuration) Id() string {
return c.id
}
func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) {
nodeIds := c.NodeIds(spaceId)
peers = make([]peer.Peer, 0, len(nodeIds))
for _, id := range nodeIds {
p, e := c.pool.Get(ctx, id)
if e == nil {
peers = append(peers, p)
}
}
if len(peers) == 0 {
return nil, fmt.Errorf("unable to connect to any node")
}
return
}
func (c *configuration) ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) {
if c.IsResponsible(spaceId) {
return c.AllPeers(ctx, spaceId)
} else {
var one peer.Peer
one, err = c.OnePeer(ctx, spaceId)
if err != nil {
return
}
peers = []peer.Peer{one}
return
}
}
func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) {
nodeIds := c.NodeIds(spaceId)
return c.pool.GetOneOf(ctx, nodeIds)
}
func (c *configuration) NodeIds(spaceId string) []string {
members := c.chash.GetMembers(spaceId)
res := make([]string, 0, len(members))

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Configuration)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Configuration,ConfConnector)
// Package mock_nodeconf is a generated GoMock package.
package mock_nodeconf
@ -35,21 +35,6 @@ func (m *MockConfiguration) EXPECT() *MockConfigurationMockRecorder {
return m.recorder
}
// AllPeers mocks base method.
func (m *MockConfiguration) AllPeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AllPeers", arg0, arg1)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AllPeers indicates an expected call of AllPeers.
func (mr *MockConfigurationMockRecorder) AllPeers(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllPeers", reflect.TypeOf((*MockConfiguration)(nil).AllPeers), arg0, arg1)
}
// Id mocks base method.
func (m *MockConfiguration) Id() string {
m.ctrl.T.Helper()
@ -92,32 +77,55 @@ func (mr *MockConfigurationMockRecorder) NodeIds(arg0 interface{}) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeIds", reflect.TypeOf((*MockConfiguration)(nil).NodeIds), arg0)
}
// OnePeer mocks base method.
func (m *MockConfiguration) OnePeer(arg0 context.Context, arg1 string) (peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OnePeer", arg0, arg1)
ret0, _ := ret[0].(peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
// MockConfConnector is a mock of ConfConnector interface.
type MockConfConnector struct {
ctrl *gomock.Controller
recorder *MockConfConnectorMockRecorder
}
// OnePeer indicates an expected call of OnePeer.
func (mr *MockConfigurationMockRecorder) OnePeer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnePeer", reflect.TypeOf((*MockConfiguration)(nil).OnePeer), arg0, arg1)
// MockConfConnectorMockRecorder is the mock recorder for MockConfConnector.
type MockConfConnectorMockRecorder struct {
mock *MockConfConnector
}
// ResponsiblePeers mocks base method.
func (m *MockConfiguration) ResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
// NewMockConfConnector creates a new mock instance.
func NewMockConfConnector(ctrl *gomock.Controller) *MockConfConnector {
mock := &MockConfConnector{ctrl: ctrl}
mock.recorder = &MockConfConnectorMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder {
return m.recorder
}
// DialResponsiblePeers mocks base method.
func (m *MockConfConnector) DialResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ResponsiblePeers", arg0, arg1)
ret := m.ctrl.Call(m, "DialResponsiblePeers", arg0, arg1)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ResponsiblePeers indicates an expected call of ResponsiblePeers.
func (mr *MockConfigurationMockRecorder) ResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
// DialResponsiblePeers indicates an expected call of DialResponsiblePeers.
func (mr *MockConfConnectorMockRecorder) DialResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResponsiblePeers", reflect.TypeOf((*MockConfiguration)(nil).ResponsiblePeers), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialResponsiblePeers), arg0, arg1)
}
// GetResponsiblePeers mocks base method.
func (m *MockConfConnector) GetResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0, arg1)
ret0, _ := ret[0].([]peer.Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetResponsiblePeers indicates an expected call of GetResponsiblePeers.
func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1)
}

View File

@ -3,7 +3,6 @@ package nodeconf
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey"
@ -29,7 +28,6 @@ type Service interface {
type service struct {
accountId string
pool pool.Pool
consensusPeers []string
last Configuration
@ -53,12 +51,10 @@ func (n *Node) Capacity() float64 {
func (s *service) Init(a *app.App) (err error) {
conf := a.MustComponent(config.CName).(*config.Config)
s.accountId = conf.Account.PeerId
s.pool = a.MustComponent(pool.CName).(pool.Pool)
config := &configuration{
id: "config",
accountId: s.accountId,
pool: s.pool,
}
if config.chash, err = chash.New(chash.Config{
PartitionCount: partitionCount,

3
go.mod
View File

@ -61,9 +61,12 @@ require (
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.6 // indirect

2
go.sum
View File

@ -167,6 +167,7 @@ golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+o
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -207,6 +208,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=