WIP request manager

This commit is contained in:
mcrakhman 2023-06-04 19:01:33 +02:00
parent b85f545fa3
commit aff2061bd1
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
2 changed files with 76 additions and 13 deletions

View File

@ -104,6 +104,8 @@ func (s *objectSync) LastUsage() time.Time {
}
func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
hm.ReceiveTime = time.Now()
hm.StartHandlingTime = hm.ReceiveTime
return s.handleRequest(ctx, hm.SenderId, hm.Message)
}
@ -157,10 +159,7 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) {
}
func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
log := log.With(
zap.String("objectId", msg.ObjectId),
zap.String("requestId", msg.RequestId),
zap.String("replyId", msg.ReplyId))
log := log.With(zap.String("objectId", msg.ObjectId))
if s.spaceIsDeleted.Load() {
log = log.With(zap.Bool("isDeleted", true))
// preventing sync with other clients if they are not just syncing the settings tree
@ -180,10 +179,7 @@ func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *sp
}
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
log := log.With(
zap.String("objectId", msg.ObjectId),
zap.String("requestId", msg.RequestId),
zap.String("replyId", msg.ReplyId))
log := log.With(zap.String("objectId", msg.ObjectId))
if s.spaceIsDeleted.Load() {
log = log.With(zap.Bool("isDeleted", true))
// preventing sync with other clients if they are not just syncing the settings tree

View File

@ -4,7 +4,14 @@ import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"storj.io/drpc"
"sync"
)
const CName = "common.commonspace.requestmanager"
@ -18,13 +25,32 @@ type RequestManager interface {
}
func New() RequestManager {
return &requestManager{}
return &requestManager{
workers: 10,
queueSize: 300,
pools: map[string]*streampool.ExecPool{},
}
}
type MessageHandler interface {
HandleMessage(ctx context.Context, hm objectsync.HandleMessage) (err error)
}
type requestManager struct {
sync.Mutex
pools map[string]*streampool.ExecPool
peerPool pool.Pool
workers int
queueSize int
handler MessageHandler
ctx context.Context
cancel context.CancelFunc
}
func (r *requestManager) Init(a *app.App) (err error) {
r.ctx, r.cancel = context.WithCancel(context.Background())
r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
return
}
@ -37,13 +63,54 @@ func (r *requestManager) Run(ctx context.Context) (err error) {
}
func (r *requestManager) Close(ctx context.Context) (err error) {
r.Lock()
defer r.Unlock()
r.cancel()
for _, p := range r.pools {
_ = p.Close()
}
return nil
}
func (r *requestManager) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
return nil, nil
func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
return r.doRequest(ctx, peerId, req)
}
func (r *requestManager) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
r.Lock()
defer r.Unlock()
pl, exists := r.pools[peerId]
if !exists {
pl := streampool.NewExecPool(r.workers, r.queueSize)
r.pools[peerId] = pl
}
// TODO: for later think when many clients are there,
// we need to close pools for inactive clients
return pl.TryAdd(func() {
ctx := r.ctx
resp, err := r.doRequest(ctx, peerId, req)
if err != nil {
log.Warn("failed to send request", zap.Error(err))
return
}
ctx = peer.CtxWithPeerId(ctx, peerId)
_ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{
SenderId: peerId,
Message: resp,
PeerCtx: ctx,
})
})
}
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
pr, err := r.peerPool.Get(ctx, peerId)
if err != nil {
return
}
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := spacesyncproto.NewDRPCSpaceSyncClient(conn)
resp, err = cl.ObjectSync(ctx, msg)
return err
})
return
}