2022-12-03 17:54:10 +01:00

154 lines
4.6 KiB
Go

package syncservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectgetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"go.uber.org/zap"
"time"
)
// log is the package-level sugared logger named "syncservice"; each
// syncService derives its own logger from it in newSyncService.
var log = logger.NewNamed("syncservice").Sugar()
// SyncService is the per-space sync entry point. It combines last-usage
// reporting (ocache.ObjectLastUsage) and sync message handling
// (synchandler.SyncHandler) with access to the underlying stream pool and
// lifecycle control.
type SyncService interface {
ocache.ObjectLastUsage
synchandler.SyncHandler
// StreamPool returns the stream pool used by the service.
StreamPool() StreamPool
// Init supplies the object getter used to resolve message targets. The
// implementation in this file also starts its background stream-check
// loop here.
Init(getter objectgetter.ObjectGetter)
// Close shuts the service down. The implementation in this file stops the
// background loop and closes the stream pool.
Close() (err error)
}
// respPeersStreamCheckInterval is the ticker period with which
// responsibleStreamCheckLoop re-checks streams to the responsible peers.
const respPeersStreamCheckInterval = time.Second * 10
// syncService is the SyncService implementation for a single space.
type syncService struct {
// spaceId identifies the space; it is used to look up node ids, to dial
// responsible peers and as the SpaceId of the first message on a stream.
spaceId string
// streamPool holds the streams; LastUsage and Close delegate to it.
streamPool StreamPool
// clientFactory builds the client on which a stream is opened for a peer.
clientFactory spacesyncproto.ClientFactory
// objectGetter resolves the object a message is addressed to; set in Init.
objectGetter objectgetter.ObjectGetter
// streamLoopCtx and stopStreamLoop are created together in Init;
// stopStreamLoop cancels streamLoopCtx and is called from Close.
streamLoopCtx context.Context
stopStreamLoop context.CancelFunc
// connector provides the node configuration and dials responsible peers.
connector nodeconf.ConfConnector
// streamLoopDone is closed when responsibleStreamCheckLoop returns.
streamLoopDone chan struct{}
// log is the package logger with the space id attached under "id".
log *zap.SugaredLogger // TODO: change to logger
}
// NewSyncService builds a SyncService for spaceId that reaches peers through
// confConnector and opens streams with the DRPC space client.
//
// The stream pool's message handler forwards to the returned service. The
// handler closes over the named result, which is only assigned after the pool
// has been created, so it must not be invoked before this function returns.
func NewSyncService(
	spaceId string,
	confConnector nodeconf.ConfConnector) (syncService SyncService) {
	forward := func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
		return syncService.HandleMessage(ctx, senderId, message)
	}
	syncService = newSyncService(
		spaceId,
		newStreamPool(forward),
		spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
		confConnector,
	)
	return syncService
}
// newSyncService wires a syncService from explicit dependencies. It attaches
// the space id to the logger under "id" and allocates the channel that
// signals the end of the background loop. The loop itself is started by Init.
func newSyncService(
	spaceId string,
	streamPool StreamPool,
	clientFactory spacesyncproto.ClientFactory,
	connector nodeconf.ConfConnector) *syncService {
	svc := new(syncService)
	svc.spaceId = spaceId
	svc.streamPool = streamPool
	svc.clientFactory = clientFactory
	svc.connector = connector
	svc.log = log.With(zap.String("id", spaceId))
	svc.streamLoopDone = make(chan struct{})
	return svc
}
// Init stores objectGetter and launches the responsible-peer stream check
// loop in its own goroutine. The loop runs on a fresh cancellable context
// whose cancel function is kept so that Close can stop it.
func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) {
	loopCtx, cancel := context.WithCancel(context.Background())
	s.objectGetter = objectGetter
	s.streamLoopCtx = loopCtx
	s.stopStreamLoop = cancel
	go s.responsibleStreamCheckLoop(loopCtx)
}
// Close stops the background stream-check loop, waits for it to exit and then
// closes the stream pool, returning the pool's close error.
//
// If Init was never called there is no loop to stop: stopStreamLoop is nil
// (calling it would panic) and streamLoopDone would never be closed (waiting
// on it would block forever). In that case only the stream pool is closed.
func (s *syncService) Close() (err error) {
	if s.stopStreamLoop != nil {
		s.stopStreamLoop()
		<-s.streamLoopDone
	}
	return s.streamPool.Close()
}
// LastUsage reports the last-usage time of the underlying stream pool.
func (s *syncService) LastUsage() time.Time {
	lastUsed := s.streamPool.LastUsage()
	return lastUsed
}
// HandleMessage routes an incoming sync message to the object it targets.
// The object is looked up by message.ObjectId through the object getter; a
// lookup error is returned as is, otherwise the object's own HandleMessage
// result is returned.
func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
	msgLog := s.log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId))
	msgLog.Debug("handling message")

	target, err := s.objectGetter.GetObject(ctx, message.ObjectId)
	if err == nil {
		return target.HandleMessage(ctx, senderId, message)
	}
	return err
}
// responsibleStreamCheckLoop keeps streams open to the peers responsible for
// this space. It runs one check immediately and then one on every tick of
// respPeersStreamCheckInterval until ctx is cancelled. On exit it closes
// s.streamLoopDone, which Close waits on.
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
	defer close(s.streamLoopDone)

	s.checkResponsiblePeers(ctx)

	ticker := time.NewTicker(respPeersStreamCheckInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.checkResponsiblePeers(ctx)
		}
	}
}

// checkResponsiblePeers performs a single pass of the stream check: it
// collects the configured node ids that already have an active stream, dials
// the responsible peers passing that list, and opens and registers a stream
// for every peer returned.
func (s *syncService) checkResponsiblePeers(ctx context.Context) {
	var activeNodeIds []string
	for _, nodeId := range s.connector.Configuration().NodeIds(s.spaceId) {
		if s.streamPool.HasActiveStream(nodeId) {
			// The *w variants are required for fields on a SugaredLogger:
			// plain Debug/Error would fmt.Sprint the zap.Field into the
			// message. The key is "peerId" because the logger already carries
			// the space id under "id".
			s.log.Debugw("has active stream for", zap.String("peerId", nodeId))
			activeNodeIds = append(activeNodeIds, nodeId)
		}
	}

	// NOTE(review): activeNodeIds is presumably the set of peers the connector
	// should skip - confirm against DialResponsiblePeers.
	newPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId, activeNodeIds)
	if err != nil {
		s.log.Errorw("failed to dial peers", zap.Error(err))
		return
	}

	for _, p := range newPeers {
		stream, err := s.clientFactory.Client(p).Stream(ctx)
		if err != nil {
			s.log.Errorf("failed to open stream: %v", rpcerr.Unwrap(err))
			// so here probably the request is failed because there is no such space,
			// but diffService should handle such cases by sending pushSpace
			continue
		}
		// sending empty message for the server to understand from which space is it coming
		err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
		if err != nil {
			// NOTE(review): the stream is dropped here without being closed;
			// check whether it needs an explicit Close to avoid a leak.
			s.log.Errorf("failed to send first message to stream: %v", rpcerr.Unwrap(err))
			continue
		}
		s.log.Debugw("reading stream for", zap.String("peerId", p.Id()))
		s.streamPool.AddAndReadStreamAsync(stream)
	}
}
// StreamPool exposes the stream pool this service was constructed with.
func (s *syncService) StreamPool() StreamPool {
	pool := s.streamPool
	return pool
}