streampool fixes
This commit is contained in:
parent
44ffc39729
commit
8f1cba3770
@ -92,16 +92,22 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
var peerIds = make([]string, 0, len(peers))
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
if err = d.syncWithPeer(ctx, p); err != nil {
|
peerIds = append(peerIds, p.Id())
|
||||||
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
}
|
||||||
|
d.log.DebugCtx(ctx, "start diffsync", zap.Strings("peerIds", peerIds))
|
||||||
|
for _, p := range peers {
|
||||||
|
if err = d.syncWithPeer(peer.CtxWithPeerId(ctx, p.Id()), p); err != nil {
|
||||||
|
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||||
|
ctx = logger.CtxWithFields(ctx, zap.String("peerId", p.Id()))
|
||||||
var (
|
var (
|
||||||
cl = d.clientFactory.Client(p)
|
cl = d.clientFactory.Client(p)
|
||||||
rdiff = NewRemoteDiff(d.spaceId, cl)
|
rdiff = NewRemoteDiff(d.spaceId, cl)
|
||||||
@ -126,7 +132,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
|
|
||||||
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
|
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
|
||||||
|
|
||||||
ctx = peer.CtxWithPeerId(ctx, p.Id())
|
|
||||||
d.pingTreesInCache(ctx, filteredIds)
|
d.pingTreesInCache(ctx, filteredIds)
|
||||||
|
|
||||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||||
@ -139,7 +144,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
||||||
ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees"))
|
|
||||||
for _, tId := range trees {
|
for _, tId := range trees {
|
||||||
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -9,13 +9,13 @@ import (
|
|||||||
type SyncAcl struct {
|
type SyncAcl struct {
|
||||||
list.AclList
|
list.AclList
|
||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
streamPool objectsync.MessagePool
|
messagePool objectsync.MessagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl {
|
func NewSyncAcl(aclList list.AclList, messagePool objectsync.MessagePool) *SyncAcl {
|
||||||
return &SyncAcl{
|
return &SyncAcl{
|
||||||
AclList: aclList,
|
AclList: aclList,
|
||||||
SyncHandler: nil,
|
SyncHandler: nil,
|
||||||
streamPool: streamPool,
|
messagePool: messagePool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -77,6 +77,6 @@ func (p *peer) UpdateLastUsage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) Close() (err error) {
|
func (p *peer) Close() (err error) {
|
||||||
log.Warn("peer close", zap.String("peerId", p.id))
|
log.Debug("peer close", zap.String("peerId", p.id))
|
||||||
return p.Conn.Close()
|
return p.Conn.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,15 +2,12 @@ package streampool
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
var msgCounter atomic.Uint32
|
|
||||||
|
|
||||||
type stream struct {
|
type stream struct {
|
||||||
peerId string
|
peerId string
|
||||||
stream drpc.Stream
|
stream drpc.Stream
|
||||||
@ -40,7 +37,7 @@ func (sr *stream) readLoop() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
|
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
|
||||||
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId))
|
ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId))
|
||||||
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
||||||
sr.l.Info("msg handle error", zap.Error(err))
|
sr.l.Info("msg handle error", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -96,42 +96,60 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
|
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
|
||||||
var funcs []func()
|
var sendOneFunc = func(sp peer.Peer) func() {
|
||||||
for _, p := range peers {
|
return func() {
|
||||||
funcs = append(funcs, func() {
|
if e := s.sendOne(ctx, sp, msg); e != nil {
|
||||||
if e := s.sendOne(ctx, p, msg); e != nil {
|
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id()))
|
||||||
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id()))
|
|
||||||
} else {
|
} else {
|
||||||
log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id()))
|
log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id()))
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
return s.exec.Add(ctx, funcs...)
|
|
||||||
|
for _, p := range peers {
|
||||||
|
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
|
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
var streams []*stream
|
var streamsByPeer [][]*stream
|
||||||
for _, peerId := range peerIds {
|
for _, peerId := range peerIds {
|
||||||
|
var streams []*stream
|
||||||
for _, streamId := range s.streamIdsByPeer[peerId] {
|
for _, streamId := range s.streamIdsByPeer[peerId] {
|
||||||
streams = append(streams, s.streams[streamId])
|
streams = append(streams, s.streams[streamId])
|
||||||
}
|
}
|
||||||
|
if len(streams) != 0 {
|
||||||
|
streamsByPeer = append(streamsByPeer, streams)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
var funcs []func()
|
|
||||||
for _, st := range streams {
|
var sendStreamsFunc = func(streams []*stream) func() {
|
||||||
funcs = append(funcs, func() {
|
return func() {
|
||||||
if e := st.write(msg); e != nil {
|
for _, st := range streams {
|
||||||
st.l.Debug("sendById write error", zap.Error(e))
|
if e := st.write(msg); e != nil {
|
||||||
} else {
|
st.l.Debug("sendById write error", zap.Error(e))
|
||||||
st.l.DebugCtx(ctx, "sendById success")
|
} else {
|
||||||
|
st.l.DebugCtx(ctx, "sendById success")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
if len(funcs) == 0 {
|
|
||||||
|
for _, streams := range streamsByPeer {
|
||||||
|
if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(streamsByPeer) == 0 {
|
||||||
return pool.ErrUnableToConnect
|
return pool.ErrUnableToConnect
|
||||||
}
|
}
|
||||||
return s.exec.Add(ctx, funcs...)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
|
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
|
||||||
@ -221,18 +239,19 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
var funcs []func()
|
var sendStreamFunc = func(st *stream) func() {
|
||||||
for _, st := range streams {
|
return func() {
|
||||||
funcs = append(funcs, func() {
|
|
||||||
if e := st.write(msg); e != nil {
|
if e := st.write(msg); e != nil {
|
||||||
log.DebugCtx(ctx, "broadcast write error", zap.Error(e))
|
st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
|
||||||
|
} else {
|
||||||
|
st.l.DebugCtx(ctx, "broadcast success")
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
if len(funcs) == 0 {
|
for _, st := range streams {
|
||||||
return
|
s.exec.Add(ctx, sendStreamFunc(st))
|
||||||
}
|
}
|
||||||
return s.exec.Add(ctx, funcs...)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {
|
func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user