From adec5fa746cb050c9800d1cf9715543001b9a48c Mon Sep 17 00:00:00 2001 From: Roman Khafizianov Date: Fri, 10 Feb 2023 14:23:45 +0100 Subject: [PATCH 01/13] RWLocker iface: add TryLock --- commonspace/object/tree/objecttree/objecttree.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 58c254da..8e5d7c84 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -4,19 +4,22 @@ package objecttree import ( "context" "errors" + "sync" + "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" - list "github.com/anytypeio/any-sync/commonspace/object/acl/list" + "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/util/keys/symmetric" "github.com/anytypeio/any-sync/util/slice" - "sync" ) type RWLocker interface { sync.Locker RLock() RUnlock() + TryRLock() bool + TryLock() bool } var ( From 06016850404699441ac73c459de1fe93677a38d1 Mon Sep 17 00:00:00 2001 From: Roman Khafizianov Date: Fri, 10 Feb 2023 14:40:45 +0100 Subject: [PATCH 02/13] fix tests --- .../mock_objecttree/mock_objecttree.go | 28 +++++++++++++++++++ .../synctree/mock_synctree/mock_synctree.go | 28 +++++++++++++++++++ .../tree/synctree/synctreehandler_test.go | 23 +++++++++++++-- 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go index 6915f927..95dd7a00 100644 --- a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go +++ b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go @@ -321,6 +321,34 @@ func (mr *MockObjectTreeMockRecorder) Storage() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockObjectTree)(nil).Storage)) } +// TryLock mocks base method. +func (m *MockObjectTree) TryLock() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryLock") + ret0, _ := ret[0].(bool) + return ret0 +} + +// TryLock indicates an expected call of TryLock. +func (mr *MockObjectTreeMockRecorder) TryLock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLock", reflect.TypeOf((*MockObjectTree)(nil).TryLock)) +} + +// TryRLock mocks base method. +func (m *MockObjectTree) TryRLock() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryRLock") + ret0, _ := ret[0].(bool) + return ret0 +} + +// TryRLock indicates an expected call of TryRLock. +func (mr *MockObjectTreeMockRecorder) TryRLock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRLock", reflect.TypeOf((*MockObjectTree)(nil).TryRLock)) +} + // Unlock mocks base method. func (m *MockObjectTree) Unlock() { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 992ae5c6..1928e62d 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -472,6 +472,34 @@ func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1) } +// TryLock mocks base method. +func (m *MockSyncTree) TryLock() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryLock") + ret0, _ := ret[0].(bool) + return ret0 +} + +// TryLock indicates an expected call of TryLock. +func (mr *MockSyncTreeMockRecorder) TryLock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLock", reflect.TypeOf((*MockSyncTree)(nil).TryLock)) +} + +// TryRLock mocks base method. +func (m *MockSyncTree) TryRLock() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryRLock") + ret0, _ := ret[0].(bool) + return ret0 +} + +// TryRLock indicates an expected call of TryRLock. +func (mr *MockSyncTreeMockRecorder) TryRLock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRLock", reflect.TypeOf((*MockSyncTree)(nil).TryRLock)) +} + // Unlock mocks base method. func (m *MockSyncTree) Unlock() { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index 389d55d4..052479d2 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -3,6 +3,9 @@ package synctree import ( "context" "fmt" + "sync" + "testing" + "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree" @@ -12,13 +15,11 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "go.uber.org/zap" - "sync" - "testing" ) type testObjTreeMock struct { *mock_objecttree.MockObjectTree - m sync.Mutex + m sync.RWMutex } func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock { @@ -31,10 +32,26 @@ func (t *testObjTreeMock) Lock() { t.m.Lock() } +func (t *testObjTreeMock) RLock() { + t.m.RLock() +} + func (t *testObjTreeMock) Unlock() { t.m.Unlock() } +func (t *testObjTreeMock) RUnlock() { + t.m.RUnlock() +} + +func (t *testObjTreeMock) TryLock() bool { + return t.m.TryLock() +} + +func (t *testObjTreeMock) TryRLock() bool { + return t.m.TryRLock() +} + type syncHandlerFixture struct { ctrl *gomock.Controller syncClientMock *mock_synctree.MockSyncClient From 004c8b7f384db1130c5a6358333446a71cffe4a2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 13 Feb 2023 18:12:15 +0000 Subject: [PATCH 03/13] Bump golang.org/x/net from 0.5.0 to 0.6.0 Bumps [golang.org/x/net](https://github.com/golang/net) from 0.5.0 to 0.6.0. - [Release notes](https://github.com/golang/net/releases) - [Commits](https://github.com/golang/net/compare/v0.5.0...v0.6.0) --- updated-dependencies: - dependency-name: golang.org/x/net dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 6da4c240..8d91c677 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/zeebo/errs v1.3.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 - golang.org/x/net v0.5.0 + golang.org/x/net v0.6.0 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/yaml.v3 v3.0.1 storj.io/drpc v0.0.32 @@ -99,7 +99,7 @@ require ( golang.org/x/crypto v0.4.0 // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.4.0 // indirect + golang.org/x/sys v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.28.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect diff --git a/go.sum b/go.sum index 3f6d5c5e..6438765f 100644 --- a/go.sum +++ b/go.sum @@ -576,8 +576,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -647,8 +647,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= From 0f89b5a4f5744dc6a8f2484d42c99c05a8bc0599 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 13 Feb 2023 18:12:32 +0000 Subject: [PATCH 04/13] Bump github.com/ipfs/go-unixfs from 0.4.2 to 0.4.3 Bumps [github.com/ipfs/go-unixfs](https://github.com/ipfs/go-unixfs) from 0.4.2 to 0.4.3. - [Release notes](https://github.com/ipfs/go-unixfs/releases) - [Commits](https://github.com/ipfs/go-unixfs/compare/v0.4.2...v0.4.3) --- updated-dependencies: - dependency-name: github.com/ipfs/go-unixfs dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 6da4c240..8b474c03 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.2.0 github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-merkledag v0.9.0 - github.com/ipfs/go-unixfs v0.4.2 + github.com/ipfs/go-unixfs v0.4.3 github.com/libp2p/go-libp2p v0.24.1 github.com/minio/sha256-simd v1.0.0 github.com/multiformats/go-multibase v0.1.1 @@ -52,7 +52,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/ipfs/bbloom v0.0.4 // indirect - github.com/ipfs/go-bitfield v1.0.0 // indirect + github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-posinfo v0.0.1 // indirect diff --git a/go.sum b/go.sum index 3f6d5c5e..2409dd4c 100644 --- a/go.sum +++ b/go.sum @@ -189,8 +189,8 @@ github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/go-bitfield v1.0.0 h1:y/XHm2GEmD9wKngheWNNCNL0pzrWXZwCdQGv1ikXknQ= -github.com/ipfs/go-bitfield v1.0.0/go.mod h1:N/UiujQy+K+ceU1EF5EkVd1TNqevLrCQMIcAEPrdtus= +github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= +github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= @@ -253,8 +253,8 @@ github.com/ipfs/go-merkledag v0.9.0/go.mod h1:bPHqkHt5OZ0p1n3iqPeDiw2jIBkjAytRjS github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.8.0 h1:JyNO144tfu9bx6Hpo119zvbEL9iQ760FHOiJYsUjqaU= -github.com/ipfs/go-unixfs v0.4.2 h1:hdQlsHHK5tek9gC9mjGVua8xyTqC+eopGseCRcbCZNg= -github.com/ipfs/go-unixfs v0.4.2/go.mod h1:L+x6JRlFE0PfyMqeoLYVOKLhn5IeZHvNT7ZI51Y9Qyc= +github.com/ipfs/go-unixfs v0.4.3 h1:EdDc1sNZNFDUlo4UrVAvvAofVI5EwTnKu8Nv8mgXkWQ= +github.com/ipfs/go-unixfs v0.4.3/go.mod h1:TSG7G1UuT+l4pNj91raXAPkX0BhJi3jST1FDTfQ5QyM= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipld/go-codec-dagpb v1.5.0 h1:RspDRdsJpLfgCI0ONhTAnbHdySGD4t+LHSPK4X1+R0k= From 18451bd4fe458193b288ad146805ef629c1213d2 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 13 Feb 2023 21:49:56 +0300 Subject: [PATCH 05/13] listen ctx for closing object --- app/ocache/ocache.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/ocache/ocache.go b/app/ocache/ocache.go index 31852286..7132d43d 100644 --- a/app/ocache/ocache.go +++ b/app/ocache/ocache.go @@ -176,8 +176,12 @@ Load: } c.mu.Unlock() if closing { - <-e.close - goto Load + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.close: + goto Load + } } if load { From 55ad2a536bf14d8d8a5daaaa4a89f802e0e5b9cb Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 13 Feb 2023 21:53:27 +0300 Subject: [PATCH 06/13] commongetter: check for space close --- commonspace/commongetter.go | 15 ++++++++++++--- commonspace/space.go | 2 +- commonspace/spaceservice.go | 5 ++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/commonspace/commongetter.go b/commonspace/commongetter.go index 12abdf4c..6807265b 100644 --- a/commonspace/commongetter.go +++ b/commonspace/commongetter.go @@ -5,18 +5,21 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" + "sync/atomic" ) type commonGetter struct { treegetter.TreeGetter spaceId string reservedObjects []syncobjectgetter.SyncObject + spaceIsClosed *atomic.Bool } -func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter { +func newCommonGetter(spaceId string, getter treegetter.TreeGetter, spaceIsClosed *atomic.Bool) *commonGetter { return &commonGetter{ - TreeGetter: getter, - spaceId: spaceId, + TreeGetter: getter, + spaceId: spaceId, + spaceIsClosed: spaceIsClosed, } } @@ -25,6 +28,9 @@ func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) { } func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { + if c.spaceIsClosed.Load() { + return nil, ErrSpaceClosed + } if obj := c.getReservedObject(treeId); obj != nil { return obj.(objecttree.ObjectTree), nil } @@ -41,6 +47,9 @@ func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject } func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { + if c.spaceIsClosed.Load() { + return nil, ErrSpaceClosed + } if obj := c.getReservedObject(objectId); obj != nil { return obj, nil } diff --git a/commonspace/space.go b/commonspace/space.go index 4cb02e9f..b29cd513 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -127,7 +127,7 @@ type space struct { handleQueue multiqueue.MultiQueue[HandleMessage] - isClosed atomic.Bool + isClosed *atomic.Bool treesUsed atomic.Int32 } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index c26839b8..7ae37a81 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -17,6 +17,7 @@ import ( "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/nodeconf" + "sync/atomic" ) const CName = "common.commonspace" @@ -116,7 +117,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { } lastConfiguration := s.configurationService.GetLast() - getter := newCommonGetter(st.Id(), s.treeGetter) + var spaceIsClosed = &atomic.Bool{} + getter := newCommonGetter(st.Id(), s.treeGetter, spaceIsClosed) syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... if !lastConfiguration.IsResponsible(st.Id()) { @@ -141,6 +143,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { configuration: lastConfiguration, peerManager: peerManager, storage: st, + isClosed: spaceIsClosed, } return sp, nil } From 92f22d04b0c573dce78c5e5bf912bc45d1f3a36e Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 13 Feb 2023 22:09:06 +0300 Subject: [PATCH 07/13] default periodic sync timeout --- commonspace/headsync/headsync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 7ab3b255..0f61f315 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -58,7 +58,7 @@ func NewHeadSync( l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) - periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) + periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l) return &headSync{ spaceId: spaceId, From ce0a60df671338a1b0b50f6c87caec4f9f851888 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 13 Feb 2023 22:13:40 +0300 Subject: [PATCH 08/13] timeoutconn: remove debug --- net/timeoutconn/conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/net/timeoutconn/conn.go b/net/timeoutconn/conn.go index 5459d461..0f5c5d30 100644 --- a/net/timeoutconn/conn.go +++ b/net/timeoutconn/conn.go @@ -21,7 +21,6 @@ func NewConn(conn net.Conn, timeout time.Duration) *Conn { } func (c *Conn) Write(p []byte) (n int, err error) { - return c.Conn.Write(p) for { if c.timeout != 0 { if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil { From 047bd1c379871516c5bc411923af889601d00bcc Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 14 Feb 2023 16:53:43 +0300 Subject: [PATCH 09/13] handleQueue thread close / onObjectClose method --- commonspace/object/tree/synctree/synctree.go | 10 ++++------ commonspace/space.go | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index c63dcc96..08bffb01 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -15,7 +15,6 @@ import ( "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "go.uber.org/zap" - "sync/atomic" ) var ( @@ -46,7 +45,7 @@ type syncTree struct { syncStatus syncstatus.StatusUpdater notifiable HeadNotifiable listener updatelistener.UpdateListener - treeUsage *atomic.Int32 + onClose func(id string) isClosed bool isDeleted bool } @@ -69,7 +68,7 @@ type BuildDeps struct { AclList list.AclList SpaceStorage spacestorage.SpaceStorage TreeStorage treestorage.TreeStorage - TreeUsage *atomic.Int32 + OnClose func(id string) SyncStatus syncstatus.StatusUpdater PeerGetter ResponsiblePeersGetter WaitTreeRemoteSync bool @@ -106,7 +105,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy ObjectTree: objTree, syncClient: syncClient, notifiable: deps.HeadNotifiable, - treeUsage: deps.TreeUsage, + onClose: deps.OnClose, listener: deps.Listener, syncStatus: deps.SyncStatus, } @@ -213,7 +212,7 @@ func (s *syncTree) Close() (err error) { if s.isClosed { return ErrSyncTreeClosed } - s.treeUsage.Add(-1) + s.onClose(s.Id()) s.isClosed = true return } @@ -239,7 +238,6 @@ func (s *syncTree) afterBuild() { if s.listener != nil { s.listener.Rebuild(s) } - s.treeUsage.Add(1) if s.notifiable != nil { s.notifiable.UpdateHeads(s.Id(), s.Heads()) } diff --git a/commonspace/space.go b/commonspace/space.go index b29cd513..7188b67a 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -128,7 +128,7 @@ type space struct { handleQueue multiqueue.MultiQueue[HandleMessage] isClosed *atomic.Bool - treesUsed atomic.Int32 + treesUsed *atomic.Int32 } func (s *space) LastUsage() time.Time { @@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea Listener: listener, AclList: s.aclList, SpaceStorage: s.storage, - TreeUsage: &s.treesUsed, + OnClose: func(id string) {}, SyncStatus: s.syncStatus, PeerGetter: s.peerManager, } @@ -326,12 +326,16 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t Listener: opts.Listener, AclList: s.aclList, SpaceStorage: s.storage, - TreeUsage: &s.treesUsed, + OnClose: s.onObjectClose, SyncStatus: s.syncStatus, WaitTreeRemoteSync: opts.WaitTreeRemoteSync, PeerGetter: s.peerManager, } - return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) + if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil { + return nil, err + } + s.treesUsed.Add(1) + return } func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { @@ -392,6 +396,11 @@ func (s *space) handleMessage(msg HandleMessage) { } } +func (s *space) onObjectClose(id string) { + s.treesUsed.Add(-1) + _ = s.handleQueue.CloseThread(id) +} + func (s *space) Close() error { if s.isClosed.Swap(true) { log.Warn("call space.Close on closed space", zap.String("id", s.id)) From 00dc0bcca6ad49bf079b51656d45e76e54ce542d Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 14 Feb 2023 19:00:27 +0300 Subject: [PATCH 10/13] fix --- commonspace/spaceservice.go | 1 + 1 file changed, 1 insertion(+) diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 7ae37a81..502ca9a8 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -143,6 +143,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { configuration: lastConfiguration, peerManager: peerManager, storage: st, + treesUsed: &atomic.Int32{}, isClosed: spaceIsClosed, } return sp, nil From 4cde54bdb3ac11cb2f895e9e68f43a8ee3d9d286 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 14 Feb 2023 19:39:57 +0300 Subject: [PATCH 11/13] dial timeout --- net/dialer/dialer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 2076f959..965d700d 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -100,7 +100,7 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro } func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) { - tcpConn, err := net.Dial("tcp", addr) + tcpConn, err := net.DialTimeout("tcp", addr, time.Second) if err != nil { return } From 189922e534e8d50d59732928caa24630878ea68d Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 14 Feb 2023 21:18:25 +0300 Subject: [PATCH 12/13] cleanup thread on error --- commonspace/space.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/commonspace/space.go b/commonspace/space.go index 7188b67a..c481a912 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -392,6 +392,10 @@ func (s *space) handleMessage(msg HandleMessage) { } if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { + if msg.Message.ObjectId != "" { + // cleanup thread on error + _ = s.handleQueue.CloseThread(msg.Message.ObjectId) + } log.InfoCtx(ctx, "handleMessage error", zap.Error(err)) } } From 4d356318f027fb7675bd042a727b87870a00c978 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 15 Feb 2023 14:58:23 +0300 Subject: [PATCH 13/13] increase dial timeout + debug logs --- net/dialer/dialer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 965d700d..bf8cf2c5 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -3,6 +3,7 @@ package dialer import ( "context" "errors" + "fmt" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" net2 "github.com/anytypeio/any-sync/net" @@ -100,15 +101,17 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro } func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) { - tcpConn, err := net.DialTimeout("tcp", addr, time.Second) + st := time.Now() + // TODO: move dial timeout to config + tcpConn, err := net.DialTimeout("tcp", addr, time.Second*3) if err != nil { - return + return nil, nil, fmt.Errorf("dialTimeout error: %v; since start: %v", err, time.Since(st)) } timeoutConn := timeoutconn.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) sc, err = d.transport.TLSConn(ctx, timeoutConn) if err != nil { - return + return nil, nil, fmt.Errorf("tls handshaeke error: %v; since start: %v", err, time.Since(st)) } log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr)) conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{