127 lines
3.4 KiB
Go
127 lines
3.4 KiB
Go
package requestmanager
|
|
|
|
import (
|
|
"context"
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
"github.com/anyproto/any-sync/commonspace/objectsync"
|
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
|
"github.com/anyproto/any-sync/net/peer"
|
|
"github.com/anyproto/any-sync/net/pool"
|
|
"github.com/anyproto/any-sync/net/streampool"
|
|
"go.uber.org/zap"
|
|
"storj.io/drpc"
|
|
"sync"
|
|
)
|
|
|
|
const CName = "common.commonspace.requestmanager"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
|
|
type RequestManager interface {
|
|
app.ComponentRunnable
|
|
SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
|
QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
|
|
}
|
|
|
|
func New() RequestManager {
|
|
return &requestManager{
|
|
workers: 10,
|
|
queueSize: 300,
|
|
pools: map[string]*streampool.ExecPool{},
|
|
}
|
|
}
|
|
|
|
type MessageHandler interface {
|
|
HandleMessage(ctx context.Context, hm objectsync.HandleMessage) (err error)
|
|
}
|
|
|
|
type requestManager struct {
|
|
sync.Mutex
|
|
pools map[string]*streampool.ExecPool
|
|
peerPool pool.Pool
|
|
workers int
|
|
queueSize int
|
|
handler MessageHandler
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
clientFactory spacesyncproto.ClientFactory
|
|
}
|
|
|
|
func (r *requestManager) Init(a *app.App) (err error) {
|
|
r.ctx, r.cancel = context.WithCancel(context.Background())
|
|
r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
|
|
r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
|
|
r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
|
return
|
|
}
|
|
|
|
func (r *requestManager) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (r *requestManager) Run(ctx context.Context) (err error) {
|
|
return nil
|
|
}
|
|
|
|
func (r *requestManager) Close(ctx context.Context) (err error) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
r.cancel()
|
|
for _, p := range r.pools {
|
|
_ = p.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
|
// TODO: limit concurrent sends?
|
|
return r.doRequest(ctx, peerId, req)
|
|
}
|
|
|
|
func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
pl, exists := r.pools[peerId]
|
|
if !exists {
|
|
pl = streampool.NewExecPool(r.workers, r.queueSize)
|
|
r.pools[peerId] = pl
|
|
pl.Run()
|
|
}
|
|
// TODO: for later think when many clients are there,
|
|
// we need to close pools for inactive clients
|
|
return pl.TryAdd(func() {
|
|
doRequestAndHandle(r, peerId, req)
|
|
})
|
|
}
|
|
|
|
var doRequestAndHandle = (*requestManager).requestAndHandle
|
|
|
|
func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
|
ctx := r.ctx
|
|
resp, err := r.doRequest(ctx, peerId, req)
|
|
if err != nil {
|
|
log.Warn("failed to send request", zap.Error(err))
|
|
return
|
|
}
|
|
ctx = peer.CtxWithPeerId(ctx, peerId)
|
|
_ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{
|
|
SenderId: peerId,
|
|
Message: resp,
|
|
PeerCtx: ctx,
|
|
})
|
|
}
|
|
|
|
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
|
pr, err := r.peerPool.Get(ctx, peerId)
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
|
|
cl := r.clientFactory.Client(conn)
|
|
resp, err = cl.ObjectSync(ctx, msg)
|
|
return err
|
|
})
|
|
return
|
|
}
|