Fixing tests wip
This commit is contained in:
parent
3eade57cba
commit
791ecc56b0
@ -0,0 +1,73 @@
|
|||||||
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
|
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice (interfaces: ActionQueue)
|
||||||
|
|
||||||
|
// Package mock_syncservice is a generated GoMock package.
|
||||||
|
package mock_syncservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
reflect "reflect"
|
||||||
|
|
||||||
|
syncservice "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||||
|
gomock "github.com/golang/mock/gomock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MockActionQueue is a mock of ActionQueue interface.
|
||||||
|
type MockActionQueue struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockActionQueueMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockActionQueueMockRecorder is the mock recorder for MockActionQueue.
|
||||||
|
type MockActionQueueMockRecorder struct {
|
||||||
|
mock *MockActionQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockActionQueue creates a new mock instance.
|
||||||
|
func NewMockActionQueue(ctrl *gomock.Controller) *MockActionQueue {
|
||||||
|
mock := &MockActionQueue{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockActionQueueMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockActionQueue) EXPECT() *MockActionQueueMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close mocks base method.
|
||||||
|
func (m *MockActionQueue) Close() {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "Close")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close indicates an expected call of Close.
|
||||||
|
func (mr *MockActionQueueMockRecorder) Close() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockActionQueue)(nil).Close))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run mocks base method.
|
||||||
|
func (m *MockActionQueue) Run() {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "Run")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run indicates an expected call of Run.
|
||||||
|
func (mr *MockActionQueueMockRecorder) Run() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockActionQueue)(nil).Run))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send mocks base method.
|
||||||
|
func (m *MockActionQueue) Send(arg0 syncservice.ActionFunc) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Send", arg0)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send indicates an expected call of Send.
|
||||||
|
func (mr *MockActionQueueMockRecorder) Send(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockActionQueue)(nil).Send), arg0)
|
||||||
|
}
|
||||||
@ -1,3 +1,4 @@
|
|||||||
|
//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice ActionQueue
|
||||||
package syncservice
|
package syncservice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
// Code generated by MockGen. DO NOT EDIT.
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree (interfaces: SyncClient,SyncTree)
|
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree (interfaces: SyncClient,SyncTree,ReceiveQueue)
|
||||||
|
|
||||||
// Package mock_synctree is a generated GoMock package.
|
// Package mock_synctree is a generated GoMock package.
|
||||||
package mock_synctree
|
package mock_synctree
|
||||||
@ -177,23 +177,18 @@ func (mr *MockSyncTreeMockRecorder) AddContent(arg0, arg1 interface{}) *gomock.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChanges mocks base method.
|
// AddRawChanges mocks base method.
|
||||||
func (m *MockSyncTree) AddRawChanges(arg0 context.Context, arg1 ...*treechangeproto.RawTreeChangeWithId) (tree.AddResult, error) {
|
func (m *MockSyncTree) AddRawChanges(arg0 context.Context, arg1 tree.RawChangesPayload) (tree.AddResult, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
varargs := []interface{}{arg0}
|
ret := m.ctrl.Call(m, "AddRawChanges", arg0, arg1)
|
||||||
for _, a := range arg1 {
|
|
||||||
varargs = append(varargs, a)
|
|
||||||
}
|
|
||||||
ret := m.ctrl.Call(m, "AddRawChanges", varargs...)
|
|
||||||
ret0, _ := ret[0].(tree.AddResult)
|
ret0, _ := ret[0].(tree.AddResult)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChanges indicates an expected call of AddRawChanges.
|
// AddRawChanges indicates an expected call of AddRawChanges.
|
||||||
func (mr *MockSyncTreeMockRecorder) AddRawChanges(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
|
func (mr *MockSyncTreeMockRecorder) AddRawChanges(arg0, arg1 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
varargs := append([]interface{}{arg0}, arg1...)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockSyncTree)(nil).AddRawChanges), arg0, arg1)
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockSyncTree)(nil).AddRawChanges), varargs...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChangesAfterCommonSnapshot mocks base method.
|
// ChangesAfterCommonSnapshot mocks base method.
|
||||||
@ -459,3 +454,68 @@ func (mr *MockSyncTreeMockRecorder) Unlock() *gomock.Call {
|
|||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockSyncTree)(nil).Unlock))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockSyncTree)(nil).Unlock))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MockReceiveQueue is a mock of ReceiveQueue interface.
|
||||||
|
type MockReceiveQueue struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockReceiveQueueMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockReceiveQueueMockRecorder is the mock recorder for MockReceiveQueue.
|
||||||
|
type MockReceiveQueueMockRecorder struct {
|
||||||
|
mock *MockReceiveQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockReceiveQueue creates a new mock instance.
|
||||||
|
func NewMockReceiveQueue(ctrl *gomock.Controller) *MockReceiveQueue {
|
||||||
|
mock := &MockReceiveQueue{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockReceiveQueueMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockReceiveQueue) EXPECT() *MockReceiveQueueMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMessage mocks base method.
|
||||||
|
func (m *MockReceiveQueue) AddMessage(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) bool {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "AddMessage", arg0, arg1, arg2)
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMessage indicates an expected call of AddMessage.
|
||||||
|
func (mr *MockReceiveQueueMockRecorder) AddMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddMessage", reflect.TypeOf((*MockReceiveQueue)(nil).AddMessage), arg0, arg1, arg2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearQueue mocks base method.
|
||||||
|
func (m *MockReceiveQueue) ClearQueue(arg0 string) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "ClearQueue", arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearQueue indicates an expected call of ClearQueue.
|
||||||
|
func (mr *MockReceiveQueueMockRecorder) ClearQueue(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearQueue", reflect.TypeOf((*MockReceiveQueue)(nil).ClearQueue), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMessage mocks base method.
|
||||||
|
func (m *MockReceiveQueue) GetMessage(arg0 string) (*treechangeproto.TreeSyncMessage, string, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetMessage", arg0)
|
||||||
|
ret0, _ := ret[0].(*treechangeproto.TreeSyncMessage)
|
||||||
|
ret1, _ := ret[1].(string)
|
||||||
|
ret2, _ := ret[2].(error)
|
||||||
|
return ret0, ret1, ret2
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMessage indicates an expected call of GetMessage.
|
||||||
|
func (mr *MockReceiveQueueMockRecorder) GetMessage(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMessage", reflect.TypeOf((*MockReceiveQueue)(nil).GetMessage), arg0)
|
||||||
|
}
|
||||||
|
|||||||
@ -2,44 +2,50 @@ package synctree
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ReceiveQueue interface {
|
type ReceiveQueue interface {
|
||||||
AddMessage(senderId string, msg treeMsg) (queueFull bool)
|
AddMessage(senderId string, msg *treechangeproto.TreeSyncMessage, replyId string) (queueFull bool)
|
||||||
GetMessage(senderId string) (msg treeMsg, err error)
|
GetMessage(senderId string) (msg *treechangeproto.TreeSyncMessage, replyId string, err error)
|
||||||
ClearQueue(senderId string)
|
ClearQueue(senderId string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type queueMsg struct {
|
||||||
|
replyId string
|
||||||
|
syncMessage *treechangeproto.TreeSyncMessage
|
||||||
|
}
|
||||||
|
|
||||||
type receiveQueue struct {
|
type receiveQueue struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
handlerMap map[string][]treeMsg
|
handlerMap map[string][]queueMsg
|
||||||
maxSize int
|
maxSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newReceiveQueue(maxSize int) ReceiveQueue {
|
func newReceiveQueue(maxSize int) ReceiveQueue {
|
||||||
return &receiveQueue{
|
return &receiveQueue{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
handlerMap: map[string][]treeMsg{},
|
handlerMap: map[string][]queueMsg{},
|
||||||
maxSize: maxSize,
|
maxSize: maxSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var errEmptyQueue = errors.New("the queue is empty")
|
var errEmptyQueue = errors.New("the queue is empty")
|
||||||
|
|
||||||
func (q *receiveQueue) AddMessage(senderId string, msg treeMsg) (queueFull bool) {
|
func (q *receiveQueue) AddMessage(senderId string, msg *treechangeproto.TreeSyncMessage, replyId string) (queueFull bool) {
|
||||||
q.Lock()
|
q.Lock()
|
||||||
defer q.Unlock()
|
defer q.Unlock()
|
||||||
|
|
||||||
queue := q.handlerMap[senderId]
|
queue := q.handlerMap[senderId]
|
||||||
queueFull = len(queue) >= maxQueueSize
|
queueFull = len(queue) >= maxQueueSize
|
||||||
queue = append(queue, msg)
|
queue = append(queue, queueMsg{replyId, msg})
|
||||||
q.handlerMap[senderId] = queue
|
q.handlerMap[senderId] = queue
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *receiveQueue) GetMessage(senderId string) (msg treeMsg, err error) {
|
func (q *receiveQueue) GetMessage(senderId string) (msg *treechangeproto.TreeSyncMessage, replyId string, err error) {
|
||||||
q.Lock()
|
q.Lock()
|
||||||
defer q.Unlock()
|
defer q.Unlock()
|
||||||
|
|
||||||
@ -48,7 +54,9 @@ func (q *receiveQueue) GetMessage(senderId string) (msg treeMsg, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = q.handlerMap[senderId][0]
|
qMsg := q.handlerMap[senderId][0]
|
||||||
|
msg = qMsg.syncMessage
|
||||||
|
replyId = qMsg.replyId
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
//go:generate mockgen -destination mock_synctree/mock_synctree.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree SyncClient,SyncTree
|
//go:generate mockgen -destination mock_synctree/mock_synctree.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree SyncClient,SyncTree,ReceiveQueue
|
||||||
package synctree
|
package synctree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|||||||
@ -37,6 +37,12 @@ func (s syncTreeMatcher) String() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func syncClientFuncCreator(client SyncClient) func(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient {
|
||||||
|
return func(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient {
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func Test_DeriveSyncTree(t *testing.T) {
|
func Test_DeriveSyncTree(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
@ -53,9 +59,7 @@ func Test_DeriveSyncTree(t *testing.T) {
|
|||||||
require.Equal(t, expectedPayload, payload)
|
require.Equal(t, expectedPayload, payload)
|
||||||
return objTreeMock, nil
|
return objTreeMock, nil
|
||||||
}
|
}
|
||||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration, checker syncservice.StreamChecker) SyncClient {
|
createSyncClient = syncClientFuncCreator(syncClientMock)
|
||||||
return syncClientMock
|
|
||||||
}
|
|
||||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
||||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||||
@ -87,9 +91,8 @@ func Test_CreateSyncTree(t *testing.T) {
|
|||||||
require.Equal(t, expectedPayload, payload)
|
require.Equal(t, expectedPayload, payload)
|
||||||
return objTreeMock, nil
|
return objTreeMock, nil
|
||||||
}
|
}
|
||||||
createSyncClient = func(spaceId string, pool syncservice.StreamPool, factory RequestFactory, configuration nodeconf.Configuration, checker syncservice.StreamChecker) SyncClient {
|
|
||||||
return syncClientMock
|
createSyncClient = syncClientFuncCreator(syncClientMock)
|
||||||
}
|
|
||||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
||||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||||
@ -124,40 +127,54 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||||
t.Run("AddRawChanges update", func(t *testing.T) {
|
t.Run("AddRawChanges update", func(t *testing.T) {
|
||||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||||
|
payload := tree.RawChangesPayload{
|
||||||
|
NewHeads: nil,
|
||||||
|
RawChanges: changes,
|
||||||
|
}
|
||||||
expectedRes := tree.AddResult{
|
expectedRes := tree.AddResult{
|
||||||
Added: changes,
|
Added: changes,
|
||||||
Mode: tree.Append,
|
Mode: tree.Append,
|
||||||
}
|
}
|
||||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(changes)).
|
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||||
Return(expectedRes, nil)
|
Return(expectedRes, nil)
|
||||||
updateListenerMock.EXPECT().Update(tr)
|
updateListenerMock.EXPECT().Update(tr)
|
||||||
|
|
||||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||||
res, err := tr.AddRawChanges(ctx, changes...)
|
res, err := tr.AddRawChanges(ctx, payload)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedRes, res)
|
require.Equal(t, expectedRes, res)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("AddRawChanges rebuild", func(t *testing.T) {
|
t.Run("AddRawChanges rebuild", func(t *testing.T) {
|
||||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||||
|
payload := tree.RawChangesPayload{
|
||||||
|
NewHeads: nil,
|
||||||
|
RawChanges: changes,
|
||||||
|
}
|
||||||
|
|
||||||
expectedRes := tree.AddResult{
|
expectedRes := tree.AddResult{
|
||||||
Added: changes,
|
Added: changes,
|
||||||
Mode: tree.Rebuild,
|
Mode: tree.Rebuild,
|
||||||
}
|
}
|
||||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(changes)).
|
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||||
Return(expectedRes, nil)
|
Return(expectedRes, nil)
|
||||||
updateListenerMock.EXPECT().Rebuild(tr)
|
updateListenerMock.EXPECT().Rebuild(tr)
|
||||||
|
|
||||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||||
res, err := tr.AddRawChanges(ctx, changes...)
|
res, err := tr.AddRawChanges(ctx, payload)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedRes, res)
|
require.Equal(t, expectedRes, res)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("AddRawChanges nothing", func(t *testing.T) {
|
t.Run("AddRawChanges nothing", func(t *testing.T) {
|
||||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||||
|
payload := tree.RawChangesPayload{
|
||||||
|
NewHeads: nil,
|
||||||
|
RawChanges: changes,
|
||||||
|
}
|
||||||
|
|
||||||
expectedRes := tree.AddResult{
|
expectedRes := tree.AddResult{
|
||||||
Added: changes,
|
Added: changes,
|
||||||
Mode: tree.Nothing,
|
Mode: tree.Nothing,
|
||||||
@ -165,7 +182,7 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(changes)).
|
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(changes)).
|
||||||
Return(expectedRes, nil)
|
Return(expectedRes, nil)
|
||||||
|
|
||||||
res, err := tr.AddRawChanges(ctx, changes...)
|
res, err := tr.AddRawChanges(ctx, payload)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedRes, res)
|
require.Equal(t, expectedRes, res)
|
||||||
})
|
})
|
||||||
|
|||||||
@ -21,11 +21,6 @@ type syncTreeHandler struct {
|
|||||||
|
|
||||||
const maxQueueSize = 5
|
const maxQueueSize = 5
|
||||||
|
|
||||||
type treeMsg struct {
|
|
||||||
replyId string
|
|
||||||
syncMessage *treechangeproto.TreeSyncMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler {
|
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler {
|
||||||
return &syncTreeHandler{
|
return &syncTreeHandler{
|
||||||
objTree: objTree,
|
objTree: objTree,
|
||||||
@ -42,7 +37,7 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queueFull := s.queue.AddMessage(senderId, treeMsg{msg.ReplyId, unmarshalled})
|
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId)
|
||||||
if queueFull {
|
if queueFull {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -53,19 +48,19 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
|
|||||||
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) {
|
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) {
|
||||||
s.objTree.Lock()
|
s.objTree.Lock()
|
||||||
defer s.objTree.Unlock()
|
defer s.objTree.Unlock()
|
||||||
msg, err := s.queue.GetMessage(senderId)
|
msg, replyId, err := s.queue.GetMessage(senderId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer s.queue.ClearQueue(senderId)
|
defer s.queue.ClearQueue(senderId)
|
||||||
|
|
||||||
content := msg.syncMessage.GetContent()
|
content := msg.GetContent()
|
||||||
switch {
|
switch {
|
||||||
case content.GetHeadUpdate() != nil:
|
case content.GetHeadUpdate() != nil:
|
||||||
return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg.replyId)
|
return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), replyId)
|
||||||
case content.GetFullSyncRequest() != nil:
|
case content.GetFullSyncRequest() != nil:
|
||||||
return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg.replyId)
|
return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), replyId)
|
||||||
case content.GetFullSyncResponse() != nil:
|
case content.GetFullSyncResponse() != nil:
|
||||||
return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
|
return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -40,8 +40,13 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
||||||
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
||||||
|
receiveQueueMock := mock_synctree.NewMockReceiveQueue(ctrl)
|
||||||
|
|
||||||
syncHandler := newSyncTreeHandler(objectTreeMock, syncClientMock)
|
syncHandler := &syncTreeHandler{
|
||||||
|
objTree: objectTreeMock,
|
||||||
|
syncClient: syncClientMock,
|
||||||
|
queue: receiveQueueMock,
|
||||||
|
}
|
||||||
log = zap.NewNop().Sugar()
|
log = zap.NewNop().Sugar()
|
||||||
|
|
||||||
t.Run("head update non empty all heads added", func(t *testing.T) {
|
t.Run("head update non empty all heads added", func(t *testing.T) {
|
||||||
@ -56,16 +61,22 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
|
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
|
||||||
objectMsg, _ := marshallTreeMessage(treeMsg, treeId, "")
|
objectMsg, _ := marshallTreeMessage(treeMsg, treeId, "")
|
||||||
|
|
||||||
|
receiveQueueMock.EXPECT().AddMessage(senderId, gomock.Eq(treeMsg), "").
|
||||||
|
Return(false)
|
||||||
|
receiveQueueMock.EXPECT().GetMessage(senderId).Return(treeMsg, "", nil)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
ID().AnyTimes().Return(treeId)
|
ID().AnyTimes().Return(treeId)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
Heads().
|
Heads().
|
||||||
Return([]string{"h2"})
|
Return([]string{"h2"}).Times(2)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
HasChanges(gomock.Eq([]string{"h1"})).
|
HasChanges(gomock.Eq([]string{"h1"})).
|
||||||
Return(false)
|
Return(false)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})).
|
AddRawChanges(gomock.Any(), gomock.Eq(tree.RawChangesPayload{
|
||||||
|
NewHeads: []string{"h1"},
|
||||||
|
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||||
|
})).
|
||||||
Return(tree.AddResult{}, nil)
|
Return(tree.AddResult{}, nil)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
Heads().
|
Heads().
|
||||||
@ -73,6 +84,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
HasChanges(gomock.Eq([]string{"h1"})).
|
HasChanges(gomock.Eq([]string{"h1"})).
|
||||||
Return(true)
|
Return(true)
|
||||||
|
receiveQueueMock.EXPECT().ClearQueue(senderId)
|
||||||
|
|
||||||
err := syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -90,32 +102,33 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
|
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
|
||||||
objectMsg, _ := marshallTreeMessage(treeMsg, treeId, "")
|
objectMsg, _ := marshallTreeMessage(treeMsg, treeId, "")
|
||||||
fullRequest := &treechangeproto.TreeSyncMessage{}
|
fullRequest := &treechangeproto.TreeSyncMessage{}
|
||||||
|
receiveQueueMock.EXPECT().AddMessage(senderId, gomock.Eq(treeMsg), "").
|
||||||
|
Return(false)
|
||||||
|
receiveQueueMock.EXPECT().GetMessage(senderId).Return(treeMsg, "", nil)
|
||||||
|
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
ID().AnyTimes().Return(treeId)
|
ID().AnyTimes().Return(treeId)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
Heads().
|
Heads().
|
||||||
Return([]string{"h2"})
|
Return([]string{"h2"}).AnyTimes()
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
HasChanges(gomock.Eq([]string{"h1"})).
|
HasChanges(gomock.Eq([]string{"h1"})).
|
||||||
Return(false)
|
Return(false)
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})).
|
AddRawChanges(gomock.Any(), gomock.Eq(tree.RawChangesPayload{
|
||||||
|
NewHeads: []string{"h1"},
|
||||||
|
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||||
|
})).
|
||||||
Return(tree.AddResult{}, nil)
|
Return(tree.AddResult{}, nil)
|
||||||
objectTreeMock.EXPECT().
|
|
||||||
Heads().
|
|
||||||
Return([]string{"h2"})
|
|
||||||
objectTreeMock.EXPECT().
|
objectTreeMock.EXPECT().
|
||||||
HasChanges(gomock.Eq([]string{"h1"})).
|
HasChanges(gomock.Eq([]string{"h1"})).
|
||||||
Return(false)
|
Return(false)
|
||||||
syncClientMock.EXPECT().
|
syncClientMock.EXPECT().
|
||||||
CreateFullSyncRequest(gomock.Eq(objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncRequest(gomock.Eq(objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullRequest, nil)
|
Return(fullRequest, nil)
|
||||||
objectTreeMock.EXPECT().
|
|
||||||
Heads().
|
|
||||||
Return([]string{"h2"})
|
|
||||||
|
|
||||||
syncClientMock.EXPECT().SendAsync(gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq(""))
|
syncClientMock.EXPECT().SendAsync(gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq(""))
|
||||||
|
receiveQueueMock.EXPECT().ClearQueue(senderId)
|
||||||
|
|
||||||
err := syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -204,8 +217,13 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
||||||
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
||||||
|
receiveQueueMock := mock_synctree.NewMockReceiveQueue(ctrl)
|
||||||
|
syncHandler := &syncTreeHandler{
|
||||||
|
objTree: objectTreeMock,
|
||||||
|
syncClient: syncClientMock,
|
||||||
|
queue: receiveQueueMock,
|
||||||
|
}
|
||||||
|
|
||||||
syncHandler := newSyncTreeHandler(objectTreeMock, syncClientMock)
|
|
||||||
log = zap.NewNop().Sugar()
|
log = zap.NewNop().Sugar()
|
||||||
t.Run("full sync request with change", func(t *testing.T) {
|
t.Run("full sync request with change", func(t *testing.T) {
|
||||||
treeId := "treeId"
|
treeId := "treeId"
|
||||||
@ -324,8 +342,13 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
||||||
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
|
||||||
|
receiveQueueMock := mock_synctree.NewMockReceiveQueue(ctrl)
|
||||||
|
syncHandler := &syncTreeHandler{
|
||||||
|
objTree: objectTreeMock,
|
||||||
|
syncClient: syncClientMock,
|
||||||
|
queue: receiveQueueMock,
|
||||||
|
}
|
||||||
|
|
||||||
syncHandler := newSyncTreeHandler(objectTreeMock, syncClientMock)
|
|
||||||
log = zap.NewNop().Sugar()
|
log = zap.NewNop().Sugar()
|
||||||
|
|
||||||
t.Run("full sync response with change", func(t *testing.T) {
|
t.Run("full sync response with change", func(t *testing.T) {
|
||||||
|
|||||||
@ -53,23 +53,18 @@ func (mr *MockObjectTreeMockRecorder) AddContent(arg0, arg1 interface{}) *gomock
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChanges mocks base method.
|
// AddRawChanges mocks base method.
|
||||||
func (m *MockObjectTree) AddRawChanges(arg0 context.Context, arg1 ...*treechangeproto.RawTreeChangeWithId) (tree.AddResult, error) {
|
func (m *MockObjectTree) AddRawChanges(arg0 context.Context, arg1 tree.RawChangesPayload) (tree.AddResult, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
varargs := []interface{}{arg0}
|
ret := m.ctrl.Call(m, "AddRawChanges", arg0, arg1)
|
||||||
for _, a := range arg1 {
|
|
||||||
varargs = append(varargs, a)
|
|
||||||
}
|
|
||||||
ret := m.ctrl.Call(m, "AddRawChanges", varargs...)
|
|
||||||
ret0, _ := ret[0].(tree.AddResult)
|
ret0, _ := ret[0].(tree.AddResult)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChanges indicates an expected call of AddRawChanges.
|
// AddRawChanges indicates an expected call of AddRawChanges.
|
||||||
func (mr *MockObjectTreeMockRecorder) AddRawChanges(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
|
func (mr *MockObjectTreeMockRecorder) AddRawChanges(arg0, arg1 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
varargs := append([]interface{}{arg0}, arg1...)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockObjectTree)(nil).AddRawChanges), arg0, arg1)
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockObjectTree)(nil).AddRawChanges), varargs...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChangesAfterCommonSnapshot mocks base method.
|
// ChangesAfterCommonSnapshot mocks base method.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user