streampool fixes

This commit is contained in:
Sergey Cherepanov 2023-01-27 16:14:34 +03:00 committed by Mikhail Iudin
parent bf3b80825a
commit 0e7450fd52
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
5 changed files with 60 additions and 40 deletions

View File

@ -92,16 +92,22 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
if err != nil {
return err
}
var peerIds = make([]string, 0, len(peers))
for _, p := range peers {
if err = d.syncWithPeer(ctx, p); err != nil {
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
peerIds = append(peerIds, p.Id())
}
d.log.DebugCtx(ctx, "start diffsync", zap.Strings("peerIds", peerIds))
for _, p := range peers {
if err = d.syncWithPeer(peer.CtxWithPeerId(ctx, p.Id()), p); err != nil {
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
}
}
d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
return nil
}
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
ctx = logger.CtxWithFields(ctx, zap.String("peerId", p.Id()))
var (
cl = d.clientFactory.Client(p)
rdiff = NewRemoteDiff(d.spaceId, cl)
@ -126,7 +132,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
ctx = peer.CtxWithPeerId(ctx, p.Id())
d.pingTreesInCache(ctx, filteredIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
@ -139,7 +144,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
}
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees"))
for _, tId := range trees {
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
if err != nil {

View File

@ -9,13 +9,13 @@ import (
type SyncAcl struct {
list.AclList
synchandler.SyncHandler
streamPool objectsync.MessagePool
messagePool objectsync.MessagePool
}
func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl {
func NewSyncAcl(aclList list.AclList, messagePool objectsync.MessagePool) *SyncAcl {
return &SyncAcl{
AclList: aclList,
SyncHandler: nil,
streamPool: streamPool,
messagePool: messagePool,
}
}

View File

@ -77,6 +77,6 @@ func (p *peer) UpdateLastUsage() {
}
func (p *peer) Close() (err error) {
log.Warn("peer close", zap.String("peerId", p.id))
log.Debug("peer close", zap.String("peerId", p.id))
return p.Conn.Close()
}

View File

@ -2,15 +2,12 @@ package streampool
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/logger"
"go.uber.org/zap"
"storj.io/drpc"
"sync/atomic"
)
var msgCounter atomic.Uint32
type stream struct {
peerId string
stream drpc.Stream
@ -40,7 +37,7 @@ func (sr *stream) readLoop() error {
return err
}
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId))
ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId))
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
sr.l.Info("msg handle error", zap.Error(err))
return err

View File

@ -96,42 +96,60 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st
}
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
var funcs []func()
for _, p := range peers {
funcs = append(funcs, func() {
if e := s.sendOne(ctx, p, msg); e != nil {
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id()))
var sendOneFunc = func(sp peer.Peer) func() {
return func() {
if e := s.sendOne(ctx, sp, msg); e != nil {
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id()))
} else {
log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id()))
log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id()))
}
})
}
}
return s.exec.Add(ctx, funcs...)
for _, p := range peers {
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
return
}
}
return
}
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
s.mu.Lock()
var streams []*stream
var streamsByPeer [][]*stream
for _, peerId := range peerIds {
var streams []*stream
for _, streamId := range s.streamIdsByPeer[peerId] {
streams = append(streams, s.streams[streamId])
}
if len(streams) != 0 {
streamsByPeer = append(streamsByPeer, streams)
}
}
s.mu.Unlock()
var funcs []func()
for _, st := range streams {
funcs = append(funcs, func() {
if e := st.write(msg); e != nil {
st.l.Debug("sendById write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "sendById success")
var sendStreamsFunc = func(streams []*stream) func() {
return func() {
for _, st := range streams {
if e := st.write(msg); e != nil {
st.l.Debug("sendById write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "sendById success")
return
}
}
})
}
}
if len(funcs) == 0 {
for _, streams := range streamsByPeer {
if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil {
return
}
}
if len(streamsByPeer) == 0 {
return pool.ErrUnableToConnect
}
return s.exec.Add(ctx, funcs...)
return
}
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
@ -221,18 +239,19 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
}
}
s.mu.Unlock()
var funcs []func()
for _, st := range streams {
funcs = append(funcs, func() {
var sendStreamFunc = func(st *stream) func() {
return func() {
if e := st.write(msg); e != nil {
log.DebugCtx(ctx, "broadcast write error", zap.Error(e))
st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "broadcast success")
}
})
}
}
if len(funcs) == 0 {
return
for _, st := range streams {
s.exec.Add(ctx, sendStreamFunc(st))
}
return s.exec.Add(ctx, funcs...)
return
}
func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {