handle replies in separate thread

This commit is contained in:
Sergey Cherepanov 2023-01-27 13:58:28 +03:00 committed by Mikhail Iudin
parent 80c8da8cac
commit a2765a5233
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 12 additions and 5 deletions

View File

@ -80,13 +80,13 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
return s.messagePool.HandleMessage(ctx, senderId, message)
}
func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
log.With(zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).DebugCtx(ctx, "handling message")
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message")
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
if err != nil {
return
}
return obj.HandleMessage(ctx, senderId, message)
return obj.HandleMessage(ctx, senderId, msg)
}
func (s *objectSync) MessagePool() MessagePool {

View File

@ -352,7 +352,14 @@ func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
}
func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
return s.handleQueue.Add(ctx, hm.Message.ObjectId, hm)
threadId := hm.Message.ObjectId
if hm.Message.ReplyId != "" {
threadId += hm.Message.ReplyId
defer func() {
_ = s.handleQueue.CloseThread(threadId)
}()
}
return s.handleQueue.Add(ctx, threadId, hm)
}
func (s *space) handleMessage(msg HandleMessage) {