Update stream configuration and sync service
This commit is contained in:
parent
1bc9242644
commit
f420ac144c
@ -20,5 +20,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
|
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
|
||||||
return r.s.SyncService().StreamPool().AddAndReadStream(stream)
|
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -148,7 +148,8 @@ func (s *space) testFill() {
|
|||||||
|
|
||||||
func (s *space) sync(ctx context.Context) error {
|
func (s *space) sync(ctx context.Context) error {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
peers, err := s.getPeers(ctx)
|
// diffing with responsible peers according to configuration
|
||||||
|
peers, err := s.nconf.ResponsiblePeers(ctx, s.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -168,6 +169,7 @@ func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pingTreesInCache(ctx, newIds)
|
s.pingTreesInCache(ctx, newIds)
|
||||||
s.pingTreesInCache(ctx, changedIds)
|
s.pingTreesInCache(ctx, changedIds)
|
||||||
|
|
||||||
@ -181,19 +183,6 @@ func (s *space) pingTreesInCache(ctx context.Context, trees []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) {
|
|
||||||
if s.nconf.IsResponsible(s.id) {
|
|
||||||
return s.nconf.AllPeers(ctx, s.id)
|
|
||||||
} else {
|
|
||||||
var p peer.Peer
|
|
||||||
p, err = s.nconf.OnePeer(ctx, s.id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return []peer.Peer{p}, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *space) Close() error {
|
func (s *space) Close() error {
|
||||||
s.periodicSync.Close()
|
s.periodicSync.Close()
|
||||||
return s.syncService.Close()
|
return s.syncService.Close()
|
||||||
|
|||||||
@ -17,7 +17,8 @@ const maxSimultaneousOperationsPerStream = 10
|
|||||||
|
|
||||||
// StreamPool can be made generic to work with different streams
|
// StreamPool can be made generic to work with different streams
|
||||||
type StreamPool interface {
|
type StreamPool interface {
|
||||||
AddAndReadStream(stream spacesyncproto.SpaceStream) (err error)
|
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
|
||||||
|
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
|
||||||
HasStream(peerId string) bool
|
HasStream(peerId string) bool
|
||||||
SyncClient
|
SyncClient
|
||||||
Close() (err error)
|
Close() (err error)
|
||||||
@ -151,7 +152,11 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) {
|
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) {
|
||||||
|
go s.AddAndReadStreamSync(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
peerId, err := GetPeerIdFromStreamContext(stream.Context())
|
peerId, err := GetPeerIdFromStreamContext(stream.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SyncService interface {
|
type SyncService interface {
|
||||||
@ -14,39 +15,56 @@ type SyncService interface {
|
|||||||
Close() (err error)
|
Close() (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const respPeersStreamCheckInterval = time.Second * 10
|
||||||
|
|
||||||
type syncService struct {
|
type syncService struct {
|
||||||
syncHandler SyncHandler
|
syncHandler SyncHandler
|
||||||
streamPool StreamPool
|
streamPool StreamPool
|
||||||
configuration nodeconf.Configuration
|
configuration nodeconf.Configuration
|
||||||
spaceId string
|
spaceId string
|
||||||
|
streamLoopCtx context.Context
|
||||||
|
stopStreamLoop context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncService) Run() {
|
||||||
|
s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background())
|
||||||
|
s.streamCheckLoop(s.streamLoopCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncService) Close() (err error) {
|
func (s *syncService) Close() (err error) {
|
||||||
|
s.stopStreamLoop()
|
||||||
return s.streamPool.Close()
|
return s.streamPool.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
|
func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
|
||||||
msg := spacesyncproto.WrapHeadUpdate(update, header, treeId)
|
return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId))
|
||||||
peers, err := s.configuration.AllPeers(context.Background(), s.spaceId)
|
}
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, peer := range peers {
|
|
||||||
if s.streamPool.HasStream(peer.Id()) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cl := spacesyncproto.NewDRPCSpaceClient(peer)
|
|
||||||
stream, err := cl.Stream(ctx)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.streamPool.AddAndReadStream(stream)
|
func (s *syncService) streamCheckLoop(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
for _, peer := range respPeers {
|
||||||
|
if s.streamPool.HasStream(peer.Id()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cl := spacesyncproto.NewDRPCSpaceClient(peer)
|
||||||
|
stream, err := cl.Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.streamPool.AddAndReadStreamAsync(stream)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-time.After(respPeersStreamCheckInterval):
|
||||||
|
break
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return s.streamPool.BroadcastAsync(msg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncService) StreamPool() StreamPool {
|
func (s *syncService) StreamPool() StreamPool {
|
||||||
|
|||||||
@ -19,6 +19,8 @@ type Configuration interface {
|
|||||||
AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
|
AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
|
||||||
// OnePeer returns one of peer for spaceId
|
// OnePeer returns one of peer for spaceId
|
||||||
OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error)
|
OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error)
|
||||||
|
// ResponsiblePeers returns peers for the space id that are responsible for the space
|
||||||
|
ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
|
||||||
// NodeIds returns list of peerId for given spaceId
|
// NodeIds returns list of peerId for given spaceId
|
||||||
NodeIds(spaceId string) []string
|
NodeIds(spaceId string) []string
|
||||||
// IsResponsible checks if current account responsible for given spaceId
|
// IsResponsible checks if current account responsible for given spaceId
|
||||||
@ -51,6 +53,20 @@ func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []p
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configuration) ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) {
|
||||||
|
if c.IsResponsible(spaceId) {
|
||||||
|
return c.AllPeers(ctx, spaceId)
|
||||||
|
} else {
|
||||||
|
var one peer.Peer
|
||||||
|
one, err = c.OnePeer(ctx, spaceId)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peers = []peer.Peer{one}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) {
|
func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) {
|
||||||
nodeIds := c.NodeIds(spaceId)
|
nodeIds := c.NodeIds(spaceId)
|
||||||
return c.pool.GetOneOf(ctx, nodeIds)
|
return c.pool.GetOneOf(ctx, nodeIds)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user