Add more logic re sync service

This commit is contained in:
mcrakhman 2022-09-15 11:35:16 +02:00
parent f900cbff0b
commit aecb95bc92
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
6 changed files with 79 additions and 29 deletions

View File

@ -2,7 +2,6 @@ package commonspace
import ( import (
"context" "context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
) )
@ -21,6 +20,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
} }
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
return r.s.SyncService().StreamPool().AddStream(stream)
return fmt.Errorf("not implemented")
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/config"
) )
@ -37,10 +38,12 @@ func (s *service) Name() (name string) {
} }
func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) {
syncService := syncservice.NewSyncService(id, nil, s.configurationService.GetLast())
sp := &space{ sp := &space{
id: id, id: id,
nconf: s.configurationService.GetLast(), nconf: s.configurationService.GetLast(),
conf: s.config, conf: s.config,
syncService: syncService,
} }
if err := sp.Init(ctx); err != nil { if err := sp.Init(ctx); err != nil {
return nil, err return nil, err

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/config"
@ -19,18 +20,21 @@ type Space interface {
Id() string Id() string
SpaceSyncRpc() RpcHandler SpaceSyncRpc() RpcHandler
SyncService() syncservice.SyncService
Close() error Close() error
} }
type space struct { type space struct {
id string id string
nconf nodeconf.Configuration nconf nodeconf.Configuration
conf config.Space conf config.Space
diff ldiff.Diff diff ldiff.Diff
mu sync.RWMutex
rpc *rpcHandler rpc *rpcHandler
periodicSync *periodicSync periodicSync *periodicSync
mu sync.RWMutex syncService syncservice.SyncService
} }
func (s *space) Id() string { func (s *space) Id() string {
@ -49,6 +53,10 @@ func (s *space) SpaceSyncRpc() RpcHandler {
return s.rpc return s.rpc
} }
func (s *space) SyncService() syncservice.SyncService {
return s.syncService
}
func (s *space) testFill() { func (s *space) testFill() {
var n = 1000 var n = 1000
var els = make([]ldiff.Element, 0, n) var els = make([]ldiff.Element, 0, n)

View File

@ -1,6 +1,7 @@
package syncservice package syncservice
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
@ -18,6 +19,7 @@ const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams // StreamPool can be made generic to work with different streams
type StreamPool interface { type StreamPool interface {
AddStream(stream spacesyncproto.SpaceStream) (err error) AddStream(stream spacesyncproto.SpaceStream) (err error)
HasStream(peerId string) bool
SyncClient SyncClient
} }
@ -26,9 +28,7 @@ type SyncClient interface {
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
} }
type MessageHandler interface { type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
HandleMessage(peerId string, message *spacesyncproto.ObjectSyncMessage)
}
type streamPool struct { type streamPool struct {
sync.Mutex sync.Mutex
@ -43,6 +43,11 @@ func newStreamPool(messageHandler MessageHandler) StreamPool {
} }
} }
func (s *streamPool) HasStream(peerId string) (res bool) {
_, err := s.getStream(peerId)
return err == nil
}
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getStream(peerId) stream, err := s.getStream(peerId)
if err != nil { if err != nil {
@ -101,15 +106,16 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) {
s.Lock() s.Lock()
defer s.Unlock()
peerId, err := getPeerIdFromStream(stream) peerId, err := getPeerIdFromStream(stream)
if err != nil { if err != nil {
s.Unlock()
return return
} }
s.peerStreams[peerId] = stream s.peerStreams[peerId] = stream
go s.readPeerLoop(peerId, stream) s.Unlock()
return
return s.readPeerLoop(peerId, stream)
} }
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
@ -134,13 +140,10 @@ Loop:
limiter <- struct{}{} limiter <- struct{}{}
}() }()
s.messageHandler.HandleMessage(peerId, msg) s.messageHandler(context.Background(), peerId, msg)
}() }()
} }
if err = s.removePeer(peerId); err != nil { return s.removePeer(peerId)
// TODO: log something
}
return
} }
func (s *streamPool) removePeer(peerId string) (err error) { func (s *streamPool) removePeer(peerId string) (err error) {

View File

@ -16,7 +16,7 @@ type syncHandler struct {
} }
type SyncHandler interface { type SyncHandler interface {
HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
} }
func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
@ -26,7 +26,7 @@ func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandl
} }
} }
func (s *syncHandler) HandleSyncMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error { func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error {
msg := message.GetContent() msg := message.GetContent()
switch { switch {
case msg.GetFullSyncRequest() != nil: case msg.GetFullSyncRequest() != nil:

View File

@ -1,13 +1,15 @@
package syncservice package syncservice
import ( import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
) )
type SyncService interface { type SyncService interface {
NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool StreamPool() StreamPool
} }
@ -15,11 +17,29 @@ type syncService struct {
syncHandler SyncHandler syncHandler SyncHandler
streamPool StreamPool streamPool StreamPool
configuration nodeconf.Configuration configuration nodeconf.Configuration
spaceId string
} }
func (s *syncService) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) msg := spacesyncproto.WrapHeadUpdate(update, header, treeId)
all peers, err := s.configuration.AllPeers(context.Background(), s.spaceId)
if err != nil {
return
}
for _, peer := range peers {
if !s.streamPool.HasStream(peer.Id()) {
cl := spacesyncproto.NewDRPCSpaceClient(peer)
stream, err := cl.Stream(ctx)
if err != nil {
continue
}
s.streamPool.AddStream(stream)
if err != nil {
continue
}
}
}
return s.streamPool.BroadcastAsync(msg) return s.streamPool.BroadcastAsync(msg)
} }
@ -27,6 +47,24 @@ func (s *syncService) StreamPool() StreamPool {
return s.streamPool return s.streamPool
} }
func newSyncService() { func NewSyncService(spaceId string, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
var syncHandler SyncHandler
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message)
})
syncHandler = newSyncHandler(cache, streamPool)
return newSyncService(spaceId, syncHandler, streamPool, configuration)
}
func newSyncService(
spaceId string,
syncHandler SyncHandler,
streamPool StreamPool,
configuration nodeconf.Configuration) *syncService {
return &syncService{
syncHandler: syncHandler,
streamPool: streamPool,
configuration: configuration,
spaceId: spaceId,
}
} }