Move concrete logic to concrete packages

This commit is contained in:
mcrakhman 2022-10-22 14:23:42 +02:00 committed by Mikhail Iudin
parent e289fd4f89
commit 69211dfe66
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
11 changed files with 136 additions and 94 deletions

View File

@ -14,8 +14,8 @@ import (
)
type DiffService interface {
HeadNotifiable
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
UpdateHeads(id string, heads []string)
RemoveObject(id string)
AllIds() []string

View File

@ -0,0 +1,5 @@
package diffservice
type HeadNotifiable interface {
UpdateHeads(id string, heads []string)
}

View File

@ -91,14 +91,15 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast()
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.cache, log)
syncService := syncservice.NewSyncService(id, diffService, lastConfiguration, confConnector)
syncService := syncservice.NewSyncService(id, confConnector)
sp := &space{
id: id,
syncService: syncService,
diffService: diffService,
cache: s.cache,
account: s.account,
storage: st,
id: id,
syncService: syncService,
diffService: diffService,
cache: s.cache,
account: s.account,
configuration: lastConfiguration,
storage: st,
}
if err := sp.Init(ctx); err != nil {
return nil, err

View File

@ -13,6 +13,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
aclstorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
@ -69,12 +70,13 @@ type space struct {
rpc *rpcHandler
syncService syncservice.SyncService
diffService diffservice.DiffService
storage storage.SpaceStorage
cache treegetter.TreeGetter
account account.Service
aclList *syncacl.SyncACL
syncService syncservice.SyncService
diffService diffservice.DiffService
storage storage.SpaceStorage
cache treegetter.TreeGetter
account account.Service
aclList *syncacl.SyncACL
configuration nodeconf.Configuration
isClosed atomic.Bool
}
@ -101,7 +103,7 @@ func (s *space) Init(ctx context.Context) (err error) {
if err != nil {
return
}
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.SyncClient())
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool())
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache)
s.syncService.Init(objectGetter)
s.diffService.Init(initialIds)
@ -129,12 +131,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
err = ErrSpaceClosed
return
}
deps := synctree.SyncTreeCreateDeps{
Payload: payload,
SyncClient: s.syncService.SyncClient(),
Listener: listener,
AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage,
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage,
}
return synctree.DeriveSyncTree(ctx, deps)
}
@ -144,12 +149,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
err = ErrSpaceClosed
return
}
deps := synctree.SyncTreeCreateDeps{
Payload: payload,
SyncClient: s.syncService.SyncClient(),
Listener: listener,
AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage,
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage,
}
return synctree.CreateSyncTree(ctx, deps)
}
@ -165,9 +173,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
if err != nil {
return nil, err
}
return s.syncService.SyncClient().SendSync(
return s.syncService.StreamPool().SendSync(
peerId,
s.syncService.SyncClient().CreateNewTreeRequest(id),
synctree.GetRequestFactory().CreateNewTreeRequest(id),
)
}
@ -204,11 +212,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return
}
}
deps := synctree.SyncTreeBuildDeps{
SyncClient: s.syncService.SyncClient(),
Listener: listener,
AclList: s.aclList,
Storage: store,
deps := synctree.BuildDeps{
SpaceId: s.id,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
Storage: store,
}
return synctree.BuildSyncTree(ctx, isFirstBuild, deps)
}

View File

@ -8,14 +8,14 @@ import (
type SyncACL struct {
list.ACLList
syncservice.SyncClient
synchandler.SyncHandler
streamPool syncservice.StreamPool
}
func NewSyncACL(aclList list.ACLList, syncClient syncservice.SyncClient) *SyncACL {
func NewSyncACL(aclList list.ACLList, streamPool syncservice.StreamPool) *SyncACL {
return &SyncACL{
ACLList: aclList,
SyncClient: syncClient,
SyncHandler: nil,
streamPool: streamPool,
}
}

View File

@ -18,22 +18,18 @@ var log = logger.NewNamed("syncservice").Sugar()
type SyncService interface {
ocache.ObjectLastUsage
synchandler.SyncHandler
SyncClient() SyncClient
StreamPool() StreamPool
Init(getter objectgetter.ObjectGetter)
Close() (err error)
}
type HeadNotifiable interface {
UpdateHeads(id string, heads []string)
}
const respPeersStreamCheckInterval = time.Second * 10
type syncService struct {
spaceId string
syncClient SyncClient
streamPool StreamPool
clientFactory spacesyncproto.ClientFactory
objectGetter objectgetter.ObjectGetter
@ -45,17 +41,13 @@ type syncService struct {
func NewSyncService(
spaceId string,
headNotifiable HeadNotifiable,
configuration nodeconf.Configuration,
confConnector nodeconf.ConfConnector) (syncService SyncService) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncService.HandleMessage(ctx, senderId, message)
})
factory := newRequestFactory()
syncClient := newSyncClient(spaceId, streamPool, headNotifiable, factory, configuration)
syncService = newSyncService(
spaceId,
syncClient,
streamPool,
spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
confConnector)
return
@ -63,11 +55,11 @@ func NewSyncService(
func newSyncService(
spaceId string,
syncClient SyncClient,
streamPool StreamPool,
clientFactory spacesyncproto.ClientFactory,
connector nodeconf.ConfConnector) *syncService {
return &syncService{
syncClient: syncClient,
streamPool: streamPool,
connector: connector,
clientFactory: clientFactory,
spaceId: spaceId,
@ -84,11 +76,11 @@ func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) {
func (s *syncService) Close() (err error) {
s.stopStreamLoop()
<-s.streamLoopDone
return s.syncClient.Close()
return s.streamPool.Close()
}
func (s *syncService) LastUsage() time.Time {
return s.syncClient.LastUsage()
return s.streamPool.LastUsage()
}
func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
@ -107,7 +99,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
return
}
for _, peer := range respPeers {
if s.syncClient.HasActiveStream(peer.Id()) {
if s.streamPool.HasActiveStream(peer.Id()) {
continue
}
stream, err := s.clientFactory.Client(peer).Stream(ctx)
@ -125,7 +117,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
continue
}
s.syncClient.AddAndReadStreamAsync(stream)
s.streamPool.AddAndReadStreamAsync(stream)
}
}
@ -142,6 +134,6 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
}
}
func (s *syncService) SyncClient() SyncClient {
return s.syncClient
func (s *syncService) StreamPool() StreamPool {
return s.streamPool
}

View File

@ -1,4 +1,4 @@
package syncservice
package synctree
import (
"fmt"
@ -15,8 +15,10 @@ type RequestFactory interface {
CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
}
func newRequestFactory() RequestFactory {
return &requestFactory{}
var factory = &requestFactory{}
func GetRequestFactory() RequestFactory {
return factory
}
type requestFactory struct{}

View File

@ -1,28 +1,35 @@
package syncservice
package synctree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"time"
)
type SyncClient interface {
StreamPool
syncservice.StreamPool
RequestFactory
ocache.ObjectLastUsage
BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error)
}
type syncClient struct {
StreamPool
syncservice.StreamPool
RequestFactory
spaceId string
notifiable HeadNotifiable
notifiable diffservice.HeadNotifiable
configuration nodeconf.Configuration
}
func newSyncClient(spaceId string, pool StreamPool, notifiable HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
func newSyncClient(
spaceId string,
pool syncservice.StreamPool,
notifiable diffservice.HeadNotifiable,
factory RequestFactory,
configuration nodeconf.Configuration) SyncClient {
return &syncClient{
StreamPool: pool,
RequestFactory: factory,

View File

@ -4,9 +4,11 @@ import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
@ -19,7 +21,7 @@ var ErrSyncTreeClosed = errors.New("sync tree is closed")
type SyncTree struct {
tree.ObjectTree
synchandler.SyncHandler
syncClient syncservice.SyncClient
syncClient SyncClient
listener updatelistener.UpdateListener
isClosed bool
}
@ -30,78 +32,102 @@ var createDerivedObjectTree = tree.CreateDerivedObjectTree
var createObjectTree = tree.CreateObjectTree
var buildObjectTree = tree.BuildObjectTree
type SyncTreeCreateDeps struct {
Payload tree.ObjectTreeCreatePayload
SyncClient syncservice.SyncClient
Listener updatelistener.UpdateListener
AclList list.ACLList
CreateStorage storage.TreeStorageCreatorFunc
type CreateDeps struct {
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener
AclList list.ACLList
CreateStorage storage.TreeStorageCreatorFunc
}
type SyncTreeBuildDeps struct {
SyncClient syncservice.SyncClient
Listener updatelistener.UpdateListener
AclList list.ACLList
Storage storage.TreeStorage
type BuildDeps struct {
SpaceId string
StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
Storage storage.TreeStorage
}
func DeriveSyncTree(
ctx context.Context,
deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) {
deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage)
if err != nil {
return
}
syncClient := newSyncClient(
deps.SpaceId,
deps.StreamPool,
deps.HeadNotifiable,
GetRequestFactory(),
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: deps.SyncClient,
syncClient: syncClient,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient)
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
t = syncTree
headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil)
err = deps.SyncClient.BroadcastAsync(headUpdate)
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
return
}
func CreateSyncTree(
ctx context.Context,
deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) {
deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage)
if err != nil {
return
}
syncClient := newSyncClient(
deps.SpaceId,
deps.StreamPool,
deps.HeadNotifiable,
GetRequestFactory(),
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: deps.SyncClient,
syncClient: syncClient,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient)
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
t = syncTree
headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil)
err = deps.SyncClient.BroadcastAsync(headUpdate)
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
return
}
func BuildSyncTree(
ctx context.Context,
isFirstBuild bool,
deps SyncTreeBuildDeps) (t tree.ObjectTree, err error) {
deps BuildDeps) (t tree.ObjectTree, err error) {
t, err = buildObjectTree(deps.Storage, deps.AclList)
if err != nil {
return
}
syncClient := newSyncClient(
deps.SpaceId,
deps.StreamPool,
deps.HeadNotifiable,
GetRequestFactory(),
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: deps.SyncClient,
syncClient: syncClient,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient)
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
t = syncTree

View File

@ -3,7 +3,6 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/mock_syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener/mock_updatelistener"
@ -21,7 +20,7 @@ import (
type syncTreeMatcher struct {
objTree tree2.ObjectTree
client syncservice.SyncClient
client SyncClient
listener updatelistener.UpdateListener
}

View File

@ -3,7 +3,6 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
@ -11,10 +10,10 @@ import (
type syncTreeHandler struct {
objTree tree.ObjectTree
syncClient syncservice.SyncClient
syncClient SyncClient
}
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient syncservice.SyncClient) synchandler.SyncHandler {
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
syncClient: syncClient,