From aff2061bd1be6180bfa33d23a2ac978443f523da Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 4 Jun 2023 19:01:33 +0200 Subject: [PATCH] WIP request manager --- commonspace/objectsync/objectsync.go | 12 +-- commonspace/requestmanager/requestmanager.go | 77 ++++++++++++++++++-- 2 files changed, 76 insertions(+), 13 deletions(-) diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 9e3c3eba..f4dc7c1b 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -104,6 +104,8 @@ func (s *objectSync) LastUsage() time.Time { } func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { + hm.ReceiveTime = time.Now() + hm.StartHandlingTime = hm.ReceiveTime return s.handleRequest(ctx, hm.SenderId, hm.Message) } @@ -157,10 +159,7 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) { } func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { - log := log.With( - zap.String("objectId", msg.ObjectId), - zap.String("requestId", msg.RequestId), - zap.String("replyId", msg.ReplyId)) + log := log.With(zap.String("objectId", msg.ObjectId)) if s.spaceIsDeleted.Load() { log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree @@ -180,10 +179,7 @@ func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *sp } func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - log := log.With( - zap.String("objectId", msg.ObjectId), - zap.String("requestId", msg.RequestId), - zap.String("replyId", msg.ReplyId)) + log := log.With(zap.String("objectId", msg.ObjectId)) if s.spaceIsDeleted.Load() { log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index e77b4bdb..18eeff45 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -4,7 +4,14 @@ import ( "context" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/pool" + "github.com/anyproto/any-sync/net/streampool" + "go.uber.org/zap" + "storj.io/drpc" + "sync" ) const CName = "common.commonspace.requestmanager" @@ -18,13 +25,32 @@ type RequestManager interface { } func New() RequestManager { - return &requestManager{} + return &requestManager{ + workers: 10, + queueSize: 300, + pools: map[string]*streampool.ExecPool{}, + } +} + +type MessageHandler interface { + HandleMessage(ctx context.Context, hm objectsync.HandleMessage) (err error) } type requestManager struct { + sync.Mutex + pools map[string]*streampool.ExecPool + peerPool pool.Pool + workers int + queueSize int + handler MessageHandler + ctx context.Context + cancel context.CancelFunc } func (r *requestManager) Init(a *app.App) (err error) { + r.ctx, r.cancel = context.WithCancel(context.Background()) + r.handler = a.MustComponent(objectsync.CName).(MessageHandler) + r.peerPool = a.MustComponent(pool.CName).(pool.Pool) return } @@ -37,13 +63,54 @@ func (r *requestManager) Run(ctx context.Context) (err error) { } func (r *requestManager) Close(ctx context.Context) (err error) { + r.Lock() + defer r.Unlock() + r.cancel() + for _, p := range r.pools { + _ = p.Close() + } return nil } -func (r *requestManager) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - return nil, nil +func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + return r.doRequest(ctx, peerId, req) } -func (r *requestManager) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - return nil +func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) { + r.Lock() + defer r.Unlock() + pl, exists := r.pools[peerId] + if !exists { + pl := streampool.NewExecPool(r.workers, r.queueSize) + r.pools[peerId] = pl + } + // TODO: for later think when many clients are there, + // we need to close pools for inactive clients + return pl.TryAdd(func() { + ctx := r.ctx + resp, err := r.doRequest(ctx, peerId, req) + if err != nil { + log.Warn("failed to send request", zap.Error(err)) + return + } + ctx = peer.CtxWithPeerId(ctx, peerId) + _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ + SenderId: peerId, + Message: resp, + PeerCtx: ctx, + }) + }) +} + +func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { + pr, err := r.peerPool.Get(ctx, peerId) + if err != nil { + return + } + err = pr.DoDrpc(ctx, func(conn drpc.Conn) error { + cl := spacesyncproto.NewDRPCSpaceSyncClient(conn) + resp, err = cl.ObjectSync(ctx, msg) + return err + }) + return }