Move notifiable logic to synctree instead of syncclient
This commit is contained in:
parent
1e4a25c15d
commit
aaecd2ae6d
@ -13,7 +13,6 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
|
||||
"net/http"
|
||||
"storj.io/drpc"
|
||||
)
|
||||
|
||||
@ -33,7 +32,6 @@ type Service interface {
|
||||
type service struct {
|
||||
controller Controller
|
||||
transport secure.Service
|
||||
srv *http.Server
|
||||
cfg *config.Config
|
||||
*server.BaseDrpcServer
|
||||
}
|
||||
|
||||
@ -110,6 +110,7 @@ func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Init(ctx context.Context) (err error) {
|
||||
log.Debug("space settings id", zap.String("id", s.store.SpaceSettingsId()))
|
||||
s.SyncTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@ -2,7 +2,6 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
@ -20,27 +19,23 @@ type syncClient struct {
|
||||
syncservice.StreamPool
|
||||
RequestFactory
|
||||
spaceId string
|
||||
notifiable diffservice.HeadNotifiable
|
||||
configuration nodeconf.Configuration
|
||||
}
|
||||
|
||||
func newSyncClient(
|
||||
spaceId string,
|
||||
pool syncservice.StreamPool,
|
||||
notifiable diffservice.HeadNotifiable,
|
||||
factory RequestFactory,
|
||||
configuration nodeconf.Configuration) SyncClient {
|
||||
return &syncClient{
|
||||
StreamPool: pool,
|
||||
RequestFactory: factory,
|
||||
notifiable: notifiable,
|
||||
configuration: configuration,
|
||||
spaceId: spaceId,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
s.notifyIfNeeded(message)
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
|
||||
if err != nil {
|
||||
return
|
||||
@ -57,7 +52,6 @@ func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncM
|
||||
}
|
||||
|
||||
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
s.notifyIfNeeded(message)
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
|
||||
if err != nil {
|
||||
return
|
||||
@ -68,13 +62,6 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr
|
||||
return s.BroadcastAsync(message)
|
||||
}
|
||||
|
||||
func (s *syncClient) notifyIfNeeded(message *treechangeproto.TreeSyncMessage) {
|
||||
if message.GetContent().GetHeadUpdate() != nil {
|
||||
update := message.GetContent().GetHeadUpdate()
|
||||
s.notifiable.UpdateHeads(message.RootChange.Id, update.Heads)
|
||||
}
|
||||
}
|
||||
|
||||
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
payload, err := message.Marshal()
|
||||
if err != nil {
|
||||
|
||||
@ -35,6 +35,7 @@ type syncTree struct {
|
||||
tree.ObjectTree
|
||||
synchandler.SyncHandler
|
||||
syncClient SyncClient
|
||||
notifiable diffservice.HeadNotifiable
|
||||
listener updatelistener.UpdateListener
|
||||
isClosed bool
|
||||
isDeleted bool
|
||||
@ -77,12 +78,12 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
deps.StreamPool,
|
||||
deps.HeadNotifiable,
|
||||
sharedFactory,
|
||||
deps.Configuration)
|
||||
syncTree := &syncTree{
|
||||
ObjectTree: objTree,
|
||||
syncClient: syncClient,
|
||||
notifiable: deps.HeadNotifiable,
|
||||
listener: deps.Listener,
|
||||
}
|
||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||
@ -107,12 +108,12 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
deps.StreamPool,
|
||||
deps.HeadNotifiable,
|
||||
GetRequestFactory(),
|
||||
deps.Configuration)
|
||||
syncTree := &syncTree{
|
||||
ObjectTree: objTree,
|
||||
syncClient: syncClient,
|
||||
notifiable: deps.HeadNotifiable,
|
||||
listener: deps.Listener,
|
||||
}
|
||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||
@ -207,12 +208,12 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
deps.StreamPool,
|
||||
deps.HeadNotifiable,
|
||||
GetRequestFactory(),
|
||||
deps.Configuration)
|
||||
syncTree := &syncTree{
|
||||
ObjectTree: objTree,
|
||||
syncClient: syncClient,
|
||||
notifiable: deps.HeadNotifiable,
|
||||
listener: deps.Listener,
|
||||
}
|
||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||
@ -258,6 +259,9 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.notifiable != nil {
|
||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||
}
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
return
|
||||
@ -281,10 +285,13 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
|
||||
s.listener.Rebuild(s)
|
||||
}
|
||||
}
|
||||
//if res.Mode != tree.Nothing {
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
//}
|
||||
if res.Mode != tree.Nothing {
|
||||
if s.notifiable != nil {
|
||||
s.notifiable.UpdateHeads(s.ID(), res.Heads)
|
||||
}
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -2,7 +2,6 @@ package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
|
||||
@ -55,7 +54,7 @@ func Test_DeriveSyncTree(t *testing.T) {
|
||||
require.Equal(t, expectedPayload, payload)
|
||||
return objTreeMock, nil
|
||||
}
|
||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, notifiable diffservice.HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
|
||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
|
||||
return syncClientMock
|
||||
}
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
@ -91,7 +90,7 @@ func Test_CreateSyncTree(t *testing.T) {
|
||||
require.Equal(t, expectedPayload, payload)
|
||||
return objTreeMock, nil
|
||||
}
|
||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, notifiable diffservice.HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
|
||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
|
||||
return syncClientMock
|
||||
}
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user