Merge pull request #4 from anytypeio/net-fixes

Net fixes
This commit is contained in:
Sergey Cherepanov 2023-01-15 22:53:07 +03:00 committed by GitHub
commit bded394761
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 73 additions and 167 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
"github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter"
@ -160,7 +161,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
})
t.Run("diff syncer sync space missing", func(t *testing.T) {
aclStorageMock := mock_treestorage.NewMockListStorage(ctrl)
aclStorageMock := mock_liststorage.NewMockListStorage(ctrl)
settingsStorage := mock_treestorage.NewMockTreeStorage(ctrl)
settingsId := "settingsId"
aclRoot := &aclrecordproto.RawAclRecordWithId{

View File

@ -175,21 +175,21 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont
panic(err)
}
err = ot.treeStorage.AddRawChange(rawChange)
err = ot.treeStorage.TransactionAdd([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
if err != nil {
return
}
err = ot.treeStorage.SetHeads([]string{objChange.Id})
if err != nil {
return
mode := Append
if content.IsSnapshot {
mode = Rebuild
}
res = AddResult{
OldHeads: oldHeads,
Heads: []string{objChange.Id},
Added: []*treechangeproto.RawTreeChangeWithId{rawChange},
Mode: Append,
Mode: mode,
}
log.With("treeId", ot.id).With("head", objChange.Id).
Debug("finished adding content")
@ -234,6 +234,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt
}
func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) {
lastHeadId := ot.tree.lastIteratedHeadId
addResult, err = ot.addRawChanges(ctx, changesPayload)
if err != nil {
return
@ -242,16 +243,12 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
// reducing tree if we have new roots
ot.tree.reduceTree()
// adding to database all the added changes only after they are good
for _, ch := range addResult.Added {
err = ot.treeStorage.AddRawChange(ch)
if err != nil {
return
}
// that means that we removed the ids while reducing
if _, exists := ot.tree.attached[lastHeadId]; !exists {
addResult.Mode = Rebuild
}
// setting heads
err = ot.treeStorage.SetHeads(ot.tree.Heads())
err = ot.treeStorage.TransactionAdd(addResult.Added, addResult.Heads)
return
}

View File

@ -270,7 +270,8 @@ func TestObjectTree(t *testing.T) {
assert.Equal(t, []string{"0"}, res.OldHeads)
assert.Equal(t, []string{"4"}, res.Heads)
assert.Equal(t, len(rawChanges), len(res.Added))
assert.Equal(t, Append, res.Mode)
// here we have rebuild, because we reduced tree to new snapshot
assert.Equal(t, Rebuild, res.Mode)
// check tree heads
assert.Equal(t, []string{"4"}, objTree.Heads())

View File

@ -7,8 +7,6 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/anytypeio/any-sync/util/keys/symmetric"
"github.com/anytypeio/any-sync/util/slice"
"go.uber.org/zap"
"math/rand"
"time"
)
@ -132,22 +130,6 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
if err != nil {
return nil, err
}
storageHeads, err := objTree.treeStorage.Heads()
if err != nil {
return nil, err
}
// comparing rebuilt heads with heads in storage
// in theory it can happen that we didn't set heads because the process has crashed
// therefore we want to set them later
if !slice.UnsortedEquals(storageHeads, objTree.tree.Heads()) {
log.With(zap.Strings("storage", storageHeads), zap.Strings("rebuilt", objTree.tree.Heads())).
Errorf("the heads in storage and objTree are different")
err = objTree.treeStorage.SetHeads(objTree.tree.Heads())
if err != nil {
return nil, err
}
}
objTree.id = objTree.treeStorage.Id()
objTree.rawRoot, err = objTree.treeStorage.Root()

View File

@ -16,6 +16,17 @@ type inMemoryTreeStorage struct {
sync.RWMutex
}
func (t *inMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
t.RLock()
defer t.RUnlock()
for _, ch := range changes {
t.changes[ch.Id] = ch
}
t.heads = append(t.heads[:0], heads...)
return nil
}
func NewInMemoryTreeStorage(
root *treechangeproto.RawTreeChangeWithId,
heads []string,
@ -61,11 +72,7 @@ func (t *inMemoryTreeStorage) Heads() ([]string, error) {
func (t *inMemoryTreeStorage) SetHeads(heads []string) error {
t.Lock()
defer t.Unlock()
t.heads = t.heads[:0]
for _, h := range heads {
t.heads = append(t.heads, h)
}
t.heads = append(t.heads[:0], heads...)
return nil
}

View File

@ -1,128 +1,17 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/any-sync/pkg/acl/storage (interfaces: ListStorage,TreeStorage)
// Source: github.com/anytypeio/any-sync/commonspace/object/tree/treestorage (interfaces: TreeStorage)
// Package mock_storage is a generated GoMock package.
// Package mock_treestorage is a generated GoMock package.
package mock_treestorage
import (
context "context"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
reflect "reflect"
treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
gomock "github.com/golang/mock/gomock"
)
// MockListStorage is a mock of ListStorage interface.
type MockListStorage struct {
ctrl *gomock.Controller
recorder *MockListStorageMockRecorder
}
// MockListStorageMockRecorder is the mock recorder for MockListStorage.
type MockListStorageMockRecorder struct {
mock *MockListStorage
}
// NewMockListStorage creates a new mock instance.
func NewMockListStorage(ctrl *gomock.Controller) *MockListStorage {
mock := &MockListStorage{ctrl: ctrl}
mock.recorder = &MockListStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockListStorage) EXPECT() *MockListStorageMockRecorder {
return m.recorder
}
// AddRawRecord mocks base method.
func (m *MockListStorage) AddRawRecord(arg0 context.Context, arg1 *aclrecordproto.RawAclRecordWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRawRecord", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawRecord indicates an expected call of AddRawRecord.
func (mr *MockListStorageMockRecorder) AddRawRecord(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecord", reflect.TypeOf((*MockListStorage)(nil).AddRawRecord), arg0, arg1)
}
// GetRawRecord mocks base method.
func (m *MockListStorage) GetRawRecord(arg0 context.Context, arg1 string) (*aclrecordproto.RawAclRecordWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetRawRecord", arg0, arg1)
ret0, _ := ret[0].(*aclrecordproto.RawAclRecordWithId)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetRawRecord indicates an expected call of GetRawRecord.
func (mr *MockListStorageMockRecorder) GetRawRecord(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawRecord", reflect.TypeOf((*MockListStorage)(nil).GetRawRecord), arg0, arg1)
}
// Head mocks base method.
func (m *MockListStorage) Head() (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Head")
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Head indicates an expected call of Head.
func (mr *MockListStorageMockRecorder) Head() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Head", reflect.TypeOf((*MockListStorage)(nil).Head))
}
// Id mocks base method.
func (m *MockListStorage) Id() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Id")
ret0, _ := ret[0].(string)
return ret0
}
// Id indicates an expected call of Id.
func (mr *MockListStorageMockRecorder) Id() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockListStorage)(nil).Id))
}
// Root mocks base method.
func (m *MockListStorage) Root() (*aclrecordproto.RawAclRecordWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Root")
ret0, _ := ret[0].(*aclrecordproto.RawAclRecordWithId)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Root indicates an expected call of Root.
func (mr *MockListStorageMockRecorder) Root() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockListStorage)(nil).Root))
}
// SetHead mocks base method.
func (m *MockListStorage) SetHead(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetHead", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// SetHead indicates an expected call of SetHead.
func (mr *MockListStorageMockRecorder) SetHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHead", reflect.TypeOf((*MockListStorage)(nil).SetHead), arg0)
}
// MockTreeStorage is a mock of TreeStorage interface.
type MockTreeStorage struct {
ctrl *gomock.Controller
@ -261,3 +150,17 @@ func (mr *MockTreeStorageMockRecorder) SetHeads(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeads", reflect.TypeOf((*MockTreeStorage)(nil).SetHeads), arg0)
}
// TransactionAdd mocks base method.
func (m *MockTreeStorage) TransactionAdd(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TransactionAdd", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// TransactionAdd indicates an expected call of TransactionAdd.
func (mr *MockTreeStorageMockRecorder) TransactionAdd(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionAdd", reflect.TypeOf((*MockTreeStorage)(nil).TransactionAdd), arg0, arg1)
}

View File

@ -1,3 +1,4 @@
//go:generate mockgen -destination mock_treestorage/mock_treestorage.go github.com/anytypeio/any-sync/commonspace/object/tree/treestorage TreeStorage
package treestorage
import (
@ -25,8 +26,9 @@ type TreeStorage interface {
Root() (*treechangeproto.RawTreeChangeWithId, error)
Heads() ([]string, error)
SetHeads(heads []string) error
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
HasChange(ctx context.Context, id string) (bool, error)
Delete() error

View File

@ -51,9 +51,8 @@ func (p *provider) ProvideIds(tr objecttree.ObjectTree, startId string) (ids []s
}
if startId == "" {
err = tr.IterateFrom(tr.Id(), convert, process)
} else {
err = tr.IterateFrom(startId, convert, process)
startId = rootId
}
err = tr.IterateFrom(startId, convert, process)
return
}

View File

@ -78,8 +78,7 @@ func TestProvider_ProvideIds(t *testing.T) {
t.Run("startId is empty", func(t *testing.T) {
ch := &objecttree.Change{Id: "rootId"}
objTree.EXPECT().Root().Return(ch)
objTree.EXPECT().Id().Return("id")
objTree.EXPECT().IterateFrom("id", gomock.Any(), gomock.Any()).Return(nil)
objTree.EXPECT().IterateFrom("rootId", gomock.Any(), gomock.Any()).Return(nil)
_, _, err := prov.ProvideIds(objTree, "")
require.NoError(t, err)
})

View File

@ -24,6 +24,7 @@ import (
"github.com/anytypeio/any-sync/nodeconf"
"github.com/anytypeio/any-sync/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/anytypeio/any-sync/util/slice"
"github.com/zeebo/errs"
"go.uber.org/zap"
"strconv"
@ -228,7 +229,9 @@ func (s *space) Storage() spacestorage.SpaceStorage {
}
func (s *space) StoredIds() []string {
return s.headSync.AllIds()
return slice.DiscardFromSlice(s.headSync.AllIds(), func(id string) bool {
return id == s.settingsObject.Id()
})
}
func (s *space) DebugAllHeads() []headsync.TreeHeads {

View File

@ -170,7 +170,7 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string)
return
}
p, err = s.pool.DialOneOf(ctx, lastConfiguration.NodeIds(id))
p, err = s.pool.GetOneOf(ctx, lastConfiguration.NodeIds(id))
if err != nil {
return
}

View File

@ -76,9 +76,8 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
for _, addr := range addrs {
conn, sc, err = d.handshake(ctx, addr)
if err != nil {
log.Info("can't connect to host", zap.String("addr", addr))
log.Info("can't connect to host", zap.String("addr", addr), zap.Error(err))
} else {
err = nil
break
}
}
@ -99,7 +98,7 @@ func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc
if err != nil {
return
}
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String()))
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr))
conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{MaximumBufferSize: d.config.Stream.MaxMsgSizeMb * (1 << 20)},
}})

View File

@ -66,13 +66,12 @@ func (s *BaseDrpcServer) serve(ctx context.Context, lis secureservice.ContextLis
return
default:
}
ctx, conn, err := lis.Accept(ctx)
cctx, conn, err := lis.Accept(ctx)
if err != nil {
if isTemporary(err) {
l.Debug("listener temporary accept error", zap.Error(err))
t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
case <-time.After(time.Second):
case <-ctx.Done():
return
}
@ -85,7 +84,7 @@ func (s *BaseDrpcServer) serve(ctx context.Context, lis secureservice.ContextLis
l.Error("listener accept error", zap.Error(err))
return
}
go s.serveConn(ctx, conn)
go s.serveConn(cctx, conn)
}
}

View File

@ -212,3 +212,17 @@ func (mr *MockConfigurationMockRecorder) NodeIds(arg0 interface{}) *gomock.Call
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeIds", reflect.TypeOf((*MockConfiguration)(nil).NodeIds), arg0)
}
// Partition mocks base method.
func (m *MockConfiguration) Partition(arg0 string) int {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Partition", arg0)
ret0, _ := ret[0].(int)
return ret0
}
// Partition indicates an expected call of Partition.
func (mr *MockConfigurationMockRecorder) Partition(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Partition", reflect.TypeOf((*MockConfiguration)(nil).Partition), arg0)
}