From 0e7450fd52c46628d42528eb0d447e969c9e4110 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 27 Jan 2023 16:14:34 +0300 Subject: [PATCH] streampool fixes --- commonspace/headsync/diffsyncer.go | 14 +++-- commonspace/object/acl/syncacl/syncacl.go | 6 +- net/peer/peer.go | 2 +- net/streampool/stream.go | 5 +- net/streampool/streampool.go | 73 ++++++++++++++--------- 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 21fa49cb..9665702a 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -92,16 +92,22 @@ func (d *diffSyncer) Sync(ctx context.Context) error { if err != nil { return err } + var peerIds = make([]string, 0, len(peers)) for _, p := range peers { - if err = d.syncWithPeer(ctx, p); err != nil { - d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) + peerIds = append(peerIds, p.Id()) + } + d.log.DebugCtx(ctx, "start diffsync", zap.Strings("peerIds", peerIds)) + for _, p := range peers { + if err = d.syncWithPeer(peer.CtxWithPeerId(ctx, p.Id()), p); err != nil { + d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) } } - d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) + d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) return nil } func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { + ctx = logger.CtxWithFields(ctx, zap.String("peerId", p.Id())) var ( cl = d.clientFactory.Client(p) rdiff = NewRemoteDiff(d.spaceId, cl) @@ -126,7 +132,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - ctx = peer.CtxWithPeerId(ctx, p.Id()) d.pingTreesInCache(ctx, filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), @@ -139,7 +144,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) } func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { - ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees")) for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index d18fea0e..1e2546b7 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -9,13 +9,13 @@ import ( type SyncAcl struct { list.AclList synchandler.SyncHandler - streamPool objectsync.MessagePool + messagePool objectsync.MessagePool } -func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl { +func NewSyncAcl(aclList list.AclList, messagePool objectsync.MessagePool) *SyncAcl { return &SyncAcl{ AclList: aclList, SyncHandler: nil, - streamPool: streamPool, + messagePool: messagePool, } } diff --git a/net/peer/peer.go b/net/peer/peer.go index 7e55d5e0..9c9f547d 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -77,6 +77,6 @@ func (p *peer) UpdateLastUsage() { } func (p *peer) Close() (err error) { - log.Warn("peer close", zap.String("peerId", p.id)) + log.Debug("peer close", zap.String("peerId", p.id)) return p.Conn.Close() } diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 06dbb0c9..065f322e 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -2,15 +2,12 @@ package streampool import ( "context" - "fmt" "github.com/anytypeio/any-sync/app/logger" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" ) -var msgCounter atomic.Uint32 - type stream struct { peerId string stream drpc.Stream @@ -40,7 +37,7 @@ func (sr *stream) readLoop() error { return err } ctx := streamCtx(context.Background(), sr.streamId, sr.peerId) - ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId)) + ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId)) if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil { sr.l.Info("msg handle error", zap.Error(err)) return err diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 81f3b678..388978df 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -96,42 +96,60 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { - var funcs []func() - for _, p := range peers { - funcs = append(funcs, func() { - if e := s.sendOne(ctx, p, msg); e != nil { - log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id())) + var sendOneFunc = func(sp peer.Peer) func() { + return func() { + if e := s.sendOne(ctx, sp, msg); e != nil { + log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id())) } else { - log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id())) + log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id())) } - }) + } } - return s.exec.Add(ctx, funcs...) + + for _, p := range peers { + if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { + return + } + } + return } func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) { s.mu.Lock() - var streams []*stream + var streamsByPeer [][]*stream for _, peerId := range peerIds { + var streams []*stream for _, streamId := range s.streamIdsByPeer[peerId] { streams = append(streams, s.streams[streamId]) } + if len(streams) != 0 { + streamsByPeer = append(streamsByPeer, streams) + } } s.mu.Unlock() - var funcs []func() - for _, st := range streams { - funcs = append(funcs, func() { - if e := st.write(msg); e != nil { - st.l.Debug("sendById write error", zap.Error(e)) - } else { - st.l.DebugCtx(ctx, "sendById success") + + var sendStreamsFunc = func(streams []*stream) func() { + return func() { + for _, st := range streams { + if e := st.write(msg); e != nil { + st.l.Debug("sendById write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "sendById success") + return + } } - }) + } } - if len(funcs) == 0 { + + for _, streams := range streamsByPeer { + if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil { + return + } + } + if len(streamsByPeer) == 0 { return pool.ErrUnableToConnect } - return s.exec.Add(ctx, funcs...) + return } func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) { @@ -221,18 +239,19 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } } s.mu.Unlock() - var funcs []func() - for _, st := range streams { - funcs = append(funcs, func() { + var sendStreamFunc = func(st *stream) func() { + return func() { if e := st.write(msg); e != nil { - log.DebugCtx(ctx, "broadcast write error", zap.Error(e)) + st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "broadcast success") } - }) + } } - if len(funcs) == 0 { - return + for _, st := range streams { + s.exec.Add(ctx, sendStreamFunc(st)) } - return s.exec.Add(ctx, funcs...) + return } func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {