Merge branch 'main' of github.com:anytypeio/any-sync into any-handshake

This commit is contained in:
Sergey Cherepanov 2023-02-15 20:47:25 +03:00
commit 20768266ca
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
14 changed files with 145 additions and 39 deletions

View File

@ -176,8 +176,12 @@ Load:
} }
c.mu.Unlock() c.mu.Unlock()
if closing { if closing {
<-e.close select {
goto Load case <-ctx.Done():
return nil, ctx.Err()
case <-e.close:
goto Load
}
} }
if load { if load {

View File

@ -5,18 +5,21 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"sync/atomic"
) )
type commonGetter struct { type commonGetter struct {
treegetter.TreeGetter treegetter.TreeGetter
spaceId string spaceId string
reservedObjects []syncobjectgetter.SyncObject reservedObjects []syncobjectgetter.SyncObject
spaceIsClosed *atomic.Bool
} }
func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter { func newCommonGetter(spaceId string, getter treegetter.TreeGetter, spaceIsClosed *atomic.Bool) *commonGetter {
return &commonGetter{ return &commonGetter{
TreeGetter: getter, TreeGetter: getter,
spaceId: spaceId, spaceId: spaceId,
spaceIsClosed: spaceIsClosed,
} }
} }
@ -25,6 +28,9 @@ func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
} }
func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
if c.spaceIsClosed.Load() {
return nil, ErrSpaceClosed
}
if obj := c.getReservedObject(treeId); obj != nil { if obj := c.getReservedObject(treeId); obj != nil {
return obj.(objecttree.ObjectTree), nil return obj.(objecttree.ObjectTree), nil
} }
@ -41,6 +47,9 @@ func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject
} }
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {
if c.spaceIsClosed.Load() {
return nil, ErrSpaceClosed
}
if obj := c.getReservedObject(objectId); obj != nil { if obj := c.getReservedObject(objectId); obj != nil {
return obj, nil return obj, nil
} }

View File

@ -58,7 +58,7 @@ func NewHeadSync(
l := log.With(zap.String("spaceId", spaceId)) l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
return &headSync{ return &headSync{
spaceId: spaceId, spaceId: spaceId,

View File

@ -321,6 +321,34 @@ func (mr *MockObjectTreeMockRecorder) Storage() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockObjectTree)(nil).Storage)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockObjectTree)(nil).Storage))
} }
// TryLock mocks base method.
func (m *MockObjectTree) TryLock() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryLock")
ret0, _ := ret[0].(bool)
return ret0
}
// TryLock indicates an expected call of TryLock.
func (mr *MockObjectTreeMockRecorder) TryLock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLock", reflect.TypeOf((*MockObjectTree)(nil).TryLock))
}
// TryRLock mocks base method.
func (m *MockObjectTree) TryRLock() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryRLock")
ret0, _ := ret[0].(bool)
return ret0
}
// TryRLock indicates an expected call of TryRLock.
func (mr *MockObjectTreeMockRecorder) TryRLock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRLock", reflect.TypeOf((*MockObjectTree)(nil).TryRLock))
}
// Unlock mocks base method. // Unlock mocks base method.
func (m *MockObjectTree) Unlock() { func (m *MockObjectTree) Unlock() {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -4,19 +4,22 @@ package objecttree
import ( import (
"context" "context"
"errors" "errors"
"sync"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
list "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/util/keys/symmetric" "github.com/anytypeio/any-sync/util/keys/symmetric"
"github.com/anytypeio/any-sync/util/slice" "github.com/anytypeio/any-sync/util/slice"
"sync"
) )
type RWLocker interface { type RWLocker interface {
sync.Locker sync.Locker
RLock() RLock()
RUnlock() RUnlock()
TryRLock() bool
TryLock() bool
} }
var ( var (

View File

@ -472,6 +472,34 @@ func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1)
} }
// TryLock mocks base method.
func (m *MockSyncTree) TryLock() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryLock")
ret0, _ := ret[0].(bool)
return ret0
}
// TryLock indicates an expected call of TryLock.
func (mr *MockSyncTreeMockRecorder) TryLock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLock", reflect.TypeOf((*MockSyncTree)(nil).TryLock))
}
// TryRLock mocks base method.
func (m *MockSyncTree) TryRLock() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryRLock")
ret0, _ := ret[0].(bool)
return ret0
}
// TryRLock indicates an expected call of TryRLock.
func (mr *MockSyncTreeMockRecorder) TryRLock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRLock", reflect.TypeOf((*MockSyncTree)(nil).TryRLock))
}
// Unlock mocks base method. // Unlock mocks base method.
func (m *MockSyncTree) Unlock() { func (m *MockSyncTree) Unlock() {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -15,7 +15,6 @@ import (
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
) )
var ( var (
@ -46,7 +45,7 @@ type syncTree struct {
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
notifiable HeadNotifiable notifiable HeadNotifiable
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
treeUsage *atomic.Int32 onClose func(id string)
isClosed bool isClosed bool
isDeleted bool isDeleted bool
} }
@ -69,7 +68,7 @@ type BuildDeps struct {
AclList list.AclList AclList list.AclList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeStorage treestorage.TreeStorage TreeStorage treestorage.TreeStorage
TreeUsage *atomic.Int32 OnClose func(id string)
SyncStatus syncstatus.StatusUpdater SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter PeerGetter ResponsiblePeersGetter
WaitTreeRemoteSync bool WaitTreeRemoteSync bool
@ -106,7 +105,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
treeUsage: deps.TreeUsage, onClose: deps.OnClose,
listener: deps.Listener, listener: deps.Listener,
syncStatus: deps.SyncStatus, syncStatus: deps.SyncStatus,
} }
@ -213,7 +212,7 @@ func (s *syncTree) Close() (err error) {
if s.isClosed { if s.isClosed {
return ErrSyncTreeClosed return ErrSyncTreeClosed
} }
s.treeUsage.Add(-1) s.onClose(s.Id())
s.isClosed = true s.isClosed = true
return return
} }
@ -239,7 +238,6 @@ func (s *syncTree) afterBuild() {
if s.listener != nil { if s.listener != nil {
s.listener.Rebuild(s) s.listener.Rebuild(s)
} }
s.treeUsage.Add(1)
if s.notifiable != nil { if s.notifiable != nil {
s.notifiable.UpdateHeads(s.Id(), s.Heads()) s.notifiable.UpdateHeads(s.Id(), s.Heads())
} }

View File

@ -3,6 +3,9 @@ package synctree
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"testing"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
@ -12,13 +15,11 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"sync"
"testing"
) )
type testObjTreeMock struct { type testObjTreeMock struct {
*mock_objecttree.MockObjectTree *mock_objecttree.MockObjectTree
m sync.Mutex m sync.RWMutex
} }
func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock { func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock {
@ -31,10 +32,26 @@ func (t *testObjTreeMock) Lock() {
t.m.Lock() t.m.Lock()
} }
func (t *testObjTreeMock) RLock() {
t.m.RLock()
}
func (t *testObjTreeMock) Unlock() { func (t *testObjTreeMock) Unlock() {
t.m.Unlock() t.m.Unlock()
} }
func (t *testObjTreeMock) RUnlock() {
t.m.RUnlock()
}
func (t *testObjTreeMock) TryLock() bool {
return t.m.TryLock()
}
func (t *testObjTreeMock) TryRLock() bool {
return t.m.TryRLock()
}
type syncHandlerFixture struct { type syncHandlerFixture struct {
ctrl *gomock.Controller ctrl *gomock.Controller
syncClientMock *mock_synctree.MockSyncClient syncClientMock *mock_synctree.MockSyncClient

View File

@ -127,8 +127,8 @@ type space struct {
handleQueue multiqueue.MultiQueue[HandleMessage] handleQueue multiqueue.MultiQueue[HandleMessage]
isClosed atomic.Bool isClosed *atomic.Bool
treesUsed atomic.Int32 treesUsed *atomic.Int32
} }
func (s *space) LastUsage() time.Time { func (s *space) LastUsage() time.Time {
@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsage: &s.treesUsed, OnClose: func(id string) {},
SyncStatus: s.syncStatus, SyncStatus: s.syncStatus,
PeerGetter: s.peerManager, PeerGetter: s.peerManager,
} }
@ -326,12 +326,16 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
Listener: opts.Listener, Listener: opts.Listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsage: &s.treesUsed, OnClose: s.onObjectClose,
SyncStatus: s.syncStatus, SyncStatus: s.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync, WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
PeerGetter: s.peerManager, PeerGetter: s.peerManager,
} }
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
return nil, err
}
s.treesUsed.Add(1)
return
} }
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
@ -388,10 +392,19 @@ func (s *space) handleMessage(msg HandleMessage) {
} }
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
if msg.Message.ObjectId != "" {
// cleanup thread on error
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
}
log.InfoCtx(ctx, "handleMessage error", zap.Error(err)) log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
} }
} }
func (s *space) onObjectClose(id string) {
s.treesUsed.Add(-1)
_ = s.handleQueue.CloseThread(id)
}
func (s *space) Close() error { func (s *space) Close() error {
if s.isClosed.Swap(true) { if s.isClosed.Swap(true) {
log.Warn("call space.Close on closed space", zap.String("id", s.id)) log.Warn("call space.Close on closed space", zap.String("id", s.id))

View File

@ -17,6 +17,7 @@ import (
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
"sync/atomic"
) )
const CName = "common.commonspace" const CName = "common.commonspace"
@ -116,7 +117,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
} }
lastConfiguration := s.configurationService.GetLast() lastConfiguration := s.configurationService.GetLast()
getter := newCommonGetter(st.Id(), s.treeGetter) var spaceIsClosed = &atomic.Bool{}
getter := newCommonGetter(st.Id(), s.treeGetter, spaceIsClosed)
syncStatus := syncstatus.NewNoOpSyncStatus() syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but... // this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) { if !lastConfiguration.IsResponsible(st.Id()) {
@ -141,6 +143,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
configuration: lastConfiguration, configuration: lastConfiguration,
peerManager: peerManager, peerManager: peerManager,
storage: st, storage: st,
treesUsed: &atomic.Int32{},
isClosed: spaceIsClosed,
} }
return sp, nil return sp, nil
} }

8
go.mod
View File

@ -19,7 +19,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-merkledag v0.9.0 github.com/ipfs/go-merkledag v0.9.0
github.com/ipfs/go-unixfs v0.4.2 github.com/ipfs/go-unixfs v0.4.3
github.com/libp2p/go-libp2p v0.24.1 github.com/libp2p/go-libp2p v0.24.1
github.com/minio/sha256-simd v1.0.0 github.com/minio/sha256-simd v1.0.0
github.com/multiformats/go-multibase v0.1.1 github.com/multiformats/go-multibase v0.1.1
@ -30,7 +30,7 @@ require (
github.com/zeebo/errs v1.3.0 github.com/zeebo/errs v1.3.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/net v0.5.0 golang.org/x/net v0.6.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
storj.io/drpc v0.0.32 storj.io/drpc v0.0.32
@ -52,7 +52,7 @@ require (
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.0.0 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-posinfo v0.0.1 // indirect github.com/ipfs/go-ipfs-posinfo v0.0.1 // indirect
@ -99,7 +99,7 @@ require (
golang.org/x/crypto v0.4.0 // indirect golang.org/x/crypto v0.4.0 // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sync v0.1.0 // indirect golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.4.0 // indirect golang.org/x/sys v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect google.golang.org/protobuf v1.28.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect lukechampine.com/blake3 v1.1.7 // indirect

16
go.sum
View File

@ -189,8 +189,8 @@ github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/go-bitfield v1.0.0 h1:y/XHm2GEmD9wKngheWNNCNL0pzrWXZwCdQGv1ikXknQ= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.0.0/go.mod h1:N/UiujQy+K+ceU1EF5EkVd1TNqevLrCQMIcAEPrdtus= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
@ -253,8 +253,8 @@ github.com/ipfs/go-merkledag v0.9.0/go.mod h1:bPHqkHt5OZ0p1n3iqPeDiw2jIBkjAytRjS
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.8.0 h1:JyNO144tfu9bx6Hpo119zvbEL9iQ760FHOiJYsUjqaU= github.com/ipfs/go-peertaskqueue v0.8.0 h1:JyNO144tfu9bx6Hpo119zvbEL9iQ760FHOiJYsUjqaU=
github.com/ipfs/go-unixfs v0.4.2 h1:hdQlsHHK5tek9gC9mjGVua8xyTqC+eopGseCRcbCZNg= github.com/ipfs/go-unixfs v0.4.3 h1:EdDc1sNZNFDUlo4UrVAvvAofVI5EwTnKu8Nv8mgXkWQ=
github.com/ipfs/go-unixfs v0.4.2/go.mod h1:L+x6JRlFE0PfyMqeoLYVOKLhn5IeZHvNT7ZI51Y9Qyc= github.com/ipfs/go-unixfs v0.4.3/go.mod h1:TSG7G1UuT+l4pNj91raXAPkX0BhJi3jST1FDTfQ5QyM=
github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs=
github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU=
github.com/ipld/go-codec-dagpb v1.5.0 h1:RspDRdsJpLfgCI0ONhTAnbHdySGD4t+LHSPK4X1+R0k= github.com/ipld/go-codec-dagpb v1.5.0 h1:RspDRdsJpLfgCI0ONhTAnbHdySGD4t+LHSPK4X1+R0k=
@ -576,8 +576,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -647,8 +647,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=

View File

@ -3,6 +3,7 @@ package dialer
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
net2 "github.com/anytypeio/any-sync/net" net2 "github.com/anytypeio/any-sync/net"
@ -100,15 +101,17 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
} }
func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) { func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) {
tcpConn, err := net.Dial("tcp", addr) st := time.Now()
// TODO: move dial timeout to config
tcpConn, err := net.DialTimeout("tcp", addr, time.Second*3)
if err != nil { if err != nil {
return return nil, nil, fmt.Errorf("dialTimeout error: %v; since start: %v", err, time.Since(st))
} }
timeoutConn := timeoutconn.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) timeoutConn := timeoutconn.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds))
sc, err = d.transport.TLSConn(ctx, timeoutConn) sc, err = d.transport.TLSConn(ctx, timeoutConn)
if err != nil { if err != nil {
return return nil, nil, fmt.Errorf("tls handshaeke error: %v; since start: %v", err, time.Since(st))
} }
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr)) log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr))
conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{ conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{

View File

@ -21,7 +21,6 @@ func NewConn(conn net.Conn, timeout time.Duration) *Conn {
} }
func (c *Conn) Write(p []byte) (n int, err error) { func (c *Conn) Write(p []byte) (n int, err error) {
return c.Conn.Write(p)
for { for {
if c.timeout != 0 { if c.timeout != 0 {
if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil { if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil {