From b1a1ff1a728dada2eda58ec22af29d3bb9234584 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 27 Jan 2023 20:34:48 +0300 Subject: [PATCH] fix streampool tags --- commonspace/space.go | 2 +- net/streampool/streampool.go | 11 ++++++++--- net/streampool/streampool_test.go | 2 +- util/multiqueue/multiqueue.go | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/commonspace/space.go b/commonspace/space.go index c5d8e258..b4ab1da3 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -214,7 +214,7 @@ func (s *space) Init(ctx context.Context) (err error) { } s.cache.AddObject(s.settingsObject) s.syncStatus.Run() - s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 10) + s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 100) return nil } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 388978df..0a2b4419 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -249,7 +249,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } } for _, st := range streams { - s.exec.Add(ctx, sendStreamFunc(st)) + if st == nil { + panic("nil stream") + } + if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil { + return err + } } return } @@ -268,11 +273,11 @@ func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error { var newTags = make([]string, 0, len(tags)) for _, newTag := range tags { if !slices.Contains(st.tags, newTag) { + st.tags = append(st.tags, newTag) newTags = append(newTags, newTag) } } - st.tags = append(st.tags, newTags...) - for _, newTag := range tags { + for _, newTag := range newTags { s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId) } return nil diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index eb5eb50f..671b2aa2 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -177,7 +177,7 @@ func TestStreamPool_Tags(t *testing.T) { defer s1.Close() fx.AddStream("p2", s2, "t2") - err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3") + err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3") require.NoError(t, err) assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"]) diff --git a/util/multiqueue/multiqueue.go b/util/multiqueue/multiqueue.go index 3b836300..ee1cce81 100644 --- a/util/multiqueue/multiqueue.go +++ b/util/multiqueue/multiqueue.go @@ -47,7 +47,7 @@ func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err er q = m.startThread(threadId) } m.mu.Unlock() - return q.Add(ctx, msg) + return q.TryAdd(msg) } func (m *multiQueue[T]) startThread(id string) *mb.MB[T] {