Move notifiable logic to synctree instead of syncclient

This commit is contained in:
mcrakhman 2022-12-02 14:14:02 +01:00 committed by Mikhail Iudin
parent 3dec56141f
commit c85ae1a335
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
5 changed files with 17 additions and 25 deletions

View File

@ -13,7 +13,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"net/http"
"storj.io/drpc" "storj.io/drpc"
) )
@ -33,7 +32,6 @@ type Service interface {
type service struct { type service struct {
controller Controller controller Controller
transport secure.Service transport secure.Service
srv *http.Server
cfg *config.Config cfg *config.Config
*server.BaseDrpcServer *server.BaseDrpcServer
} }

View File

@ -110,6 +110,7 @@ func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
} }
func (s *settingsDocument) Init(ctx context.Context) (err error) { func (s *settingsDocument) Init(ctx context.Context) (err error) {
log.Debug("space settings id", zap.String("id", s.store.SpaceSettingsId()))
s.SyncTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s) s.SyncTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s)
if err != nil { if err != nil {
return return

View File

@ -2,7 +2,6 @@
package synctree package synctree
import ( import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
@ -20,27 +19,23 @@ type syncClient struct {
syncservice.StreamPool syncservice.StreamPool
RequestFactory RequestFactory
spaceId string spaceId string
notifiable diffservice.HeadNotifiable
configuration nodeconf.Configuration configuration nodeconf.Configuration
} }
func newSyncClient( func newSyncClient(
spaceId string, spaceId string,
pool syncservice.StreamPool, pool syncservice.StreamPool,
notifiable diffservice.HeadNotifiable,
factory RequestFactory, factory RequestFactory,
configuration nodeconf.Configuration) SyncClient { configuration nodeconf.Configuration) SyncClient {
return &syncClient{ return &syncClient{
StreamPool: pool, StreamPool: pool,
RequestFactory: factory, RequestFactory: factory,
notifiable: notifiable,
configuration: configuration, configuration: configuration,
spaceId: spaceId, spaceId: spaceId,
} }
} }
func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
s.notifyIfNeeded(message)
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
if err != nil { if err != nil {
return return
@ -57,7 +52,6 @@ func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncM
} }
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
s.notifyIfNeeded(message)
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
if err != nil { if err != nil {
return return
@ -68,13 +62,6 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr
return s.BroadcastAsync(message) return s.BroadcastAsync(message)
} }
func (s *syncClient) notifyIfNeeded(message *treechangeproto.TreeSyncMessage) {
if message.GetContent().GetHeadUpdate() != nil {
update := message.GetContent().GetHeadUpdate()
s.notifiable.UpdateHeads(message.RootChange.Id, update.Heads)
}
}
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal() payload, err := message.Marshal()
if err != nil { if err != nil {

View File

@ -33,6 +33,7 @@ type syncTree struct {
tree.ObjectTree tree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
syncClient SyncClient syncClient SyncClient
notifiable diffservice.HeadNotifiable
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
isClosed bool isClosed bool
isDeleted bool isDeleted bool
@ -75,12 +76,12 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
syncClient := createSyncClient( syncClient := createSyncClient(
deps.SpaceId, deps.SpaceId,
deps.StreamPool, deps.StreamPool,
deps.HeadNotifiable,
sharedFactory, sharedFactory,
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -105,12 +106,12 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
syncClient := createSyncClient( syncClient := createSyncClient(
deps.SpaceId, deps.SpaceId,
deps.StreamPool, deps.StreamPool,
deps.HeadNotifiable,
GetRequestFactory(), GetRequestFactory(),
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -200,12 +201,12 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
syncClient := createSyncClient( syncClient := createSyncClient(
deps.SpaceId, deps.SpaceId,
deps.StreamPool, deps.StreamPool,
deps.HeadNotifiable,
GetRequestFactory(), GetRequestFactory(),
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -251,6 +252,9 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if err != nil { if err != nil {
return return
} }
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads)
}
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate) err = s.syncClient.BroadcastAsync(headUpdate)
return return
@ -274,10 +278,13 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
s.listener.Rebuild(s) s.listener.Rebuild(s)
} }
} }
//if res.Mode != tree.Nothing { if res.Mode != tree.Nothing {
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) if s.notifiable != nil {
err = s.syncClient.BroadcastAsync(headUpdate) s.notifiable.UpdateHeads(s.ID(), res.Heads)
//} }
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
}
return return
} }

View File

@ -2,7 +2,6 @@ package synctree
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
@ -55,7 +54,7 @@ func Test_DeriveSyncTree(t *testing.T) {
require.Equal(t, expectedPayload, payload) require.Equal(t, expectedPayload, payload)
return objTreeMock, nil return objTreeMock, nil
} }
createSyncClient = func(spaceId string, pool syncservice.StreamPool, notifiable diffservice.HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient { createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
return syncClientMock return syncClientMock
} }
headUpdate := &treechangeproto.TreeSyncMessage{} headUpdate := &treechangeproto.TreeSyncMessage{}
@ -91,7 +90,7 @@ func Test_CreateSyncTree(t *testing.T) {
require.Equal(t, expectedPayload, payload) require.Equal(t, expectedPayload, payload)
return objTreeMock, nil return objTreeMock, nil
} }
createSyncClient = func(spaceId string, pool syncservice.StreamPool, notifiable diffservice.HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient { createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
return syncClientMock return syncClientMock
} }
headUpdate := &treechangeproto.TreeSyncMessage{} headUpdate := &treechangeproto.TreeSyncMessage{}