space multi queue

Sergey Cherepanov 2023-01-27 13:50:36 +03:00 committed by Mikhail Iudin
parent eb1049b160
commit 80c8da8cac
7 changed files with 226 additions and 10 deletions


@@ -155,7 +155,11 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
// it may be already there (i.e. loaded)
// and build func will not be called, thus we won't sync the tree
// therefore we just do it manually
_ = syncTree.Ping(ctx)
if err = syncTree.Ping(ctx); err != nil {
d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId))
} else {
d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId))
}
}
}
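
The explicit Ping matters because a cache lookup only runs its build function on a miss; an already-loaded tree would otherwise never be asked to sync. A minimal sketch of that get-or-build pattern (names hypothetical, not the actual ocache API):

// getOrBuild sketches the cache behavior the comment refers to:
// build runs only on a cache miss, so any "sync on load" side effect
// tied to building is skipped for already-loaded objects —
// hence the manual Ping above.
func getOrBuild[T any](cache map[string]T, id string, build func() T) T {
	if v, ok := cache[id]; ok {
		return v // build never runs; no sync is triggered
	}
	v := build()
	cache[id] = v
	return v
}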


@@ -72,6 +72,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
log.WarnCtx(ctx, "peer not found in context, use first responsible")
peerId = deps.Configuration.NodeIds(deps.SpaceId)[0]
}
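
A hedged sketch of this fallback in isolation (resolvePeer is an invented helper; peer.CtxPeerId is the real call from the diff): when the request context carries no peer, the first responsible node configured for the space is used as the remote.

// resolvePeer prefers the peer id carried in ctx and falls back to the
// first responsible node; the length check is an added safety not
// present in the original code.
func resolvePeer(ctx context.Context, nodeIds []string) (string, error) {
	if peerId, err := peer.CtxPeerId(ctx); err == nil {
		return peerId, nil
	}
	if len(nodeIds) == 0 {
		return "", errors.New("no responsible nodes configured")
	}
	return nodeIds[0], nil
}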


@@ -68,7 +68,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
s.waiters[msg.ReplyId] = waiter
s.waitersMx.Unlock()
err = s.SendPeer(context.Background(), peerId, msg)
err = s.SendPeer(ctx, peerId, msg)
if err != nil {
return
}
@@ -78,7 +78,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting")
log.With(zap.String("replyId", msg.ReplyId)).WarnCtx(ctx, "time elapsed when waiting")
err = fmt.Errorf("sendSync context error: %v", ctx.Err())
case reply = <-waiter.ch:
// success
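
Both changes in this file tighten context handling: the send now uses the caller's ctx, so cancellation aborts the send itself rather than only the wait, and a timed-out wait is logged at Warn rather than Info. A simplified sketch of the request/reply waiter pattern involved (not the actual messagePool API):

// sendSyncSketch registers interest in a reply, sends with the
// caller's ctx, then blocks until the reply arrives or ctx expires.
func sendSyncSketch(ctx context.Context, send func(context.Context) error,
	replyCh <-chan string) (reply string, err error) {
	// Passing ctx here is the fix: with context.Background() the send
	// ignored the caller's deadline and cancellation.
	if err = send(ctx); err != nil {
		return
	}
	select {
	case <-ctx.Done():
		err = fmt.Errorf("sendSync context error: %w", ctx.Err())
	case reply = <-replyCh:
	}
	return
}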


@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/headsync"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
@@ -20,9 +21,11 @@ import (
"github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf"
"github.com/anytypeio/any-sync/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/anytypeio/any-sync/util/multiqueue"
"github.com/anytypeio/any-sync/util/slice"
"github.com/zeebo/errs"
"go.uber.org/zap"
@@ -50,6 +53,13 @@ type SpaceCreatePayload struct {
ReplicationKey uint64
}
type HandleMessage struct {
Id uint64
Deadline time.Time
SenderId string
Message *spacesyncproto.ObjectSyncMessage
}
const SpaceTypeDerived = "derived.space"
type SpaceDerivePayload struct {
@@ -92,6 +102,8 @@ type Space interface {
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage
HandleMessage(ctx context.Context, msg HandleMessage) (err error)
Close() error
}
@@ -110,6 +122,8 @@ type space struct {
configuration nodeconf.Configuration
settingsObject settings.SettingsObject
handleQueue multiqueue.MultiQueue[HandleMessage]
isClosed atomic.Bool
treesUsed atomic.Int32
}
@@ -200,7 +214,7 @@ func (s *space) Init(ctx context.Context) (err error) {
}
s.cache.AddObject(s.settingsObject)
s.syncStatus.Run()
s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 10)
return nil
}
@@ -337,13 +351,40 @@ func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsObject.DeleteObject(id)
}
func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
return s.handleQueue.Add(ctx, hm.Message.ObjectId, hm)
}
func (s *space) handleMessage(msg HandleMessage) {
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
if !msg.Deadline.IsZero() {
now := time.Now()
if now.After(msg.Deadline) {
log.InfoCtx(ctx, "skip message: deadline exceed")
return
}
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, msg.Deadline)
defer cancel()
}
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
}
}
func (s *space) Close() error {
if s.isClosed.Swap(true) {
log.Warn("call space.Close on closed space", zap.String("id", s.id))
return nil
}
log.With(zap.String("id", s.id)).Debug("space is closing")
defer func() {
s.isClosed.Store(true)
log.With(zap.String("id", s.id)).Debug("space closed")
}()
var mError errs.Group
if err := s.handleQueue.Close(); err != nil {
mError.Add(err)
}
if err := s.headSync.Close(); err != nil {
mError.Add(err)
}
@@ -362,6 +403,6 @@ func (s *space) Close() error {
if err := s.syncStatus.Close(); err != nil {
mError.Add(err)
}
log.With(zap.String("id", s.id)).Debug("space closed")
return mError.Err()
}
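
Net effect of the space changes: HandleMessage enqueues each message into a per-ObjectId queue, and handleMessage drains those queues one goroutine per object, restoring the sender's peer id and the message deadline before delegating to objectSync. Messages for the same tree are therefore handled strictly in order, while different trees proceed in parallel. A usage sketch (ids and sender invented; sp is any value implementing the Space interface above):

// enqueueTwo shows the per-object ordering guarantee.
func enqueueTwo(ctx context.Context, sp Space) error {
	first := HandleMessage{Id: 1, SenderId: "peerA",
		Message: &spacesyncproto.ObjectSyncMessage{ObjectId: "tree1"}}
	second := HandleMessage{Id: 2, SenderId: "peerA",
		Message: &spacesyncproto.ObjectSyncMessage{ObjectId: "tree1"}}
	if err := sp.HandleMessage(ctx, first); err != nil { // queued on "tree1"
		return err
	}
	// handled only after first's handler returns, on the same queue
	return sp.HandleMessage(ctx, second)
}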


@@ -100,7 +100,9 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P
for _, p := range peers {
funcs = append(funcs, func() {
if e := s.sendOne(ctx, p, msg); e != nil {
log.Info("send peer error", zap.Error(e), zap.String("peerId", p.Id()))
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id()))
} else {
log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id()))
}
})
}
@@ -121,6 +123,8 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...
funcs = append(funcs, func() {
if e := st.write(msg); e != nil {
st.l.Debug("sendById write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "sendById success")
}
})
}
@@ -142,6 +146,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message)
// continue with next stream
continue
} else {
st.l.DebugCtx(ctx, "sendOne success")
// stop sending on success
break
}
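
sendOne walks the peer's open streams and stops at the first successful write, skipping broken streams. A minimal sketch of that try-next loop (types simplified, not the actual streamPool API):

// trySend writes msg via the first stream that accepts it; a failed
// write moves on to the next stream instead of failing the send.
func trySend(writes []func(msg []byte) error, msg []byte) error {
	var lastErr error
	for _, write := range writes {
		if err := write(msg); err != nil {
			lastErr = err
			continue // broken stream: try the next one
		}
		return nil // stop sending on success
	}
	return lastErr
}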


@@ -0,0 +1,100 @@
package multiqueue
import (
"context"
"errors"
"github.com/cheggaaa/mb/v3"
"sync"
)
var (
ErrThreadNotExists = errors.New("multiQueue: thread not exists")
ErrClosed = errors.New("multiQueue: closed")
)
func New[T any](h HandleFunc[T], maxThreadSize int) MultiQueue[T] {
return &multiQueue[T]{
handler: h,
threads: make(map[string]*mb.MB[T]),
queueMaxSize: maxThreadSize,
}
}
type HandleFunc[T any] func(msg T)
type MultiQueue[T any] interface {
Add(ctx context.Context, threadId string, msg T) (err error)
CloseThread(threadId string) (err error)
Close() (err error)
}
type multiQueue[T any] struct {
handler HandleFunc[T]
queueMaxSize int
threads map[string]*mb.MB[T]
mu sync.Mutex
closed bool
}
func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err error) {
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return ErrClosed
}
q, ok := m.threads[threadId]
if !ok {
q = m.startThread(threadId)
}
m.mu.Unlock()
return q.Add(ctx, msg)
}
func (m *multiQueue[T]) startThread(id string) *mb.MB[T] {
q := mb.New[T](m.queueMaxSize)
m.threads[id] = q
go m.threadLoop(q)
return q
}
func (m *multiQueue[T]) threadLoop(q *mb.MB[T]) {
for {
msg, err := q.WaitOne(context.Background())
if err != nil {
return
}
m.handler(msg)
}
}
func (m *multiQueue[T]) CloseThread(threadId string) (err error) {
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return ErrClosed
}
q, ok := m.threads[threadId]
if ok {
delete(m.threads, threadId)
}
m.mu.Unlock()
if !ok {
return ErrThreadNotExists
}
return q.Close()
}
func (m *multiQueue[T]) Close() (err error) {
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return ErrClosed
}
m.closed = true
threads := m.threads
m.mu.Unlock()
for _, q := range threads {
_ = q.Close()
}
return nil
}
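
A minimal usage sketch of the new package (thread ids and messages invented): the first Add for a given threadId lazily spawns that thread's goroutine, messages within a thread are handled in FIFO order, and distinct threads run independently.

package main

import (
	"context"
	"fmt"

	"github.com/anytypeio/any-sync/util/multiqueue"
)

func main() {
	done := make(chan string, 3)
	// One goroutine per threadId; the handler sees each thread's
	// messages in the order they were added.
	q := multiqueue.New[string](func(msg string) {
		done <- msg
	}, 10)
	defer q.Close()

	ctx := context.Background()
	_ = q.Add(ctx, "treeA", "a1") // first Add for "treeA" starts its loop
	_ = q.Add(ctx, "treeA", "a2") // handled strictly after a1
	_ = q.Add(ctx, "treeB", "b1") // separate queue, may interleave with treeA

	for i := 0; i < 3; i++ {
		fmt.Println("handled:", <-done)
	}
}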


@@ -0,0 +1,65 @@
package multiqueue
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestMultiQueue_Add(t *testing.T) {
t.Run("process", func(t *testing.T) {
var msgsCh = make(chan string)
var h HandleFunc[string] = func(msg string) {
msgsCh <- msg
}
q := New[string](h, 10)
defer func() {
require.NoError(t, q.Close())
}()
for i := 0; i < 5; i++ {
for j := 0; j < 5; j++ {
assert.NoError(t, q.Add(context.Background(), fmt.Sprint(i), fmt.Sprint(i, j)))
}
}
var msgs []string
for i := 0; i < 5*5; i++ {
select {
case <-time.After(time.Second / 4):
require.True(t, false, "timeout")
case msg := <-msgsCh:
msgs = append(msgs, msg)
}
}
assert.Len(t, msgs, 25)
})
t.Run("add to closed", func(t *testing.T) {
q := New[string](func(msg string) {}, 10)
require.NoError(t, q.Close())
assert.Equal(t, ErrClosed, q.Add(context.Background(), "1", "1"))
})
}
func TestMultiQueue_CloseThread(t *testing.T) {
var msgsCh = make(chan string)
var h HandleFunc[string] = func(msg string) {
msgsCh <- msg
}
q := New[string](h, 10)
defer func() {
require.NoError(t, q.Close())
}()
require.NoError(t, q.Add(context.Background(), "1", "1"))
require.NoError(t, q.Add(context.Background(), "1", "2"))
require.NoError(t, q.CloseThread("1"))
for i := 0; i < 2; i++ {
select {
case <-msgsCh:
case <-time.After(time.Second / 4):
require.False(t, true, "timeout")
}
}
}