From a2765a5233402517a9a3804b22ef16044f3c1e48 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 27 Jan 2023 13:58:28 +0300 Subject: [PATCH] handle replies in separate thread --- commonspace/objectsync/objectsync.go | 8 ++++---- commonspace/space.go | 9 ++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 20dd0869..11c79a4a 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -80,13 +80,13 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message return s.messagePool.HandleMessage(ctx, senderId, message) } -func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - log.With(zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).DebugCtx(ctx, "handling message") - obj, err := s.objectGetter.GetObject(ctx, message.ObjectId) +func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message") + obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { return } - return obj.HandleMessage(ctx, senderId, message) + return obj.HandleMessage(ctx, senderId, msg) } func (s *objectSync) MessagePool() MessagePool { diff --git a/commonspace/space.go b/commonspace/space.go index bbe095b9..c5d8e258 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -352,7 +352,14 @@ func (s *space) DeleteTree(ctx context.Context, id string) (err error) { } func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { - return s.handleQueue.Add(ctx, hm.Message.ObjectId, hm) + threadId := hm.Message.ObjectId + if hm.Message.ReplyId != "" { + threadId += hm.Message.ReplyId + defer func() { + _ = s.handleQueue.CloseThread(threadId) + }() + } + return s.handleQueue.Add(ctx, threadId, hm) } func (s *space) handleMessage(msg HandleMessage) {