any-sync/commonspace/objectsync/streamchecker.go
2023-01-05 15:34:09 +03:00

134 lines
3.5 KiB
Go

package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"time"
)
// StreamChecker makes sure object-sync streams to peers exist.
type StreamChecker interface {
	// CheckResponsiblePeers ensures streams to the peers responsible for the
	// space. It reports nothing to the caller; the implementation in this
	// file logs failures instead of returning them.
	CheckResponsiblePeers()

	// CheckPeerConnection ensures a stream to the given peer and returns an
	// error if one could not be established.
	CheckPeerConnection(peerId string) (err error)
}
// streamChecker is the StreamChecker implementation for a single space.
type streamChecker struct {
	// spaceId is used to look up node ids for the space and is sent in the
	// first message of every stream this checker opens.
	spaceId string

	// connector supplies the configuration, the peer pool, and dialing of
	// responsible peers that have no active stream.
	connector confconnector.ConfConnector
	// streamPool is queried for active streams and receives newly opened ones.
	streamPool StreamPool
	// clientFactory builds the client used to open an object-sync stream to a peer.
	clientFactory spacesyncproto.ClientFactory

	log *zap.Logger
	// syncCtx is passed to every dial and stream-open call.
	syncCtx context.Context
	// lastCheck is the time of the last CheckResponsiblePeers run that was
	// not skipped by the throttle.
	lastCheck *atomic.Time
}
// streamCheckerInterval is the minimum time between two CheckResponsiblePeers
// runs; calls arriving sooner return without doing anything.
const streamCheckerInterval = 10 * time.Second
// NewStreamChecker builds a StreamChecker for the space identified by spaceId.
//
// The dependencies are stored as given. lastCheck starts at the zero time, so
// the first CheckResponsiblePeers call is never throttled.
func NewStreamChecker(
	spaceId string,
	connector confconnector.ConfConnector,
	streamPool StreamPool,
	clientFactory spacesyncproto.ClientFactory,
	syncCtx context.Context,
	log *zap.Logger,
) StreamChecker {
	var neverChecked time.Time

	checker := &streamChecker{
		spaceId:       spaceId,
		connector:     connector,
		streamPool:    streamPool,
		clientFactory: clientFactory,
		log:           log,
		syncCtx:       syncCtx,
		lastCheck:     atomic.NewTime(neverChecked),
	}
	return checker
}
// CheckResponsiblePeers opens object-sync streams to the responsible nodes of
// the space that do not have an active stream yet.
//
// The call is throttled: if the previous non-skipped run happened less than
// streamCheckerInterval ago, it returns immediately. Failures are logged and
// never returned; a failure to create a stream for one peer does not stop the
// remaining peers from being processed.
func (s *streamChecker) CheckResponsiblePeers() {
	// NOTE(review): Load and Store are two separate atomic operations, so two
	// concurrent callers can both pass the throttle before either stores.
	now := time.Now()
	if now.Before(s.lastCheck.Load().Add(streamCheckerInterval)) {
		return
	}
	s.lastCheck.Store(now)

	// Collect the nodes that already have a stream so they are not dialed again.
	var activeNodeIds []string
	for _, nodeId := range s.connector.Configuration().NodeIds(s.spaceId) {
		if s.streamPool.HasActiveStream(nodeId) {
			s.log.Debug("has active stream for", zap.String("id", nodeId))
			activeNodeIds = append(activeNodeIds, nodeId)
		}
	}
	s.log.Debug("total streams", zap.Int("total", len(activeNodeIds)))

	newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds)
	if err != nil {
		s.log.Error("failed to dial peers", zap.Error(err))
		return
	}

	for _, p := range newPeers {
		if err := s.createStream(p); err != nil {
			// Use the injected logger, like every other log call in this
			// method, and record which peer failed.
			s.log.Error("failed to create stream", zap.String("id", p.Id()), zap.Error(err))
			continue
		}
		s.log.Debug("reading stream for", zap.String("id", p.Id()))
	}
}
// CheckPeerConnection makes sure there is an active stream to peerId.
//
// It returns nil right away when the stream pool already has an active stream
// for the peer. Otherwise the peer must be one of the node ids the
// configuration lists for this space; if so it is dialed through the
// connector's pool and a new stream is created. An unknown peer, a dial
// failure, or a stream-creation failure is returned as an error.
func (s *streamChecker) CheckPeerConnection(peerId string) (err error) {
	if s.streamPool.HasActiveStream(peerId) {
		return nil
	}

	conf := s.connector.Configuration()
	peerPool := s.connector.Pool()

	// we don't know the address of the peer
	if known := conf.NodeIds(s.spaceId); !slices.Contains(known, peerId) {
		return fmt.Errorf("don't know the address of peer %s", peerId)
	}

	dialed, dialErr := peerPool.Dial(s.syncCtx, peerId)
	if dialErr != nil {
		return dialErr
	}
	return s.createStream(dialed)
}
// createStream opens an object-sync stream to p, sends the initial message
// carrying the space id, and hands the stream to the stream pool.
//
// Every failure is returned wrapped with a message naming the step that
// failed; errors from opening the stream and from the first send are passed
// through rpcerr.Unwrap before wrapping.
//
// NOTE(review): the stream is not closed here when the first send or the pool
// hand-off fails — confirm whether that can leak a stream.
func (s *streamChecker) createStream(p peer.Peer) (err error) {
	client := s.clientFactory.Client(p)

	stream, openErr := client.ObjectSyncStream(s.syncCtx)
	if openErr != nil {
		// the request probably failed because there is no such space,
		// but diffService should handle such cases by sending pushSpace
		return fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(openErr))
	}

	// an empty message lets the server know which space this stream belongs to
	hello := &spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}
	if sendErr := stream.Send(hello); sendErr != nil {
		return fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(sendErr))
	}

	if addErr := s.streamPool.AddAndReadStreamAsync(stream); addErr != nil {
		return fmt.Errorf("failed to read from stream async: %w", addErr)
	}
	return nil
}