Fix waiters in streampool
This commit is contained in:
parent
270a6bdcee
commit
021342bfe9
@ -1,12 +1,12 @@
|
||||
package objectsync
|
||||
package syncservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/ocache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -16,7 +16,7 @@ import (
|
||||
var ErrEmptyPeer = errors.New("don't have such a peer")
|
||||
var ErrStreamClosed = errors.New("stream is already closed")
|
||||
|
||||
var maxStreamReaders = 10
|
||||
var maxSimultaneousOperationsPerStream = 10
|
||||
var syncWaitPeriod = 2 * time.Second
|
||||
|
||||
var ErrSyncTimeout = errors.New("too long wait on sync receive")
|
||||
@ -24,8 +24,8 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive")
|
||||
// StreamPool can be made generic to work with different streams
|
||||
type StreamPool interface {
|
||||
ocache.ObjectLastUsage
|
||||
AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error)
|
||||
AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error)
|
||||
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
|
||||
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error)
|
||||
|
||||
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
@ -43,7 +43,7 @@ type responseWaiter struct {
|
||||
|
||||
type streamPool struct {
|
||||
sync.Mutex
|
||||
peerStreams map[string]spacesyncproto.ObjectSyncStream
|
||||
peerStreams map[string]spacesyncproto.SpaceStream
|
||||
messageHandler MessageHandler
|
||||
wg *sync.WaitGroup
|
||||
waiters map[string]responseWaiter
|
||||
@ -54,7 +54,7 @@ type streamPool struct {
|
||||
|
||||
func newStreamPool(messageHandler MessageHandler) StreamPool {
|
||||
s := &streamPool{
|
||||
peerStreams: make(map[string]spacesyncproto.ObjectSyncStream),
|
||||
peerStreams: make(map[string]spacesyncproto.SpaceStream),
|
||||
messageHandler: messageHandler,
|
||||
waiters: make(map[string]responseWaiter),
|
||||
wg: &sync.WaitGroup{},
|
||||
@ -110,7 +110,7 @@ func (s *streamPool) SendSync(
|
||||
|
||||
func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
getStreams := func() (streams []spacesyncproto.ObjectSyncStream) {
|
||||
getStreams := func() (streams []spacesyncproto.SpaceStream) {
|
||||
for _, pId := range peers {
|
||||
stream, err := s.getOrDeleteStream(pId)
|
||||
if err != nil {
|
||||
@ -139,7 +139,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) {
|
||||
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
|
||||
stream, exists := s.peerStreams[id]
|
||||
if !exists {
|
||||
err = ErrEmptyPeer
|
||||
@ -156,7 +156,7 @@ func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectS
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) {
|
||||
func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
Loop:
|
||||
@ -188,7 +188,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) {
|
||||
peerId, err := s.addStream(stream)
|
||||
if err != nil {
|
||||
return
|
||||
@ -197,7 +197,7 @@ func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStrea
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
|
||||
peerId, err := s.addStream(stream)
|
||||
if err != nil {
|
||||
return
|
||||
@ -205,7 +205,7 @@ func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream
|
||||
return s.readPeerLoop(peerId, stream)
|
||||
}
|
||||
|
||||
func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) {
|
||||
func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) {
|
||||
s.Lock()
|
||||
peerId, err = peer.CtxPeerId(stream.Context())
|
||||
if err != nil {
|
||||
@ -245,74 +245,65 @@ func (s *streamPool) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
var (
|
||||
log = log.With(zap.String("peerId", peerId))
|
||||
queue = NewActionQueue(maxStreamReaders, 100)
|
||||
)
|
||||
queue.Run()
|
||||
|
||||
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
|
||||
log := log.With(zap.String("peerId", peerId))
|
||||
defer func() {
|
||||
log.Debug("stopped reading stream from peer")
|
||||
s.removePeer(peerId, stream)
|
||||
queue.Close()
|
||||
s.wg.Done()
|
||||
}()
|
||||
|
||||
log.Debug("started reading stream from peer")
|
||||
|
||||
stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.ReplyId]
|
||||
if exists {
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
waiter.ch <- msg
|
||||
return true
|
||||
}
|
||||
s.waitersMx.Unlock()
|
||||
return false
|
||||
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
|
||||
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
|
||||
limiter <- struct{}{}
|
||||
}
|
||||
|
||||
process := func(msg *spacesyncproto.ObjectSyncMessage) error {
|
||||
process := func(msg *spacesyncproto.ObjectSyncMessage) {
|
||||
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
|
||||
log.Debug("getting message with reply id")
|
||||
err = s.messageHandler(stream.Context(), peerId, msg)
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("message handling failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
var msg *spacesyncproto.ObjectSyncMessage
|
||||
msg, err = stream.Recv()
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
if err != nil {
|
||||
stream.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.ReplyId != "" {
|
||||
// then we can send it directly to waiters without adding to queue or starting a reader
|
||||
if stopWaiter(msg) {
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.ReplyId]
|
||||
if exists {
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
waiter.ch <- msg
|
||||
continue
|
||||
}
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
|
||||
}
|
||||
|
||||
queue.Send(func() error {
|
||||
return process(msg)
|
||||
})
|
||||
select {
|
||||
case <-limiter:
|
||||
case <-stream.Context().Done():
|
||||
return
|
||||
}
|
||||
|
||||
go func(msg *spacesyncproto.ObjectSyncMessage) {
|
||||
process(msg)
|
||||
limiter <- struct{}{}
|
||||
}(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
mapStream, ok := s.peerStreams[peerId]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user