From 80c8da8cac4784cc4879dbcfa845a07ee6b22eec Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 27 Jan 2023 13:50:36 +0300 Subject: [PATCH] space milti queue --- commonspace/headsync/diffsyncer.go | 6 +- commonspace/object/tree/synctree/synctree.go | 1 + commonspace/objectsync/msgpool.go | 4 +- commonspace/space.go | 53 ++++++++-- net/streampool/streampool.go | 7 +- util/multiqueue/multiqueue.go | 100 +++++++++++++++++++ util/multiqueue/multiqueue_test.go | 65 ++++++++++++ 7 files changed, 226 insertions(+), 10 deletions(-) create mode 100644 util/multiqueue/multiqueue.go create mode 100644 util/multiqueue/multiqueue_test.go diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 6186c458..21fa49cb 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -155,7 +155,11 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - _ = syncTree.Ping(ctx) + if err = syncTree.Ping(ctx); err != nil { + d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) + } else { + d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) + } } } diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 8302d2a5..b73d91f0 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -72,6 +72,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { peerId, err := peer.CtxPeerId(ctx) if err != nil { + log.WarnCtx(ctx, "peer not found in context, use first responsible") peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] } diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index e9d4082c..e198de18 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -68,7 +68,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn s.waiters[msg.ReplyId] = waiter s.waitersMx.Unlock() - err = s.SendPeer(context.Background(), peerId, msg) + err = s.SendPeer(ctx, peerId, msg) if err != nil { return } @@ -78,7 +78,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn delete(s.waiters, msg.ReplyId) s.waitersMx.Unlock() - log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting") + log.With(zap.String("replyId", msg.ReplyId)).WarnCtx(ctx, "time elapsed when waiting") err = fmt.Errorf("sendSync context error: %v", ctx.Err()) case reply = <-waiter.ch: // success diff --git a/commonspace/space.go b/commonspace/space.go index dab65d67..bbe095b9 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/anytypeio/any-sync/accountservice" + "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/object/acl/list" @@ -20,9 +21,11 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" + "github.com/anytypeio/any-sync/util/multiqueue" "github.com/anytypeio/any-sync/util/slice" "github.com/zeebo/errs" "go.uber.org/zap" @@ -50,6 +53,13 @@ type SpaceCreatePayload struct { ReplicationKey uint64 } +type HandleMessage struct { + Id uint64 + Deadline time.Time + SenderId string + Message *spacesyncproto.ObjectSyncMessage +} + const SpaceTypeDerived = "derived.space" type SpaceDerivePayload struct { @@ -92,6 +102,8 @@ type Space interface { SyncStatus() syncstatus.StatusUpdater Storage() spacestorage.SpaceStorage + HandleMessage(ctx context.Context, msg HandleMessage) (err error) + Close() error } @@ -110,6 +122,8 @@ type space struct { configuration nodeconf.Configuration settingsObject settings.SettingsObject + handleQueue multiqueue.MultiQueue[HandleMessage] + isClosed atomic.Bool treesUsed atomic.Int32 } @@ -200,7 +214,7 @@ func (s *space) Init(ctx context.Context) (err error) { } s.cache.AddObject(s.settingsObject) s.syncStatus.Run() - + s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 10) return nil } @@ -337,13 +351,40 @@ func (s *space) DeleteTree(ctx context.Context, id string) (err error) { return s.settingsObject.DeleteObject(id) } +func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { + return s.handleQueue.Add(ctx, hm.Message.ObjectId, hm) +} + +func (s *space) handleMessage(msg HandleMessage) { + ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) + ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) + if !msg.Deadline.IsZero() { + now := time.Now() + if now.After(msg.Deadline) { + log.InfoCtx(ctx, "skip message: deadline exceed") + return + } + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, msg.Deadline) + defer cancel() + } + + if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { + log.InfoCtx(ctx, "handleMessage error", zap.Error(err)) + } +} + func (s *space) Close() error { + if s.isClosed.Swap(true) { + log.Warn("call space.Close on closed space", zap.String("id", s.id)) + return nil + } log.With(zap.String("id", s.id)).Debug("space is closing") - defer func() { - s.isClosed.Store(true) - log.With(zap.String("id", s.id)).Debug("space closed") - }() + var mError errs.Group + if err := s.handleQueue.Close(); err != nil { + mError.Add(err) + } if err := s.headSync.Close(); err != nil { mError.Add(err) } @@ -362,6 +403,6 @@ func (s *space) Close() error { if err := s.syncStatus.Close(); err != nil { mError.Add(err) } - + log.With(zap.String("id", s.id)).Debug("space closed") return mError.Err() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index c718c097..81f3b678 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -100,7 +100,9 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P for _, p := range peers { funcs = append(funcs, func() { if e := s.sendOne(ctx, p, msg); e != nil { - log.Info("send peer error", zap.Error(e), zap.String("peerId", p.Id())) + log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id())) + } else { + log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id())) } }) } @@ -121,6 +123,8 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ... funcs = append(funcs, func() { if e := st.write(msg); e != nil { st.l.Debug("sendById write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "sendById success") } }) } @@ -142,6 +146,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) // continue with next stream continue } else { + st.l.DebugCtx(ctx, "sendOne success") // stop sending on success break } diff --git a/util/multiqueue/multiqueue.go b/util/multiqueue/multiqueue.go new file mode 100644 index 00000000..3b836300 --- /dev/null +++ b/util/multiqueue/multiqueue.go @@ -0,0 +1,100 @@ +package multiqueue + +import ( + "context" + "errors" + "github.com/cheggaaa/mb/v3" + "sync" +) + +var ( + ErrThreadNotExists = errors.New("multiQueue: thread not exists") + ErrClosed = errors.New("multiQueue: closed") +) + +func New[T any](h HandleFunc[T], maxThreadSize int) MultiQueue[T] { + return &multiQueue[T]{ + handler: h, + threads: make(map[string]*mb.MB[T]), + queueMaxSize: maxThreadSize, + } +} + +type HandleFunc[T any] func(msg T) + +type MultiQueue[T any] interface { + Add(ctx context.Context, threadId string, msg T) (err error) + CloseThread(threadId string) (err error) + Close() (err error) +} + +type multiQueue[T any] struct { + handler HandleFunc[T] + queueMaxSize int + threads map[string]*mb.MB[T] + mu sync.Mutex + closed bool +} + +func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err error) { + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return ErrClosed + } + q, ok := m.threads[threadId] + if !ok { + q = m.startThread(threadId) + } + m.mu.Unlock() + return q.Add(ctx, msg) +} + +func (m *multiQueue[T]) startThread(id string) *mb.MB[T] { + q := mb.New[T](m.queueMaxSize) + m.threads[id] = q + go m.threadLoop(q) + return q +} + +func (m *multiQueue[T]) threadLoop(q *mb.MB[T]) { + for { + msg, err := q.WaitOne(context.Background()) + if err != nil { + return + } + m.handler(msg) + } +} + +func (m *multiQueue[T]) CloseThread(threadId string) (err error) { + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return ErrClosed + } + q, ok := m.threads[threadId] + if ok { + delete(m.threads, threadId) + } + m.mu.Unlock() + if !ok { + return ErrThreadNotExists + } + return q.Close() +} + +func (m *multiQueue[T]) Close() (err error) { + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return ErrClosed + } + m.closed = true + threads := m.threads + m.mu.Unlock() + for _, q := range threads { + _ = q.Close() + } + return nil +} diff --git a/util/multiqueue/multiqueue_test.go b/util/multiqueue/multiqueue_test.go new file mode 100644 index 00000000..475d784e --- /dev/null +++ b/util/multiqueue/multiqueue_test.go @@ -0,0 +1,65 @@ +package multiqueue + +import ( + "context" + "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestMultiQueue_Add(t *testing.T) { + t.Run("process", func(t *testing.T) { + var msgsCh = make(chan string) + var h HandleFunc[string] = func(msg string) { + msgsCh <- msg + } + q := New[string](h, 10) + defer func() { + require.NoError(t, q.Close()) + }() + + for i := 0; i < 5; i++ { + for j := 0; j < 5; j++ { + assert.NoError(t, q.Add(context.Background(), fmt.Sprint(i), fmt.Sprint(i, j))) + } + } + var msgs []string + for i := 0; i < 5*5; i++ { + select { + case <-time.After(time.Second / 4): + require.True(t, false, "timeout") + case msg := <-msgsCh: + msgs = append(msgs, msg) + } + } + assert.Len(t, msgs, 25) + }) + t.Run("add to closed", func(t *testing.T) { + q := New[string](func(msg string) {}, 10) + require.NoError(t, q.Close()) + assert.Equal(t, ErrClosed, q.Add(context.Background(), "1", "1")) + }) +} + +func TestMultiQueue_CloseThread(t *testing.T) { + var msgsCh = make(chan string) + var h HandleFunc[string] = func(msg string) { + msgsCh <- msg + } + q := New[string](h, 10) + defer func() { + require.NoError(t, q.Close()) + }() + require.NoError(t, q.Add(context.Background(), "1", "1")) + require.NoError(t, q.Add(context.Background(), "1", "2")) + require.NoError(t, q.CloseThread("1")) + for i := 0; i < 2; i++ { + select { + case <-msgsCh: + case <-time.After(time.Second / 4): + require.False(t, true, "timeout") + } + } +}