diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 46689ab8..8e5231c7 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -1,12 +1,12 @@ -package objectsync +package syncservice import ( "context" "errors" "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" "go.uber.org/zap" "sync" "sync/atomic" @@ -16,7 +16,7 @@ import ( var ErrEmptyPeer = errors.New("don't have such a peer") var ErrStreamClosed = errors.New("stream is already closed") -var maxStreamReaders = 10 +var maxSimultaneousOperationsPerStream = 10 var syncWaitPeriod = 2 * time.Second var ErrSyncTimeout = errors.New("too long wait on sync receive") @@ -24,8 +24,8 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive") // StreamPool can be made generic to work with different streams type StreamPool interface { ocache.ObjectLastUsage - AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) - AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) + AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) + AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) @@ -43,7 +43,7 @@ type responseWaiter struct { type streamPool struct { sync.Mutex - peerStreams map[string]spacesyncproto.ObjectSyncStream + peerStreams map[string]spacesyncproto.SpaceStream messageHandler MessageHandler wg *sync.WaitGroup waiters map[string]responseWaiter @@ -54,7 +54,7 @@ type streamPool struct { func newStreamPool(messageHandler MessageHandler) StreamPool { s := &streamPool{ - peerStreams: make(map[string]spacesyncproto.ObjectSyncStream), + peerStreams: make(map[string]spacesyncproto.SpaceStream), messageHandler: messageHandler, waiters: make(map[string]responseWaiter), wg: &sync.WaitGroup{}, @@ -110,7 +110,7 @@ func (s *streamPool) SendSync( func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { s.lastUsage.Store(time.Now().Unix()) - getStreams := func() (streams []spacesyncproto.ObjectSyncStream) { + getStreams := func() (streams []spacesyncproto.SpaceStream) { for _, pId := range peers { stream, err := s.getOrDeleteStream(pId) if err != nil { @@ -139,7 +139,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn return err } -func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) { +func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) { stream, exists := s.peerStreams[id] if !exists { err = ErrEmptyPeer @@ -156,7 +156,7 @@ func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectS return } -func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) { +func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) { s.Lock() defer s.Unlock() Loop: @@ -188,7 +188,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( return nil } -func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) { +func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) { peerId, err := s.addStream(stream) if err != nil { return @@ -197,7 +197,7 @@ func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStrea return } -func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) { +func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) { peerId, err := s.addStream(stream) if err != nil { return @@ -205,7 +205,7 @@ func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream return s.readPeerLoop(peerId, stream) } -func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) { +func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) { s.Lock() peerId, err = peer.CtxPeerId(stream.Context()) if err != nil { @@ -245,74 +245,65 @@ func (s *streamPool) Close() (err error) { return nil } -func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - var ( - log = log.With(zap.String("peerId", peerId)) - queue = NewActionQueue(maxStreamReaders, 100) - ) - queue.Run() - +func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { + log := log.With(zap.String("peerId", peerId)) defer func() { log.Debug("stopped reading stream from peer") s.removePeer(peerId, stream) - queue.Close() s.wg.Done() }() - log.Debug("started reading stream from peer") - stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool { - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg - return true - } - s.waitersMx.Unlock() - return false + limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) + for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + limiter <- struct{}{} } - process := func(msg *spacesyncproto.ObjectSyncMessage) error { + process := func(msg *spacesyncproto.ObjectSyncMessage) { log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) log.Debug("getting message with reply id") err = s.messageHandler(stream.Context(), peerId, msg) if err != nil { log.With(zap.Error(err)).Debug("message handling failed") } - return nil } for { - select { - case <-stream.Context().Done(): - return - default: - } var msg *spacesyncproto.ObjectSyncMessage msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { - stream.Close() return } if msg.ReplyId != "" { - // then we can send it directly to waiters without adding to queue or starting a reader - if stopWaiter(msg) { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg continue } + s.waitersMx.Unlock() + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") } - queue.Send(func() error { - return process(msg) - }) + select { + case <-limiter: + case <-stream.Context().Done(): + return + } + + go func(msg *spacesyncproto.ObjectSyncMessage) { + process(msg) + limiter <- struct{}{} + }(msg) } } -func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { +func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) { s.Lock() defer s.Unlock() mapStream, ok := s.peerStreams[peerId]