switch coordinator to new peer api

This commit is contained in:
Sergey Cherepanov 2023-06-02 10:39:48 +02:00
parent e9f23e2dd9
commit 96768adaae
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C

View File

@ -10,6 +10,7 @@ import (
"github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/crypto" "github.com/anyproto/any-sync/util/crypto"
"storj.io/drpc"
) )
const CName = "common.coordinator.coordinatorclient" const CName = "common.coordinator.coordinatorclient"
@ -39,42 +40,8 @@ type coordinatorClient struct {
nodeConf nodeconf.Service nodeConf nodeconf.Service
} }
func (c *coordinatorClient) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err := cl.SpaceStatusChange(ctx, &coordinatorproto.SpaceStatusChangeRequest{
SpaceId: spaceId,
DeletionChangeId: deleteRaw.GetId(),
DeletionChangePayload: deleteRaw.GetRawChange(),
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
status = resp.Payload
return
}
func (c *coordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err := cl.SpaceStatusCheck(ctx, &coordinatorproto.SpaceStatusCheckRequest{
SpaceId: spaceId,
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
status = resp.Payload
return
}
func (c *coordinatorClient) Init(a *app.App) (err error) { func (c *coordinatorClient) Init(a *app.App) (err error) {
c.pool = a.MustComponent(pool.CName).(pool.Service).NewPool(CName) c.pool = a.MustComponent(pool.CName).(pool.Service)
c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
return return
} }
@ -83,8 +50,37 @@ func (c *coordinatorClient) Name() (name string) {
return CName return CName
} }
func (c *coordinatorClient) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error) {
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
resp, err := cl.SpaceStatusChange(ctx, &coordinatorproto.SpaceStatusChangeRequest{
SpaceId: spaceId,
DeletionChangeId: deleteRaw.GetId(),
DeletionChangePayload: deleteRaw.GetRawChange(),
})
if err != nil {
return rpcerr.Unwrap(err)
}
status = resp.Payload
return nil
})
return
}
func (c *coordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) {
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
resp, err := cl.SpaceStatusCheck(ctx, &coordinatorproto.SpaceStatusCheckRequest{
SpaceId: spaceId,
})
if err != nil {
return rpcerr.Unwrap(err)
}
status = resp.Payload
return nil
})
return
}
func (c *coordinatorClient) SpaceSign(ctx context.Context, payload SpaceSignPayload) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) { func (c *coordinatorClient) SpaceSign(ctx context.Context, payload SpaceSignPayload) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) {
cl, err := c.client(ctx)
if err != nil { if err != nil {
return return
} }
@ -100,54 +96,56 @@ func (c *coordinatorClient) SpaceSign(ctx context.Context, payload SpaceSignPayl
if err != nil { if err != nil {
return return
} }
resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{ err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
SpaceId: payload.SpaceId, resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{
Header: payload.SpaceHeader, SpaceId: payload.SpaceId,
OldIdentity: oldIdentity, Header: payload.SpaceHeader,
NewIdentitySignature: newSignature, OldIdentity: oldIdentity,
NewIdentitySignature: newSignature,
})
if err != nil {
return rpcerr.Unwrap(err)
}
receipt = resp.Receipt
return nil
}) })
if err != nil {
err = rpcerr.Unwrap(err)
return
}
return resp.Receipt, nil
}
func (c *coordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err := cl.FileLimitCheck(ctx, &coordinatorproto.FileLimitCheckRequest{
AccountIdentity: identity,
SpaceId: spaceId,
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
return resp.Limit, nil
}
func (c *coordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (resp *coordinatorproto.NetworkConfigurationResponse, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err = cl.NetworkConfiguration(ctx, &coordinatorproto.NetworkConfigurationRequest{
CurrentId: currentId,
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
return return
} }
func (c *coordinatorClient) client(ctx context.Context) (coordinatorproto.DRPCCoordinatorClient, error) { func (c *coordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error) {
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
resp, err := cl.FileLimitCheck(ctx, &coordinatorproto.FileLimitCheckRequest{
AccountIdentity: identity,
SpaceId: spaceId,
})
if err != nil {
return rpcerr.Unwrap(err)
}
limit = resp.Limit
return nil
})
return
}
func (c *coordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (resp *coordinatorproto.NetworkConfigurationResponse, err error) {
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
resp, err = cl.NetworkConfiguration(ctx, &coordinatorproto.NetworkConfigurationRequest{
CurrentId: currentId,
})
if err != nil {
return rpcerr.Unwrap(err)
}
return nil
})
return
}
func (c *coordinatorClient) doClient(ctx context.Context, f func(cl coordinatorproto.DRPCCoordinatorClient) error) error {
p, err := c.pool.GetOneOf(ctx, c.nodeConf.CoordinatorPeers()) p, err := c.pool.GetOneOf(ctx, c.nodeConf.CoordinatorPeers())
if err != nil { if err != nil {
return nil, err return err
} }
return coordinatorproto.NewDRPCCoordinatorClient(p), nil return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(coordinatorproto.NewDRPCCoordinatorClient(conn))
})
} }