Update synctree ping logic
This commit is contained in:
parent
00b1b8e4a5
commit
8c60e77b6f
@ -21,7 +21,7 @@ type TreeHeads struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type DiffService interface {
|
type DiffService interface {
|
||||||
HeadNotifiable
|
UpdateHeads(id string, heads []string)
|
||||||
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
|
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
|
||||||
RemoveObjects(ids []string)
|
RemoveObjects(ids []string)
|
||||||
AllIds() []string
|
AllIds() []string
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
|
||||||
@ -116,7 +117,19 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
|
|
||||||
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
||||||
for _, tId := range trees {
|
for _, tId := range trees {
|
||||||
_, _ = d.cache.GetTree(ctx, d.spaceId, tId)
|
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
syncTree, ok := tree.(synctree.SyncTree)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// the idea why we call it directly is that if we try to get it from cache
|
||||||
|
// it may be already there (i.e. loaded)
|
||||||
|
// and build func will not be called, thus we won't sync the tree
|
||||||
|
// therefore we just do it manually
|
||||||
|
syncTree.Ping()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,11 +0,0 @@
|
|||||||
package diffservice
|
|
||||||
|
|
||||||
type HeadNotifiable interface {
|
|
||||||
UpdateHeads(id string, heads []string)
|
|
||||||
}
|
|
||||||
|
|
||||||
type HeadNotifiableFunc func(id string, heads []string)
|
|
||||||
|
|
||||||
func (h HeadNotifiableFunc) UpdateHeads(id string, heads []string) {
|
|
||||||
h(id, heads)
|
|
||||||
}
|
|
||||||
@ -110,8 +110,9 @@ func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *settingsDocument) Init(ctx context.Context) (err error) {
|
func (s *settingsDocument) Init(ctx context.Context) (err error) {
|
||||||
log.Debug("space settings id", zap.String("id", s.store.SpaceSettingsId()))
|
settingsId := s.store.SpaceSettingsId()
|
||||||
s.SyncTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s)
|
log.Debug("space settings id", zap.String("id", settingsId))
|
||||||
|
s.SyncTree, err = s.buildFunc(ctx, settingsId, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -368,6 +368,20 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call {
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping mocks base method.
|
||||||
|
func (m *MockSyncTree) Ping() error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Ping")
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping indicates an expected call of Ping.
|
||||||
|
func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping))
|
||||||
|
}
|
||||||
|
|
||||||
// RLock mocks base method.
|
// RLock mocks base method.
|
||||||
func (m *MockSyncTree) RLock() {
|
func (m *MockSyncTree) RLock() {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
|
|
||||||
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
|
||||||
@ -26,9 +25,14 @@ var (
|
|||||||
ErrSyncTreeDeleted = errors.New("sync tree is deleted")
|
ErrSyncTreeDeleted = errors.New("sync tree is deleted")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type HeadNotifiable interface {
|
||||||
|
UpdateHeads(id string, heads []string)
|
||||||
|
}
|
||||||
|
|
||||||
type SyncTree interface {
|
type SyncTree interface {
|
||||||
tree.ObjectTree
|
tree.ObjectTree
|
||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
|
Ping() (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
||||||
@ -36,7 +40,7 @@ type syncTree struct {
|
|||||||
tree.ObjectTree
|
tree.ObjectTree
|
||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
syncClient SyncClient
|
syncClient SyncClient
|
||||||
notifiable diffservice.HeadNotifiable
|
notifiable HeadNotifiable
|
||||||
listener updatelistener.UpdateListener
|
listener updatelistener.UpdateListener
|
||||||
treeUsage *atomic.Int32
|
treeUsage *atomic.Int32
|
||||||
isClosed bool
|
isClosed bool
|
||||||
@ -54,7 +58,7 @@ type CreateDeps struct {
|
|||||||
SpaceId string
|
SpaceId string
|
||||||
Payload tree.ObjectTreeCreatePayload
|
Payload tree.ObjectTreeCreatePayload
|
||||||
Configuration nodeconf.Configuration
|
Configuration nodeconf.Configuration
|
||||||
HeadNotifiable diffservice.HeadNotifiable
|
HeadNotifiable HeadNotifiable
|
||||||
StreamPool syncservice.StreamPool
|
StreamPool syncservice.StreamPool
|
||||||
Listener updatelistener.UpdateListener
|
Listener updatelistener.UpdateListener
|
||||||
AclList list.ACLList
|
AclList list.ACLList
|
||||||
@ -66,7 +70,7 @@ type BuildDeps struct {
|
|||||||
SpaceId string
|
SpaceId string
|
||||||
StreamPool syncservice.StreamPool
|
StreamPool syncservice.StreamPool
|
||||||
Configuration nodeconf.Configuration
|
Configuration nodeconf.Configuration
|
||||||
HeadNotifiable diffservice.HeadNotifiable
|
HeadNotifiable HeadNotifiable
|
||||||
Listener updatelistener.UpdateListener
|
Listener updatelistener.UpdateListener
|
||||||
AclList list.ACLList
|
AclList list.ACLList
|
||||||
SpaceStorage spacestorage.SpaceStorage
|
SpaceStorage spacestorage.SpaceStorage
|
||||||
@ -246,9 +250,6 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
if isFirstBuild {
|
if isFirstBuild {
|
||||||
// send to everybody, because everybody should know that the node or client got new tree
|
// send to everybody, because everybody should know that the node or client got new tree
|
||||||
err = syncTree.syncClient.BroadcastAsync(headUpdate)
|
err = syncTree.syncClient.BroadcastAsync(headUpdate)
|
||||||
} else {
|
|
||||||
// send either to everybody if client or to replica set if node
|
|
||||||
err = syncTree.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -347,3 +348,8 @@ func (s *syncTree) checkAlive() (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *syncTree) Ping() (err error) {
|
||||||
|
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||||
|
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
||||||
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,6 +68,7 @@ func Test_DeriveSyncTree(t *testing.T) {
|
|||||||
Payload: expectedPayload,
|
Payload: expectedPayload,
|
||||||
Listener: updateListenerMock,
|
Listener: updateListenerMock,
|
||||||
SpaceStorage: spaceStorageMock,
|
SpaceStorage: spaceStorageMock,
|
||||||
|
TreeUsage: &atomic.Int32{},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := DeriveSyncTree(ctx, deps)
|
_, err := DeriveSyncTree(ctx, deps)
|
||||||
@ -103,6 +105,7 @@ func Test_CreateSyncTree(t *testing.T) {
|
|||||||
Payload: expectedPayload,
|
Payload: expectedPayload,
|
||||||
Listener: updateListenerMock,
|
Listener: updateListenerMock,
|
||||||
SpaceStorage: spaceStorageMock,
|
SpaceStorage: spaceStorageMock,
|
||||||
|
TreeUsage: &atomic.Int32{},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := CreateSyncTree(ctx, deps)
|
_, err := CreateSyncTree(ctx, deps)
|
||||||
|
|||||||
@ -209,19 +209,33 @@ func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder {
|
|||||||
return m.recorder
|
return m.recorder
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialResponsiblePeers mocks base method.
|
// Configuration mocks base method.
|
||||||
func (m *MockConfConnector) DialResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) {
|
func (m *MockConfConnector) Configuration() nodeconf.Configuration {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "DialResponsiblePeers", arg0, arg1)
|
ret := m.ctrl.Call(m, "Configuration")
|
||||||
|
ret0, _ := ret[0].(nodeconf.Configuration)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configuration indicates an expected call of Configuration.
|
||||||
|
func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration))
|
||||||
|
}
|
||||||
|
|
||||||
|
// DialInactiveResponsiblePeers mocks base method.
|
||||||
|
func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2)
|
||||||
ret0, _ := ret[0].([]peer.Peer)
|
ret0, _ := ret[0].([]peer.Peer)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialResponsiblePeers indicates an expected call of DialResponsiblePeers.
|
// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers.
|
||||||
func (mr *MockConfConnectorMockRecorder) DialResponsiblePeers(arg0, arg1 interface{}) *gomock.Call {
|
func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialResponsiblePeers), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetResponsiblePeers mocks base method.
|
// GetResponsiblePeers mocks base method.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user