Change solution to use queue for better performance

This commit is contained in:
mcrakhman 2022-12-27 22:26:07 +01:00 committed by Mikhail Iudin
parent e18f71111a
commit 5794ea1650
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 60 additions and 24 deletions

View File

@ -42,13 +42,13 @@ func (q *actionQueue) Run() {
} }
func (q *actionQueue) read() { func (q *actionQueue) read() {
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) limiter := make(chan struct{}, maxStreamReaders)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { for i := 0; i < maxStreamReaders; i++ {
limiter <- struct{}{} limiter <- struct{}{}
} }
defer func() { defer func() {
// wait until all operations are done // wait until all operations are done
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { for i := 0; i < maxStreamReaders; i++ {
<-limiter <-limiter
} }
close(q.queueDone) close(q.queueDone)

View File

@ -16,7 +16,7 @@ import (
var ErrEmptyPeer = errors.New("don't have such a peer") var ErrEmptyPeer = errors.New("don't have such a peer")
var ErrStreamClosed = errors.New("stream is already closed") var ErrStreamClosed = errors.New("stream is already closed")
var maxSimultaneousOperationsPerStream = 10 var maxStreamReaders = 10
var syncWaitPeriod = 2 * time.Second var syncWaitPeriod = 2 * time.Second
var ErrSyncTimeout = errors.New("too long wait on sync receive") var ErrSyncTimeout = errors.New("too long wait on sync receive")
@ -246,7 +246,14 @@ func (s *streamPool) Close() (err error) {
} }
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
log := log.With(zap.String("peerId", peerId)) var (
msg *spacesyncproto.ObjectSyncMessage
mx = sync.Mutex{}
maxWaitMsgs = 500
queue = make([]*spacesyncproto.ObjectSyncMessage, 0, 10)
readers = 0
log = log.With(zap.String("peerId", peerId))
)
defer func() { defer func() {
log.Debug("stopped reading stream from peer") log.Debug("stopped reading stream from peer")
s.removePeer(peerId, stream) s.removePeer(peerId, stream)
@ -254,22 +261,45 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn
}() }()
log.Debug("started reading stream from peer") log.Debug("started reading stream from peer")
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) readQueue := func() {
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { defer func() {
limiter <- struct{}{} mx.Lock()
} readers--
mx.Unlock()
}()
process := func(msg *spacesyncproto.ObjectSyncMessage) { for {
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) select {
log.Debug("getting message with reply id") case <-stream.Context().Done():
err = s.messageHandler(stream.Context(), peerId, msg) return
if err != nil { default:
log.With(zap.Error(err)).Debug("message handling failed") }
mx.Lock()
if len(queue) == 0 {
mx.Unlock()
return
}
newEl := queue[0]
queue = queue[1:]
mx.Unlock()
log := log.With(zap.String("replyId", newEl.ReplyId), zap.String("object id", newEl.ObjectId))
log.Debug("getting message with reply id")
err = s.messageHandler(stream.Context(), peerId, newEl)
if err != nil {
log.With(zap.Error(err)).Debug("message handling failed")
}
} }
} }
for { for {
var msg *spacesyncproto.ObjectSyncMessage select {
case <-stream.Context().Done():
return
default:
}
msg, err = stream.Recv() msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix()) s.lastUsage.Store(time.Now().Unix())
if err != nil { if err != nil {
@ -277,6 +307,7 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn
} }
if msg.ReplyId != "" { if msg.ReplyId != "" {
// then we can send it directly to waiters without adding to queue or starting a reader
s.waitersMx.Lock() s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId] waiter, exists := s.waiters[msg.ReplyId]
if exists { if exists {
@ -290,16 +321,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
} }
select { mx.Lock()
case <-limiter: queue = append(queue, msg)
case <-stream.Context().Done(): if len(queue) > maxWaitMsgs {
return queue = queue[len(queue)-maxWaitMsgs:]
} }
// if we already have max goroutines reading the queue in parallel
if readers >= maxStreamReaders {
mx.Unlock()
continue
}
readers++
mx.Unlock()
go func(msg *spacesyncproto.ObjectSyncMessage) { // starting another reader
process(msg) go readQueue()
limiter <- struct{}{}
}(msg)
} }
} }