From 564c636391576f6a096afdc686e5b9d84218f973 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Jun 2023 14:09:29 +0200 Subject: [PATCH 1/5] Fix diffsyncer tests --- commonspace/headsync/diffsyncer_test.go | 165 +++++++++++++++++++++++- 1 file changed, 162 insertions(+), 3 deletions(-) diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 4c6fcdc9..b49c2178 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -1,7 +1,14 @@ package headsync import ( + "bytes" "context" + "fmt" + "github.com/anyproto/any-sync/app/ldiff" + "github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto" + "github.com/anyproto/any-sync/commonspace/object/acl/liststorage/mock_liststorage" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/object/tree/treestorage/mock_treestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/net/peer" "github.com/golang/mock/gomock" @@ -11,6 +18,42 @@ import ( "time" ) +type pushSpaceRequestMatcher struct { + spaceId string + aclRootId string + settingsId string + credential []byte + spaceHeader *spacesyncproto.RawSpaceHeaderWithId +} + +func newPushSpaceRequestMatcher( + spaceId string, + aclRootId string, + settingsId string, + credential []byte, + spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher { + return &pushSpaceRequestMatcher{ + spaceId: spaceId, + aclRootId: aclRootId, + settingsId: settingsId, + credential: credential, + spaceHeader: spaceHeader, + } +} + +func (p pushSpaceRequestMatcher) Matches(x interface{}) bool { + res, ok := x.(*spacesyncproto.SpacePushRequest) + if !ok { + return false + } + + return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId && bytes.Equal(p.credential, res.Credential) +} + +func (p pushSpaceRequestMatcher) String() string { + return "" +} + type mockPeer struct { } @@ -58,12 +101,12 @@ func (fx *headSyncFixture) initDiffSyncer(t *testing.T) { } func TestDiffSyncer(t *testing.T) { - fx := newHeadSyncFixture(t) - fx.initDiffSyncer(t) - defer fx.stop() ctx := context.Background() t.Run("diff syncer sync", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() mPeer := mockPeer{} fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). @@ -77,6 +120,122 @@ func TestDiffSyncer(t *testing.T) { fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil) require.NoError(t, fx.diffSyncer.Sync(ctx)) }) + + t.Run("diff syncer sync conf error", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + ctx := context.Background() + fx.peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). + Return(nil, fmt.Errorf("some error")) + + require.Error(t, fx.diffSyncer.Sync(ctx)) + }) + + t.Run("deletion state remove objects", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + deletedId := "id" + fx.deletionStateMock.EXPECT().Exists(deletedId).Return(true) + + // this should not result in any mock being called + fx.diffSyncer.UpdateHeads(deletedId, []string{"someHead"}) + }) + + t.Run("update heads updates diff", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + newId := "newId" + newHeads := []string{"h1", "h2"} + hash := "hash" + fx.diffMock.EXPECT().Set(ldiff.Element{ + Id: newId, + Head: concatStrings(newHeads), + }) + fx.diffMock.EXPECT().Hash().Return(hash) + fx.deletionStateMock.EXPECT().Exists(newId).Return(false) + fx.storageMock.EXPECT().WriteSpaceHash(hash) + fx.diffSyncer.UpdateHeads(newId, newHeads) + }) + + t.Run("diff syncer sync space missing", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + aclStorageMock := mock_liststorage.NewMockListStorage(fx.ctrl) + settingsStorage := mock_treestorage.NewMockTreeStorage(fx.ctrl) + settingsId := "settingsId" + aclRootId := "aclRootId" + aclRoot := &aclrecordproto.RawAclRecordWithId{ + Id: aclRootId, + } + settingsRoot := &treechangeproto.RawTreeChangeWithId{ + Id: settingsId, + } + spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} + spaceSettingsId := "spaceSettingsId" + credential := []byte("credential") + + fx.peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). + Return([]peer.Peer{mockPeer{}}, nil) + fx.diffMock.EXPECT(). + Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing) + + fx.storageMock.EXPECT().AclStorage().Return(aclStorageMock, nil) + fx.storageMock.EXPECT().SpaceHeader().Return(spaceHeader, nil) + fx.storageMock.EXPECT().SpaceSettingsId().Return(spaceSettingsId) + fx.storageMock.EXPECT().TreeStorage(spaceSettingsId).Return(settingsStorage, nil) + + settingsStorage.EXPECT().Root().Return(settingsRoot, nil) + aclStorageMock.EXPECT(). + Root(). + Return(aclRoot, nil) + fx.credentialProviderMock.EXPECT(). + GetCredential(gomock.Any(), spaceHeader). + Return(credential, nil) + fx.clientMock.EXPECT(). + SpacePush(gomock.Any(), newPushSpaceRequestMatcher(fx.spaceState.SpaceId, aclRootId, settingsId, credential, spaceHeader)). + Return(nil, nil) + fx.peerManagerMock.EXPECT().SendPeer(gomock.Any(), "peerId", gomock.Any()) + + require.NoError(t, fx.diffSyncer.Sync(ctx)) + }) + + t.Run("diff syncer sync unexpected", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + fx.peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). + Return([]peer.Peer{mockPeer{}}, nil) + fx.diffMock.EXPECT(). + Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Return(nil, nil, nil, spacesyncproto.ErrUnexpected) + + require.NoError(t, fx.diffSyncer.Sync(ctx)) + }) + + t.Run("diff syncer sync space is deleted error", func(t *testing.T) { + fx := newHeadSyncFixture(t) + fx.initDiffSyncer(t) + defer fx.stop() + mPeer := mockPeer{} + fx.peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). + Return([]peer.Peer{mPeer}, nil) + fx.diffMock.EXPECT(). + Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) + fx.storageMock.EXPECT().SpaceSettingsId().Return("settingsId") + fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil) + + require.NoError(t, fx.diffSyncer.Sync(ctx)) + }) } // From 51eb5b1a42b94d5ccd178c44665c054c92cc804d Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Jun 2023 18:05:13 +0200 Subject: [PATCH 2/5] Fix settings and deletion tests --- commonspace/deletion_test.go | 9 +- .../deletionstate/deletionstate_test.go | 4 +- commonspace/headsync/diffsyncer_test.go | 216 ------------------ commonspace/settings/deleter_test.go | 4 +- commonspace/settings/deletionmanager_test.go | 6 +- commonspace/settings/settingsobject_test.go | 5 +- commonspace/spaceutils_test.go | 4 +- 7 files changed, 21 insertions(+), 227 deletions(-) diff --git a/commonspace/deletion_test.go b/commonspace/deletion_test.go index 6862a472..33745884 100644 --- a/commonspace/deletion_test.go +++ b/commonspace/deletion_test.go @@ -73,7 +73,8 @@ func TestSpaceDeleteIds(t *testing.T) { fx.treeManager.space = spc err = spc.Init(ctx) require.NoError(t, err) - + close(fx.treeManager.waitLoad) + var ids []string for i := 0; i < totalObjs; i++ { // creating a tree @@ -147,6 +148,7 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) { // adding space to tree manager fx.treeManager.space = spc err = spc.Init(ctx) + close(fx.treeManager.waitLoad) require.NoError(t, err) settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject() @@ -183,10 +185,12 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) { spc, err = fx.spaceService.NewSpace(ctx, sp) require.NoError(t, err) require.NotNil(t, spc) + fx.treeManager.waitLoad = make(chan struct{}) fx.treeManager.space = spc fx.treeManager.deletedIds = nil err = spc.Init(ctx) require.NoError(t, err) + close(fx.treeManager.waitLoad) // waiting until everything is deleted time.Sleep(3 * time.Second) @@ -230,6 +234,7 @@ func TestSpaceDeleteIdsMarkDeleted(t *testing.T) { fx.treeManager.space = spc err = spc.Init(ctx) require.NoError(t, err) + close(fx.treeManager.waitLoad) settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject() var ids []string @@ -259,10 +264,12 @@ func TestSpaceDeleteIdsMarkDeleted(t *testing.T) { require.NoError(t, err) require.NotNil(t, spc) fx.treeManager.space = spc + fx.treeManager.waitLoad = make(chan struct{}) fx.treeManager.deletedIds = nil fx.treeManager.markedIds = nil err = spc.Init(ctx) require.NoError(t, err) + close(fx.treeManager.waitLoad) // waiting until everything is deleted time.Sleep(3 * time.Second) diff --git a/commonspace/deletionstate/deletionstate_test.go b/commonspace/deletionstate/deletionstate_test.go index d95bcd9b..e5489bd8 100644 --- a/commonspace/deletionstate/deletionstate_test.go +++ b/commonspace/deletionstate/deletionstate_test.go @@ -1,7 +1,6 @@ package deletionstate import ( - "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage" "github.com/golang/mock/gomock" @@ -19,7 +18,8 @@ type fixture struct { func newFixture(t *testing.T) *fixture { ctrl := gomock.NewController(t) spaceStorage := mock_spacestorage.NewMockSpaceStorage(ctrl) - delState := New(logger.NewNamed("test"), spaceStorage).(*objectDeletionState) + delState := New().(*objectDeletionState) + delState.storage = spaceStorage return &fixture{ ctrl: ctrl, delState: delState, diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index b49c2178..7cdb870a 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -237,219 +237,3 @@ func TestDiffSyncer(t *testing.T) { require.NoError(t, fx.diffSyncer.Sync(ctx)) }) } - -// -//type pushSpaceRequestMatcher struct { -// spaceId string -// aclRootId string -// settingsId string -// credential []byte -// spaceHeader *spacesyncproto.RawSpaceHeaderWithId -//} -// -//func (p pushSpaceRequestMatcher) Matches(x interface{}) bool { -// res, ok := x.(*spacesyncproto.SpacePushRequest) -// if !ok { -// return false -// } -// -// return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId && bytes.Equal(p.credential, res.Credential) -//} -// -//func (p pushSpaceRequestMatcher) String() string { -// return "" -//} -// -//type mockPeer struct{} -// -//func (m mockPeer) Addr() string { -// return "" -//} -// -//func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) { -// return true, m.Close() -//} -// -//func (m mockPeer) Id() string { -// return "mockId" -//} -// -//func (m mockPeer) LastUsage() time.Time { -// return time.Time{} -//} -// -//func (m mockPeer) Secure() sec.SecureConn { -// return nil -//} -// -//func (m mockPeer) UpdateLastUsage() { -//} -// -//func (m mockPeer) Close() error { -// return nil -//} -// -//func (m mockPeer) Closed() <-chan struct{} { -// return make(chan struct{}) -//} -// -//func (m mockPeer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { -// return nil -//} -// -//func (m mockPeer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { -// return nil, nil -//} -// -//func newPushSpaceRequestMatcher( -// spaceId string, -// aclRootId string, -// settingsId string, -// credential []byte, -// spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher { -// return &pushSpaceRequestMatcher{ -// spaceId: spaceId, -// aclRootId: aclRootId, -// settingsId: settingsId, -// credential: credential, -// spaceHeader: spaceHeader, -// } -//} -// -//func TestDiffSyncer_Sync(t *testing.T) { -// // setup -// fx := newHeadSyncFixture(t) -// fx.initDiffSyncer(t) -// defer fx.stop() -// -// diffMock := mock_ldiff.NewMockDiff(ctrl) -// peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl) -// cacheMock := mock_treemanager.NewMockTreeManager(ctrl) -// stMock := mock_spacestorage.NewMockSpaceStorage(ctrl) -// clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) -// factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { -// return clientMock -// }) -// treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl) -// credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl) -// delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) -// spaceId := "spaceId" -// aclRootId := "aclRootId" -// l := logger.NewNamed(spaceId) -// diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l) -// delState.EXPECT().AddObserver(gomock.Any()) -// cacheMock.EXPECT().NewTreeSyncer(spaceId, gomock.Any()).Return(treeSyncerMock) -// diffSyncer.Init(delState) -// -// t.Run("diff syncer sync", func(t *testing.T) { -// mPeer := mockPeer{} -// peerManagerMock.EXPECT(). -// GetResponsiblePeers(gomock.Any()). -// Return([]peer.Peer{mPeer}, nil) -// diffMock.EXPECT(). -// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). -// Return([]string{"new"}, []string{"changed"}, nil, nil) -// delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) -// delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) -// delState.EXPECT().Filter(nil).Return(nil).Times(1) -// treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil) -// require.NoError(t, diffSyncer.Sync(ctx)) -// }) -//} -// -// t.Run("diff syncer sync conf error", func(t *testing.T) { -// peerManagerMock.EXPECT(). -// GetResponsiblePeers(gomock.Any()). -// Return(nil, fmt.Errorf("some error")) -// -// require.Error(t, diffSyncer.Sync(ctx)) -// }) -// -// t.Run("deletion state remove objects", func(t *testing.T) { -// deletedId := "id" -// delState.EXPECT().Exists(deletedId).Return(true) -// -// // this should not result in any mock being called -// diffSyncer.UpdateHeads(deletedId, []string{"someHead"}) -// }) -// -// t.Run("update heads updates diff", func(t *testing.T) { -// newId := "newId" -// newHeads := []string{"h1", "h2"} -// hash := "hash" -// diffMock.EXPECT().Set(ldiff.Element{ -// Id: newId, -// Head: concatStrings(newHeads), -// }) -// diffMock.EXPECT().Hash().Return(hash) -// delState.EXPECT().Exists(newId).Return(false) -// stMock.EXPECT().WriteSpaceHash(hash) -// diffSyncer.UpdateHeads(newId, newHeads) -// }) -// -// t.Run("diff syncer sync space missing", func(t *testing.T) { -// aclStorageMock := mock_liststorage.NewMockListStorage(ctrl) -// settingsStorage := mock_treestorage.NewMockTreeStorage(ctrl) -// settingsId := "settingsId" -// aclRoot := &aclrecordproto.RawAclRecordWithId{ -// Id: aclRootId, -// } -// settingsRoot := &treechangeproto.RawTreeChangeWithId{ -// Id: settingsId, -// } -// spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} -// spaceSettingsId := "spaceSettingsId" -// credential := []byte("credential") -// -// peerManagerMock.EXPECT(). -// GetResponsiblePeers(gomock.Any()). -// Return([]peer.Peer{mockPeer{}}, nil) -// diffMock.EXPECT(). -// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). -// Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing) -// -// stMock.EXPECT().AclStorage().Return(aclStorageMock, nil) -// stMock.EXPECT().SpaceHeader().Return(spaceHeader, nil) -// stMock.EXPECT().SpaceSettingsId().Return(spaceSettingsId) -// stMock.EXPECT().TreeStorage(spaceSettingsId).Return(settingsStorage, nil) -// -// settingsStorage.EXPECT().Root().Return(settingsRoot, nil) -// aclStorageMock.EXPECT(). -// Root(). -// Return(aclRoot, nil) -// credentialProvider.EXPECT(). -// GetCredential(gomock.Any(), spaceHeader). -// Return(credential, nil) -// clientMock.EXPECT(). -// SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, credential, spaceHeader)). -// Return(nil, nil) -// peerManagerMock.EXPECT().SendPeer(gomock.Any(), "mockId", gomock.Any()) -// -// require.NoError(t, diffSyncer.Sync(ctx)) -// }) -// -// t.Run("diff syncer sync unexpected", func(t *testing.T) { -// peerManagerMock.EXPECT(). -// GetResponsiblePeers(gomock.Any()). -// Return([]peer.Peer{mockPeer{}}, nil) -// diffMock.EXPECT(). -// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). -// Return(nil, nil, nil, spacesyncproto.ErrUnexpected) -// -// require.NoError(t, diffSyncer.Sync(ctx)) -// }) -// -// t.Run("diff syncer sync space is deleted error", func(t *testing.T) { -// mPeer := mockPeer{} -// peerManagerMock.EXPECT(). -// GetResponsiblePeers(gomock.Any()). -// Return([]peer.Peer{mPeer}, nil) -// diffMock.EXPECT(). -// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). -// Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) -// stMock.EXPECT().SpaceSettingsId().Return("settingsId") -// treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil) -// -// require.NoError(t, diffSyncer.Sync(ctx)) -// }) -//} diff --git a/commonspace/settings/deleter_test.go b/commonspace/settings/deleter_test.go index 54feed56..e4a32e84 100644 --- a/commonspace/settings/deleter_test.go +++ b/commonspace/settings/deleter_test.go @@ -2,9 +2,9 @@ package settings import ( "fmt" + "github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate/mock_settingsstate" "github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage" "github.com/golang/mock/gomock" "testing" @@ -14,7 +14,7 @@ func TestDeleter_Delete(t *testing.T) { ctrl := gomock.NewController(t) treeManager := mock_treemanager.NewMockTreeManager(ctrl) st := mock_spacestorage.NewMockSpaceStorage(ctrl) - delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) + delState := mock_deletionstate.NewMockObjectDeletionState(ctrl) deleter := newDeleter(st, delState, treeManager) diff --git a/commonspace/settings/deletionmanager_test.go b/commonspace/settings/deletionmanager_test.go index 9e6b4f05..69e8830d 100644 --- a/commonspace/settings/deletionmanager_test.go +++ b/commonspace/settings/deletionmanager_test.go @@ -2,10 +2,10 @@ package settings import ( "context" + "github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate" "github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager" "github.com/anyproto/any-sync/commonspace/settings/mock_settings" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate/mock_settingsstate" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "testing" @@ -26,7 +26,7 @@ func TestDeletionManager_UpdateState_NotResponsible(t *testing.T) { onDeleted := func() { deleted = true } - delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) + delState := mock_deletionstate.NewMockObjectDeletionState(ctrl) treeManager := mock_treemanager.NewMockTreeManager(ctrl) delState.EXPECT().Add(state.DeletedIds) @@ -58,7 +58,7 @@ func TestDeletionManager_UpdateState_Responsible(t *testing.T) { onDeleted := func() { deleted = true } - delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) + delState := mock_deletionstate.NewMockObjectDeletionState(ctrl) treeManager := mock_treemanager.NewMockTreeManager(ctrl) provider := mock_settings.NewMockSpaceIdsProvider(ctrl) diff --git a/commonspace/settings/settingsobject_test.go b/commonspace/settings/settingsobject_test.go index 31956c81..9d83d9cd 100644 --- a/commonspace/settings/settingsobject_test.go +++ b/commonspace/settings/settingsobject_test.go @@ -3,6 +3,7 @@ package settings import ( "context" "github.com/anyproto/any-sync/accountservice/mock_accountservice" + "github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate" "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree" @@ -54,7 +55,7 @@ type settingsFixture struct { deleter *mock_settings.MockDeleter syncTree *mock_synctree.MockSyncTree historyTree *mock_objecttree.MockObjectTree - delState *mock_settingsstate.MockObjectDeletionState + delState *mock_deletionstate.MockObjectDeletionState account *mock_accountservice.MockService } @@ -66,7 +67,7 @@ func newSettingsFixture(t *testing.T) *settingsFixture { acc := mock_accountservice.NewMockService(ctrl) treeManager := mock_treemanager.NewMockTreeManager(ctrl) st := mock_spacestorage.NewMockSpaceStorage(ctrl) - delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) + delState := mock_deletionstate.NewMockObjectDeletionState(ctrl) delManager := mock_settings.NewMockDeletionManager(ctrl) stateBuilder := mock_settingsstate.NewMockStateBuilder(ctrl) changeFactory := mock_settingsstate.NewMockChangeFactory(ctrl) diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index c1d40f5c..cc82cecd 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -270,6 +270,7 @@ type mockTreeManager struct { cache ocache.OCache deletedIds []string markedIds []string + waitLoad chan struct{} } func (t *mockTreeManager) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer { @@ -283,6 +284,7 @@ func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId s func (t *mockTreeManager) Init(a *app.App) (err error) { t.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) { + <-t.waitLoad return t.space.TreeBuilder().BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{}) }, ocache.WithGCPeriod(time.Minute), @@ -352,7 +354,7 @@ func newFixture(t *testing.T) *spaceFixture { configurationService: &mockConf{}, storageProvider: spacestorage.NewInMemorySpaceStorageProvider(), peermanagerProvider: &mockPeerManagerProvider{}, - treeManager: &mockTreeManager{}, + treeManager: &mockTreeManager{waitLoad: make(chan struct{})}, pool: &mockPool{}, spaceService: New(), } From 4d1494a17ab1eb1c1a751b098e05cf5f3f3c82af Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Jun 2023 19:31:15 +0200 Subject: [PATCH 3/5] Add mocks and some requestmanager tests --- commonspace/requestmanager/requestmanager.go | 48 +++--- .../requestmanager/requestmanager_test.go | 96 +++++++++++ net/peer/mock_peer/mock_peer.go | 149 ++++++++++++++++++ net/peer/peer.go | 1 + net/pool/mock_pool/mock_pool.go | 80 ++++++++++ net/pool/pool.go | 1 + 6 files changed, 355 insertions(+), 20 deletions(-) create mode 100644 net/peer/mock_peer/mock_peer.go create mode 100644 net/pool/mock_pool/mock_pool.go diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index ac4351c0..d72ea394 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -38,19 +38,21 @@ type MessageHandler interface { type requestManager struct { sync.Mutex - pools map[string]*streampool.ExecPool - peerPool pool.Pool - workers int - queueSize int - handler MessageHandler - ctx context.Context - cancel context.CancelFunc + pools map[string]*streampool.ExecPool + peerPool pool.Pool + workers int + queueSize int + handler MessageHandler + ctx context.Context + cancel context.CancelFunc + clientFactory spacesyncproto.ClientFactory } func (r *requestManager) Init(a *app.App) (err error) { r.ctx, r.cancel = context.WithCancel(context.Background()) r.handler = a.MustComponent(objectsync.CName).(MessageHandler) r.peerPool = a.MustComponent(pool.CName).(pool.Pool) + r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) return } @@ -89,18 +91,24 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS // TODO: for later think when many clients are there, // we need to close pools for inactive clients return pl.TryAdd(func() { - ctx := r.ctx - resp, err := r.doRequest(ctx, peerId, req) - if err != nil { - log.Warn("failed to send request", zap.Error(err)) - return - } - ctx = peer.CtxWithPeerId(ctx, peerId) - _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ - SenderId: peerId, - Message: resp, - PeerCtx: ctx, - }) + doRequestAndHandle(r, peerId, req) + }) +} + +var doRequestAndHandle = (*requestManager).requestAndHandle + +func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) { + ctx := r.ctx + resp, err := r.doRequest(ctx, peerId, req) + if err != nil { + log.Warn("failed to send request", zap.Error(err)) + return + } + ctx = peer.CtxWithPeerId(ctx, peerId) + _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ + SenderId: peerId, + Message: resp, + PeerCtx: ctx, }) } @@ -110,7 +118,7 @@ func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spac return } err = pr.DoDrpc(ctx, func(conn drpc.Conn) error { - cl := spacesyncproto.NewDRPCSpaceSyncClient(conn) + cl := r.clientFactory.Client(conn) resp, err = cl.ObjectSync(ctx, msg) return err }) diff --git a/commonspace/requestmanager/requestmanager_test.go b/commonspace/requestmanager/requestmanager_test.go index e5e89798..1bdaa884 100644 --- a/commonspace/requestmanager/requestmanager_test.go +++ b/commonspace/requestmanager/requestmanager_test.go @@ -1 +1,97 @@ package requestmanager + +import ( + "context" + "github.com/anyproto/any-sync/commonspace/objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/commonspace/spacesyncproto/mock_spacesyncproto" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/peer/mock_peer" + "github.com/anyproto/any-sync/net/pool/mock_pool" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "testing" +) + +type fixture struct { + requestManager *requestManager + messageHandlerMock *mock_objectsync.MockObjectSync + peerPoolMock *mock_pool.MockPool + clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient + ctrl *gomock.Controller +} + +func newFixture(t *testing.T) *fixture { + ctrl := gomock.NewController(t) + manager := New().(*requestManager) + peerPoolMock := mock_pool.NewMockPool(ctrl) + messageHandlerMock := mock_objectsync.NewMockObjectSync(ctrl) + clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) + manager.peerPool = peerPoolMock + manager.handler = messageHandlerMock + manager.clientFactory = spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { + return clientMock + }) + manager.ctx, manager.cancel = context.WithCancel(context.Background()) + return &fixture{ + requestManager: manager, + messageHandlerMock: messageHandlerMock, + peerPoolMock: peerPoolMock, + clientMock: clientMock, + ctrl: ctrl, + } +} + +func (fx *fixture) stop() { + fx.ctrl.Finish() +} + +func TestRequestManager_Request(t *testing.T) { + ctx := context.Background() + + t.Run("send request", func(t *testing.T) { + fx := newFixture(t) + defer fx.stop() + + peerId := "peerId" + peerMock := mock_peer.NewMockPeer(fx.ctrl) + conn := &drpcconn.Conn{} + msg := &spacesyncproto.ObjectSyncMessage{} + resp := &spacesyncproto.ObjectSyncMessage{} + fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil) + fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil) + peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) { + drpcHandler(conn) + }).Return(nil) + res, err := fx.requestManager.SendRequest(ctx, peerId, msg) + require.NoError(t, err) + require.Equal(t, resp, res) + }) + + t.Run("request and handle", func(t *testing.T) { + fx := newFixture(t) + defer fx.stop() + ctx = fx.requestManager.ctx + + peerId := "peerId" + peerMock := mock_peer.NewMockPeer(fx.ctrl) + conn := &drpcconn.Conn{} + msg := &spacesyncproto.ObjectSyncMessage{} + resp := &spacesyncproto.ObjectSyncMessage{} + fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil) + fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil) + peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) { + drpcHandler(conn) + }).Return(nil) + fx.messageHandlerMock.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg objectsync.HandleMessage) { + require.Equal(t, peerId, msg.SenderId) + require.Equal(t, resp, msg.Message) + pId, _ := peer.CtxPeerId(msg.PeerCtx) + require.Equal(t, peerId, pId) + }).Return(nil) + fx.requestManager.requestAndHandle(peerId, msg) + }) +} diff --git a/net/peer/mock_peer/mock_peer.go b/net/peer/mock_peer/mock_peer.go new file mode 100644 index 00000000..dc0a5b6a --- /dev/null +++ b/net/peer/mock_peer/mock_peer.go @@ -0,0 +1,149 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anyproto/any-sync/net/peer (interfaces: Peer) + +// Package mock_peer is a generated GoMock package. +package mock_peer + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + drpc "storj.io/drpc" +) + +// MockPeer is a mock of Peer interface. +type MockPeer struct { + ctrl *gomock.Controller + recorder *MockPeerMockRecorder +} + +// MockPeerMockRecorder is the mock recorder for MockPeer. +type MockPeerMockRecorder struct { + mock *MockPeer +} + +// NewMockPeer creates a new mock instance. +func NewMockPeer(ctrl *gomock.Controller) *MockPeer { + mock := &MockPeer{ctrl: ctrl} + mock.recorder = &MockPeerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeer) EXPECT() *MockPeerMockRecorder { + return m.recorder +} + +// AcquireDrpcConn mocks base method. +func (m *MockPeer) AcquireDrpcConn(arg0 context.Context) (drpc.Conn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AcquireDrpcConn", arg0) + ret0, _ := ret[0].(drpc.Conn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AcquireDrpcConn indicates an expected call of AcquireDrpcConn. +func (mr *MockPeerMockRecorder) AcquireDrpcConn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireDrpcConn", reflect.TypeOf((*MockPeer)(nil).AcquireDrpcConn), arg0) +} + +// Close mocks base method. +func (m *MockPeer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPeerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPeer)(nil).Close)) +} + +// Context mocks base method. +func (m *MockPeer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockPeerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockPeer)(nil).Context)) +} + +// DoDrpc mocks base method. +func (m *MockPeer) DoDrpc(arg0 context.Context, arg1 func(drpc.Conn) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DoDrpc", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DoDrpc indicates an expected call of DoDrpc. +func (mr *MockPeerMockRecorder) DoDrpc(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoDrpc", reflect.TypeOf((*MockPeer)(nil).DoDrpc), arg0, arg1) +} + +// Id mocks base method. +func (m *MockPeer) Id() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Id") + ret0, _ := ret[0].(string) + return ret0 +} + +// Id indicates an expected call of Id. +func (mr *MockPeerMockRecorder) Id() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockPeer)(nil).Id)) +} + +// IsClosed mocks base method. +func (m *MockPeer) IsClosed() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsClosed") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsClosed indicates an expected call of IsClosed. +func (mr *MockPeerMockRecorder) IsClosed() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockPeer)(nil).IsClosed)) +} + +// ReleaseDrpcConn mocks base method. +func (m *MockPeer) ReleaseDrpcConn(arg0 drpc.Conn) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReleaseDrpcConn", arg0) +} + +// ReleaseDrpcConn indicates an expected call of ReleaseDrpcConn. +func (mr *MockPeerMockRecorder) ReleaseDrpcConn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseDrpcConn", reflect.TypeOf((*MockPeer)(nil).ReleaseDrpcConn), arg0) +} + +// TryClose mocks base method. +func (m *MockPeer) TryClose(arg0 time.Duration) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryClose", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryClose indicates an expected call of TryClose. +func (mr *MockPeerMockRecorder) TryClose(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockPeer)(nil).TryClose), arg0) +} diff --git a/net/peer/peer.go b/net/peer/peer.go index 7f42260c..300243c4 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_peer/mock_peer.go github.com/anyproto/any-sync/net/peer Peer package peer import ( diff --git a/net/pool/mock_pool/mock_pool.go b/net/pool/mock_pool/mock_pool.go new file mode 100644 index 00000000..be884903 --- /dev/null +++ b/net/pool/mock_pool/mock_pool.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anyproto/any-sync/net/pool (interfaces: Pool) + +// Package mock_pool is a generated GoMock package. +package mock_pool + +import ( + context "context" + reflect "reflect" + + peer "github.com/anyproto/any-sync/net/peer" + gomock "github.com/golang/mock/gomock" +) + +// MockPool is a mock of Pool interface. +type MockPool struct { + ctrl *gomock.Controller + recorder *MockPoolMockRecorder +} + +// MockPoolMockRecorder is the mock recorder for MockPool. +type MockPoolMockRecorder struct { + mock *MockPool +} + +// NewMockPool creates a new mock instance. +func NewMockPool(ctrl *gomock.Controller) *MockPool { + mock := &MockPool{ctrl: ctrl} + mock.recorder = &MockPoolMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPool) EXPECT() *MockPoolMockRecorder { + return m.recorder +} + +// AddPeer mocks base method. +func (m *MockPool) AddPeer(arg0 context.Context, arg1 peer.Peer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddPeer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddPeer indicates an expected call of AddPeer. +func (mr *MockPoolMockRecorder) AddPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockPool)(nil).AddPeer), arg0, arg1) +} + +// Get mocks base method. +func (m *MockPool) Get(arg0 context.Context, arg1 string) (peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0, arg1) + ret0, _ := ret[0].(peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockPoolMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPool)(nil).Get), arg0, arg1) +} + +// GetOneOf mocks base method. +func (m *MockPool) GetOneOf(arg0 context.Context, arg1 []string) (peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOneOf", arg0, arg1) + ret0, _ := ret[0].(peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOneOf indicates an expected call of GetOneOf. +func (mr *MockPoolMockRecorder) GetOneOf(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOneOf", reflect.TypeOf((*MockPool)(nil).GetOneOf), arg0, arg1) +} diff --git a/net/pool/pool.go b/net/pool/pool.go index 37f8328e..7e936e1b 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_pool/mock_pool.go github.com/anyproto/any-sync/net/pool Pool package pool import ( From c8c0839a57a232dc8d976966701300bd6325996b Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Jun 2023 21:48:13 +0200 Subject: [PATCH 4/5] Add request manager tests --- .../requestmanager/requestmanager_test.go | 94 ++++++++++++++++++- net/streampool/sendpool.go | 8 +- 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/commonspace/requestmanager/requestmanager_test.go b/commonspace/requestmanager/requestmanager_test.go index 1bdaa884..35d497fc 100644 --- a/commonspace/requestmanager/requestmanager_test.go +++ b/commonspace/requestmanager/requestmanager_test.go @@ -13,7 +13,9 @@ import ( "github.com/stretchr/testify/require" "storj.io/drpc" "storj.io/drpc/drpcconn" + "sync" "testing" + "time" ) type fixture struct { @@ -49,7 +51,7 @@ func (fx *fixture) stop() { fx.ctrl.Finish() } -func TestRequestManager_Request(t *testing.T) { +func TestRequestManager_SyncRequest(t *testing.T) { ctx := context.Background() t.Run("send request", func(t *testing.T) { @@ -95,3 +97,93 @@ func TestRequestManager_Request(t *testing.T) { fx.requestManager.requestAndHandle(peerId, msg) }) } + +func TestRequestManager_QueueRequest(t *testing.T) { + t.Run("max concurrent reqs for peer, independent reqs for other peer", func(t *testing.T) { + // testing 2 concurrent requests to one peer and simultaneous to another peer + fx := newFixture(t) + defer fx.stop() + fx.requestManager.workers = 2 + msgRelease := make(chan struct{}) + msgWait := make(chan struct{}) + msgs := sync.Map{} + doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) { + msgs.Store(req.ObjectId, struct{}{}) + <-msgWait + <-msgRelease + } + otherPeer := "otherPeer" + msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"} + msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"} + msg3 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id3"} + otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"} + + // sending requests to first peer + peerId := "peerId" + err := fx.requestManager.QueueRequest(peerId, msg1) + require.NoError(t, err) + err = fx.requestManager.QueueRequest(peerId, msg2) + require.NoError(t, err) + err = fx.requestManager.QueueRequest(peerId, msg3) + require.NoError(t, err) + + // waiting until all the messages are loaded + msgWait <- struct{}{} + msgWait <- struct{}{} + _, ok := msgs.Load("id1") + require.True(t, ok) + _, ok = msgs.Load("id2") + require.True(t, ok) + // third message should not be read + _, ok = msgs.Load("id3") + require.False(t, ok) + + // request for other peer should pass + err = fx.requestManager.QueueRequest(otherPeer, otherMsg1) + require.NoError(t, err) + msgWait <- struct{}{} + + _, ok = msgs.Load("otherId1") + require.True(t, ok) + close(msgRelease) + }) + + t.Run("no requests after close", func(t *testing.T) { + fx := newFixture(t) + defer fx.stop() + fx.requestManager.workers = 1 + msgRelease := make(chan struct{}) + msgWait := make(chan struct{}) + msgs := sync.Map{} + doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) { + msgs.Store(req.ObjectId, struct{}{}) + <-msgWait + <-msgRelease + } + msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"} + msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"} + + // sending requests to first peer + peerId := "peerId" + err := fx.requestManager.QueueRequest(peerId, msg1) + require.NoError(t, err) + err = fx.requestManager.QueueRequest(peerId, msg2) + require.NoError(t, err) + + // waiting until all the message is loaded + msgWait <- struct{}{} + _, ok := msgs.Load("id1") + require.True(t, ok) + _, ok = msgs.Load("id2") + require.False(t, ok) + + fx.requestManager.Close(context.Background()) + close(msgRelease) + // waiting to know if the second one is not taken + // because the manager is now closed + time.Sleep(100 * time.Millisecond) + _, ok = msgs.Load("id2") + require.False(t, ok) + + }) +} diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index 0bff0765..642aeb9a 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -10,7 +10,10 @@ import ( // workers - how many processes will execute tasks // maxSize - limit for queue size func NewExecPool(workers, maxSize int) *ExecPool { + ctx, cancel := context.WithCancel(context.Background()) ss := &ExecPool{ + ctx: ctx, + cancel: cancel, workers: workers, batch: mb.New[func()](maxSize), } @@ -19,6 +22,8 @@ func NewExecPool(workers, maxSize int) *ExecPool { // ExecPool needed for parallel execution of the incoming send tasks type ExecPool struct { + ctx context.Context + cancel context.CancelFunc workers int batch *mb.MB[func()] } @@ -39,7 +44,7 @@ func (ss *ExecPool) Run() { func (ss *ExecPool) sendLoop() { for { - f, err := ss.batch.WaitOne(context.Background()) + f, err := ss.batch.WaitOne(ss.ctx) if err != nil { log.Debug("close send loop", zap.Error(err)) return @@ -49,5 +54,6 @@ func (ss *ExecPool) sendLoop() { } func (ss *ExecPool) Close() (err error) { + ss.cancel() return ss.batch.Close() } From 318a49c526efeaad5267d6105f6e16c24a3d24bb Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Jun 2023 22:05:05 +0200 Subject: [PATCH 5/5] Change objectsync injection --- commonspace/objectsync/objectsync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index d2febaee..0ec7f344 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" @@ -69,7 +70,7 @@ type objectSync struct { func (s *objectSync) Init(a *app.App) (err error) { s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) - s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a) + s.objectGetter = a.MustComponent(treemanager.CName).(syncobjectgetter.SyncObjectGetter) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) mc := a.Component(metric.CName)