Add subscribe after push
This commit is contained in:
parent
370683ff70
commit
c95059b4c6
@ -123,7 +123,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
d.syncStatus.SetNodesOnline(p.Id(), true)
|
d.syncStatus.SetNodesOnline(p.Id(), true)
|
||||||
|
|
||||||
if err == spacesyncproto.ErrSpaceMissing {
|
if err == spacesyncproto.ErrSpaceMissing {
|
||||||
return d.sendPushSpaceRequest(ctx, cl)
|
return d.sendPushSpaceRequest(ctx, p.Id(), cl)
|
||||||
}
|
}
|
||||||
|
|
||||||
totalLen := len(newIds) + len(changedIds) + len(removedIds)
|
totalLen := len(newIds) + len(changedIds) + len(removedIds)
|
||||||
@ -167,7 +167,7 @@ func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto.DRPCSpaceSyncClient) (err error) {
|
func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) {
|
||||||
aclStorage, err := d.storage.AclStorage()
|
aclStorage, err := d.storage.AclStorage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -202,5 +202,25 @@ func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, cl spacesyncproto
|
|||||||
_, err = cl.SpacePush(ctx, &spacesyncproto.SpacePushRequest{
|
_, err = cl.SpacePush(ctx, &spacesyncproto.SpacePushRequest{
|
||||||
Payload: spacePayload,
|
Payload: spacePayload,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if e := d.subscribe(ctx, peerId); e != nil {
|
||||||
|
d.log.WarnCtx(ctx, "error subscribing for space", zap.Error(e))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) {
|
||||||
|
var msg = &spacesyncproto.SpaceSubscription{
|
||||||
|
SpaceIds: []string{d.spaceId},
|
||||||
|
Action: spacesyncproto.SpaceSubscriptionAction_Subscribe,
|
||||||
|
}
|
||||||
|
payload, err := msg.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return d.peerManager.SendPeer(ctx, peerId, &spacesyncproto.ObjectSyncMessage{
|
||||||
|
Payload: payload,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -192,6 +192,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
clientMock.EXPECT().
|
clientMock.EXPECT().
|
||||||
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, spaceHeader)).
|
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, spaceHeader)).
|
||||||
Return(nil, nil)
|
Return(nil, nil)
|
||||||
|
peerManagerMock.EXPECT().SendPeer(gomock.Any(), "mockId", gomock.Any())
|
||||||
|
|
||||||
require.NoError(t, diffSyncer.Sync(ctx))
|
require.NoError(t, diffSyncer.Sync(ctx))
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user