diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index 9a2d6993..e94df0b8 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -2,7 +2,6 @@ package commonspace import ( "context" - "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" ) @@ -21,6 +20,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { - - return fmt.Errorf("not implemented") + return r.s.SyncService().StreamPool().AddStream(stream) } diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 85dd8ae9..daeb321d 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -4,6 +4,7 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" ) @@ -37,10 +38,12 @@ func (s *service) Name() (name string) { } func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { + syncService := syncservice.NewSyncService(id, nil, s.configurationService.GetLast()) sp := &space{ - id: id, - nconf: s.configurationService.GetLast(), - conf: s.config, + id: id, + nconf: s.configurationService.GetLast(), + conf: s.config, + syncService: syncService, } if err := sp.Init(ctx); err != nil { return nil, err diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 4f83dd97..0c223a09 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" @@ -19,18 +20,21 @@ type Space interface { Id() string SpaceSyncRpc() RpcHandler + SyncService() syncservice.SyncService Close() error } type space struct { - id string - nconf nodeconf.Configuration - conf config.Space - diff ldiff.Diff + id string + nconf nodeconf.Configuration + conf config.Space + diff ldiff.Diff + mu sync.RWMutex + rpc *rpcHandler periodicSync *periodicSync - mu sync.RWMutex + syncService syncservice.SyncService } func (s *space) Id() string { @@ -49,6 +53,10 @@ func (s *space) SpaceSyncRpc() RpcHandler { return s.rpc } +func (s *space) SyncService() syncservice.SyncService { + return s.syncService +} + func (s *space) testFill() { var n = 1000 var els = make([]ldiff.Element, 0, n) diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 03cc6dcd..47d3063c 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -1,6 +1,7 @@ package syncservice import ( + "context" "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" @@ -18,6 +19,7 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { AddStream(stream spacesyncproto.SpaceStream) (err error) + HasStream(peerId string) bool SyncClient } @@ -26,9 +28,7 @@ type SyncClient interface { BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) } -type MessageHandler interface { - HandleMessage(peerId string, message *spacesyncproto.ObjectSyncMessage) -} +type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) type streamPool struct { sync.Mutex @@ -43,6 +43,11 @@ func newStreamPool(messageHandler MessageHandler) StreamPool { } } +func (s *streamPool) HasStream(peerId string) (res bool) { + _, err := s.getStream(peerId) + return err == nil +} + func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { stream, err := s.getStream(peerId) if err != nil { @@ -101,15 +106,16 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { s.Lock() - defer s.Unlock() peerId, err := getPeerIdFromStream(stream) if err != nil { + s.Unlock() return } s.peerStreams[peerId] = stream - go s.readPeerLoop(peerId, stream) - return + s.Unlock() + + return s.readPeerLoop(peerId, stream) } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { @@ -134,13 +140,10 @@ Loop: limiter <- struct{}{} }() - s.messageHandler.HandleMessage(peerId, msg) + s.messageHandler(context.Background(), peerId, msg) }() } - if err = s.removePeer(peerId); err != nil { - // TODO: log something - } - return + return s.removePeer(peerId) } func (s *streamPool) removePeer(peerId string) (err error) { diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index 2aa2ea9b..fc04468d 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -16,7 +16,7 @@ type syncHandler struct { } type SyncHandler interface { - HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) + HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) } func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { @@ -26,7 +26,7 @@ func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandl } } -func (s *syncHandler) HandleSyncMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error { +func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error { msg := message.GetContent() switch { case msg.GetFullSyncRequest() != nil: diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index c9ce956d..4570ce85 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -1,13 +1,15 @@ package syncservice import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" ) type SyncService interface { - NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) + NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) StreamPool() StreamPool } @@ -15,11 +17,29 @@ type syncService struct { syncHandler SyncHandler streamPool StreamPool configuration nodeconf.Configuration + spaceId string } -func (s *syncService) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { +func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) - all + peers, err := s.configuration.AllPeers(context.Background(), s.spaceId) + if err != nil { + return + } + for _, peer := range peers { + if !s.streamPool.HasStream(peer.Id()) { + cl := spacesyncproto.NewDRPCSpaceClient(peer) + stream, err := cl.Stream(ctx) + if err != nil { + continue + } + + s.streamPool.AddStream(stream) + if err != nil { + continue + } + } + } return s.streamPool.BroadcastAsync(msg) } @@ -27,6 +47,24 @@ func (s *syncService) StreamPool() StreamPool { return s.streamPool } -func newSyncService() { - +func NewSyncService(spaceId string, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService { + var syncHandler SyncHandler + streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + return syncHandler.HandleMessage(ctx, senderId, message) + }) + syncHandler = newSyncHandler(cache, streamPool) + return newSyncService(spaceId, syncHandler, streamPool, configuration) +} + +func newSyncService( + spaceId string, + syncHandler SyncHandler, + streamPool StreamPool, + configuration nodeconf.Configuration) *syncService { + return &syncService{ + syncHandler: syncHandler, + streamPool: streamPool, + configuration: configuration, + spaceId: spaceId, + } }