Merge pull request #9 from anytypeio/nodesync
This commit is contained in:
commit
f2ff10e3f7
@ -51,7 +51,7 @@ var hashersPool = &sync.Pool{
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrElementNotFound = errors.New("element not found")
|
var ErrElementNotFound = errors.New("ldiff: element not found")
|
||||||
|
|
||||||
// Element of data
|
// Element of data
|
||||||
type Element struct {
|
type Element struct {
|
||||||
@ -88,10 +88,14 @@ type Diff interface {
|
|||||||
Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error)
|
Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error)
|
||||||
// Elements retrieves all elements in the Diff
|
// Elements retrieves all elements in the Diff
|
||||||
Elements() []Element
|
Elements() []Element
|
||||||
|
// Element returns an element by id
|
||||||
|
Element(id string) (Element, error)
|
||||||
// Ids retrieves ids of all elements in the Diff
|
// Ids retrieves ids of all elements in the Diff
|
||||||
Ids() []string
|
Ids() []string
|
||||||
// Hash returns hash of all elements in the diff
|
// Hash returns hash of all elements in the diff
|
||||||
Hash() string
|
Hash() string
|
||||||
|
// Len returns count of elements in the diff
|
||||||
|
Len() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remote interface for using in the Diff
|
// Remote interface for using in the Diff
|
||||||
@ -157,6 +161,12 @@ func (d *diff) Ids() (ids []string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *diff) Len() int {
|
||||||
|
d.mu.RLock()
|
||||||
|
defer d.mu.RUnlock()
|
||||||
|
return d.sl.Len()
|
||||||
|
}
|
||||||
|
|
||||||
func (d *diff) Elements() (elements []Element) {
|
func (d *diff) Elements() (elements []Element) {
|
||||||
d.mu.RLock()
|
d.mu.RLock()
|
||||||
defer d.mu.RUnlock()
|
defer d.mu.RUnlock()
|
||||||
@ -172,6 +182,19 @@ func (d *diff) Elements() (elements []Element) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *diff) Element(id string) (Element, error) {
|
||||||
|
d.mu.RLock()
|
||||||
|
defer d.mu.RUnlock()
|
||||||
|
el := d.sl.Get(&element{Element: Element{Id: id}, hash: xxhash.Sum64([]byte(id))})
|
||||||
|
if el == nil {
|
||||||
|
return Element{}, ErrElementNotFound
|
||||||
|
}
|
||||||
|
if e, ok := el.Key().(*element); ok {
|
||||||
|
return e.Element, nil
|
||||||
|
}
|
||||||
|
return Element{}, ErrElementNotFound
|
||||||
|
}
|
||||||
|
|
||||||
func (d *diff) Hash() string {
|
func (d *diff) Hash() string {
|
||||||
d.mu.RLock()
|
d.mu.RLock()
|
||||||
defer d.mu.RUnlock()
|
defer d.mu.RUnlock()
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"gopkg.in/mgo.v2/bson"
|
"gopkg.in/mgo.v2/bson"
|
||||||
"math"
|
"math"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -148,3 +149,51 @@ func TestDiff_Hash(t *testing.T) {
|
|||||||
assert.NotEmpty(t, h2)
|
assert.NotEmpty(t, h2)
|
||||||
assert.NotEqual(t, h1, h2)
|
assert.NotEqual(t, h1, h2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDiff_Element(t *testing.T) {
|
||||||
|
d := New(16, 16)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
d.Set(Element{Id: fmt.Sprint("id", i), Head: fmt.Sprint("head", i)})
|
||||||
|
}
|
||||||
|
_, err := d.Element("not found")
|
||||||
|
assert.Equal(t, ErrElementNotFound, err)
|
||||||
|
|
||||||
|
el, err := d.Element("id5")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "head5", el.Head)
|
||||||
|
|
||||||
|
d.Set(Element{"id5", "otherHead"})
|
||||||
|
el, err = d.Element("id5")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "otherHead", el.Head)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDiff_Ids(t *testing.T) {
|
||||||
|
d := New(16, 16)
|
||||||
|
var ids []string
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
id := fmt.Sprint("id", i)
|
||||||
|
d.Set(Element{Id: id, Head: fmt.Sprint("head", i)})
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
gotIds := d.Ids()
|
||||||
|
sort.Strings(gotIds)
|
||||||
|
assert.Equal(t, ids, gotIds)
|
||||||
|
assert.Equal(t, len(ids), d.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDiff_Elements(t *testing.T) {
|
||||||
|
d := New(16, 16)
|
||||||
|
var els []Element
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
id := fmt.Sprint("id", i)
|
||||||
|
el := Element{Id: id, Head: fmt.Sprint("head", i)}
|
||||||
|
d.Set(el)
|
||||||
|
els = append(els, el)
|
||||||
|
}
|
||||||
|
gotEls := d.Elements()
|
||||||
|
sort.Slice(gotEls, func(i, j int) bool {
|
||||||
|
return gotEls[i].Id < gotEls[j].Id
|
||||||
|
})
|
||||||
|
assert.Equal(t, els, gotEls)
|
||||||
|
}
|
||||||
|
|||||||
@ -52,6 +52,21 @@ func (mr *MockDiffMockRecorder) Diff(arg0, arg1 interface{}) *gomock.Call {
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Diff", reflect.TypeOf((*MockDiff)(nil).Diff), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Diff", reflect.TypeOf((*MockDiff)(nil).Diff), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Element mocks base method.
|
||||||
|
func (m *MockDiff) Element(arg0 string) (ldiff.Element, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Element", arg0)
|
||||||
|
ret0, _ := ret[0].(ldiff.Element)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Element indicates an expected call of Element.
|
||||||
|
func (mr *MockDiffMockRecorder) Element(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Element", reflect.TypeOf((*MockDiff)(nil).Element), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
// Elements mocks base method.
|
// Elements mocks base method.
|
||||||
func (m *MockDiff) Elements() []ldiff.Element {
|
func (m *MockDiff) Elements() []ldiff.Element {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
@ -94,6 +109,20 @@ func (mr *MockDiffMockRecorder) Ids() *gomock.Call {
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ids", reflect.TypeOf((*MockDiff)(nil).Ids))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ids", reflect.TypeOf((*MockDiff)(nil).Ids))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Len mocks base method.
|
||||||
|
func (m *MockDiff) Len() int {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Len")
|
||||||
|
ret0, _ := ret[0].(int)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len indicates an expected call of Len.
|
||||||
|
func (mr *MockDiffMockRecorder) Len() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Len", reflect.TypeOf((*MockDiff)(nil).Len))
|
||||||
|
}
|
||||||
|
|
||||||
// Ranges mocks base method.
|
// Ranges mocks base method.
|
||||||
func (m *MockDiff) Ranges(arg0 context.Context, arg1 []ldiff.Range, arg2 []ldiff.RangeResult) ([]ldiff.RangeResult, error) {
|
func (m *MockDiff) Ranges(arg0 context.Context, arg1 []ldiff.Range, arg2 []ldiff.RangeResult) ([]ldiff.RangeResult, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
@ -15,12 +15,15 @@ import (
|
|||||||
var ErrCantConnect = errors.New("can't connect to test server")
|
var ErrCantConnect = errors.New("can't connect to test server")
|
||||||
|
|
||||||
func NewTestPool() *TestPool {
|
func NewTestPool() *TestPool {
|
||||||
return &TestPool{}
|
return &TestPool{
|
||||||
|
peers: map[string]peer.Peer{},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type TestPool struct {
|
type TestPool struct {
|
||||||
ts *TesServer
|
ts *TesServer
|
||||||
mu sync.Mutex
|
peers map[string]peer.Peer
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestPool) WithServer(ts *TesServer) *TestPool {
|
func (t *TestPool) WithServer(ts *TesServer) *TestPool {
|
||||||
@ -33,6 +36,9 @@ func (t *TestPool) WithServer(ts *TesServer) *TestPool {
|
|||||||
func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) {
|
func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
if p, ok := t.peers[id]; ok {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
if t.ts == nil {
|
if t.ts == nil {
|
||||||
return nil, ErrCantConnect
|
return nil, ErrCantConnect
|
||||||
}
|
}
|
||||||
@ -40,17 +46,17 @@ func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestPool) Dial(ctx context.Context, id string) (peer.Peer, error) {
|
func (t *TestPool) Dial(ctx context.Context, id string) (peer.Peer, error) {
|
||||||
t.mu.Lock()
|
return t.Get(ctx, id)
|
||||||
defer t.mu.Unlock()
|
|
||||||
if t.ts == nil {
|
|
||||||
return nil, ErrCantConnect
|
|
||||||
}
|
|
||||||
return &testPeer{id: id, Conn: t.ts.Dial(ctx)}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestPool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
func (t *TestPool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
for _, peerId := range peerIds {
|
||||||
|
if p, ok := t.peers[peerId]; ok {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
if t.ts == nil {
|
if t.ts == nil {
|
||||||
return nil, ErrCantConnect
|
return nil, ErrCantConnect
|
||||||
}
|
}
|
||||||
@ -66,6 +72,16 @@ func (t *TestPool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer,
|
|||||||
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial(ctx)}, nil
|
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial(ctx)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestPool) NewPool(name string) pool.Pool {
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestPool) AddPeer(p peer.Peer) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
t.peers[p.Id()] = p
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestPool) Init(a *app.App) (err error) {
|
func (t *TestPool) Init(a *app.App) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,8 @@ package rpctest
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/app"
|
||||||
|
"github.com/anytypeio/any-sync/net/rpc/server"
|
||||||
"net"
|
"net"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
"storj.io/drpc/drpcconn"
|
"storj.io/drpc/drpcconn"
|
||||||
@ -22,6 +24,22 @@ type TesServer struct {
|
|||||||
*drpcserver.Server
|
*drpcserver.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ts *TesServer) Init(a *app.App) (err error) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *TesServer) Name() (name string) {
|
||||||
|
return server.CName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *TesServer) Run(ctx context.Context) (err error) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *TesServer) Close(ctx context.Context) (err error) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ts *TesServer) Dial(ctx context.Context) drpc.Conn {
|
func (ts *TesServer) Dial(ctx context.Context) drpc.Conn {
|
||||||
sc, cc := net.Pipe()
|
sc, cc := net.Pipe()
|
||||||
go ts.Server.ServeOne(ctx, sc)
|
go ts.Server.ServeOne(ctx, sc)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user