fix streampool tags
This commit is contained in:
parent
0e7450fd52
commit
b1a1ff1a72
@ -214,7 +214,7 @@ func (s *space) Init(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
s.cache.AddObject(s.settingsObject)
|
s.cache.AddObject(s.settingsObject)
|
||||||
s.syncStatus.Run()
|
s.syncStatus.Run()
|
||||||
s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 10)
|
s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 100)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -249,7 +249,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, st := range streams {
|
for _, st := range streams {
|
||||||
s.exec.Add(ctx, sendStreamFunc(st))
|
if st == nil {
|
||||||
|
panic("nil stream")
|
||||||
|
}
|
||||||
|
if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -268,11 +273,11 @@ func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {
|
|||||||
var newTags = make([]string, 0, len(tags))
|
var newTags = make([]string, 0, len(tags))
|
||||||
for _, newTag := range tags {
|
for _, newTag := range tags {
|
||||||
if !slices.Contains(st.tags, newTag) {
|
if !slices.Contains(st.tags, newTag) {
|
||||||
|
st.tags = append(st.tags, newTag)
|
||||||
newTags = append(newTags, newTag)
|
newTags = append(newTags, newTag)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
st.tags = append(st.tags, newTags...)
|
for _, newTag := range newTags {
|
||||||
for _, newTag := range tags {
|
|
||||||
s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId)
|
s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -177,7 +177,7 @@ func TestStreamPool_Tags(t *testing.T) {
|
|||||||
defer s1.Close()
|
defer s1.Close()
|
||||||
fx.AddStream("p2", s2, "t2")
|
fx.AddStream("p2", s2, "t2")
|
||||||
|
|
||||||
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3")
|
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"])
|
assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"])
|
||||||
|
|
||||||
|
|||||||
@ -47,7 +47,7 @@ func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err er
|
|||||||
q = m.startThread(threadId)
|
q = m.startThread(threadId)
|
||||||
}
|
}
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
return q.Add(ctx, msg)
|
return q.TryAdd(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multiQueue[T]) startThread(id string) *mb.MB[T] {
|
func (m *multiQueue[T]) startThread(id string) *mb.MB[T] {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user