From 96768adaae2f64d07f03a5dc0ed633b5bf0590e7 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 2 Jun 2023 10:39:48 +0200 Subject: [PATCH] switch coordinator to new peer api --- .../coordinatorclient/coordinatorclient.go | 156 +++++++++--------- 1 file changed, 77 insertions(+), 79 deletions(-) diff --git a/coordinator/coordinatorclient/coordinatorclient.go b/coordinator/coordinatorclient/coordinatorclient.go index 4847ade7..b804d9a5 100644 --- a/coordinator/coordinatorclient/coordinatorclient.go +++ b/coordinator/coordinatorclient/coordinatorclient.go @@ -10,6 +10,7 @@ import ( "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/util/crypto" + "storj.io/drpc" ) const CName = "common.coordinator.coordinatorclient" @@ -39,42 +40,8 @@ type coordinatorClient struct { nodeConf nodeconf.Service } -func (c *coordinatorClient) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error) { - cl, err := c.client(ctx) - if err != nil { - return - } - resp, err := cl.SpaceStatusChange(ctx, &coordinatorproto.SpaceStatusChangeRequest{ - SpaceId: spaceId, - DeletionChangeId: deleteRaw.GetId(), - DeletionChangePayload: deleteRaw.GetRawChange(), - }) - if err != nil { - err = rpcerr.Unwrap(err) - return - } - status = resp.Payload - return -} - -func (c *coordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) { - cl, err := c.client(ctx) - if err != nil { - return - } - resp, err := cl.SpaceStatusCheck(ctx, &coordinatorproto.SpaceStatusCheckRequest{ - SpaceId: spaceId, - }) - if err != nil { - err = rpcerr.Unwrap(err) - return - } - status = resp.Payload - return -} - func (c *coordinatorClient) Init(a *app.App) (err error) { - c.pool = a.MustComponent(pool.CName).(pool.Service).NewPool(CName) + c.pool = a.MustComponent(pool.CName).(pool.Service) c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) return } @@ -83,8 +50,37 @@ func (c *coordinatorClient) Name() (name string) { return CName } +func (c *coordinatorClient) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error) { + err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error { + resp, err := cl.SpaceStatusChange(ctx, &coordinatorproto.SpaceStatusChangeRequest{ + SpaceId: spaceId, + DeletionChangeId: deleteRaw.GetId(), + DeletionChangePayload: deleteRaw.GetRawChange(), + }) + if err != nil { + return rpcerr.Unwrap(err) + } + status = resp.Payload + return nil + }) + return +} + +func (c *coordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) { + err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error { + resp, err := cl.SpaceStatusCheck(ctx, &coordinatorproto.SpaceStatusCheckRequest{ + SpaceId: spaceId, + }) + if err != nil { + return rpcerr.Unwrap(err) + } + status = resp.Payload + return nil + }) + return +} + func (c *coordinatorClient) SpaceSign(ctx context.Context, payload SpaceSignPayload) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) { - cl, err := c.client(ctx) if err != nil { return } @@ -100,54 +96,56 @@ func (c *coordinatorClient) SpaceSign(ctx context.Context, payload SpaceSignPayl if err != nil { return } - resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{ - SpaceId: payload.SpaceId, - Header: payload.SpaceHeader, - OldIdentity: oldIdentity, - NewIdentitySignature: newSignature, + err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error { + resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{ + SpaceId: payload.SpaceId, + Header: payload.SpaceHeader, + OldIdentity: oldIdentity, + NewIdentitySignature: newSignature, + }) + if err != nil { + return rpcerr.Unwrap(err) + } + receipt = resp.Receipt + return nil }) - if err != nil { - err = rpcerr.Unwrap(err) - return - } - return resp.Receipt, nil -} - -func (c *coordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error) { - cl, err := c.client(ctx) - if err != nil { - return - } - resp, err := cl.FileLimitCheck(ctx, &coordinatorproto.FileLimitCheckRequest{ - AccountIdentity: identity, - SpaceId: spaceId, - }) - if err != nil { - err = rpcerr.Unwrap(err) - return - } - return resp.Limit, nil -} - -func (c *coordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (resp *coordinatorproto.NetworkConfigurationResponse, err error) { - cl, err := c.client(ctx) - if err != nil { - return - } - resp, err = cl.NetworkConfiguration(ctx, &coordinatorproto.NetworkConfigurationRequest{ - CurrentId: currentId, - }) - if err != nil { - err = rpcerr.Unwrap(err) - return - } return } -func (c *coordinatorClient) client(ctx context.Context) (coordinatorproto.DRPCCoordinatorClient, error) { +func (c *coordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error) { + err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error { + resp, err := cl.FileLimitCheck(ctx, &coordinatorproto.FileLimitCheckRequest{ + AccountIdentity: identity, + SpaceId: spaceId, + }) + if err != nil { + return rpcerr.Unwrap(err) + } + limit = resp.Limit + return nil + }) + return +} + +func (c *coordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (resp *coordinatorproto.NetworkConfigurationResponse, err error) { + err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error { + resp, err = cl.NetworkConfiguration(ctx, &coordinatorproto.NetworkConfigurationRequest{ + CurrentId: currentId, + }) + if err != nil { + return rpcerr.Unwrap(err) + } + return nil + }) + return +} + +func (c *coordinatorClient) doClient(ctx context.Context, f func(cl coordinatorproto.DRPCCoordinatorClient) error) error { p, err := c.pool.GetOneOf(ctx, c.nodeConf.CoordinatorPeers()) if err != nil { - return nil, err + return err } - return coordinatorproto.NewDRPCCoordinatorClient(p), nil + return p.DoDrpc(ctx, func(conn drpc.Conn) error { + return f(coordinatorproto.NewDRPCCoordinatorClient(conn)) + }) }