diff --git a/service/configuration/configuration.go b/service/configuration/configuration.go index 1efa82b2..5701eca6 100644 --- a/service/configuration/configuration.go +++ b/service/configuration/configuration.go @@ -2,6 +2,7 @@ package configuration import ( "context" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-chash" @@ -12,15 +13,23 @@ func New() Service { } type Configuration interface { + // Id returns current configuration id Id() string + // AllPeers returns all peers by spaceId except current account AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) + // OnePeer returns one of peer for spaceId OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) + // NodeIds returns list of peerId for given spaceId + NodeIds(spaceId string) []string + // IsResponsible checks if current account responsible for given spaceId + IsResponsible(spaceId string) bool } type configuration struct { - id string - pool pool.Pool - chash chash.CHash + id string + accountId string + pool pool.Pool + chash chash.CHash } func (c *configuration) Id() string { @@ -28,11 +37,41 @@ func (c *configuration) Id() string { } func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) { - //TODO implement me - panic("implement me") + nodeIds := c.NodeIds(spaceId) + peers = make([]peer.Peer, 0, len(nodeIds)) + for _, id := range nodeIds { + p, e := c.pool.DialAndAddPeer(ctx, id) + if e == nil { + peers = append(peers, p) + } + } + if len(peers) == 0 { + return nil, fmt.Errorf("unable to connect to any node") + } + return } func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) { - //TODO implement me - panic("implement me") + nodeIds := c.NodeIds(spaceId) + return c.pool.GetOrDialOneOf(ctx, nodeIds) +} + +func (c *configuration) NodeIds(spaceId string) []string { + members := c.chash.GetMembers(spaceId) + res := make([]string, 0, len(members)) + for _, m := range members { + if m.Id() != c.accountId { + res = append(res, m.Id()) + } + } + return res +} + +func (c *configuration) IsResponsible(spaceId string) bool { + for _, m := range c.chash.GetMembers(spaceId) { + if m.Id() == c.accountId { + return true + } + } + return false } diff --git a/service/configuration/service.go b/service/configuration/service.go index a09d8db1..9cd920f7 100644 --- a/service/configuration/service.go +++ b/service/configuration/service.go @@ -38,8 +38,9 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) { s.pool = a.MustComponent(pool.CName).(pool.Pool) configNodes := a.MustComponent(node.CName).(node.Service).Nodes() config := &configuration{ - id: "config", - pool: s.pool, + id: "config", + accountId: s.accountId, + pool: s.pool, } if config.chash, err = chash.New(chash.Config{ PartitionCount: partitionCount, diff --git a/service/net/pool/pool.go b/service/net/pool/pool.go index 0de81d19..97a7dcc1 100644 --- a/service/net/pool/pool.go +++ b/service/net/pool/pool.go @@ -11,6 +11,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "go.uber.org/zap" + "math/rand" "sync" "sync/atomic" ) @@ -34,11 +35,12 @@ func NewPool() Pool { type Handler func(ctx context.Context, msg *Message) (err error) type Pool interface { - DialAndAddPeer(ctx context.Context, id string) (err error) AddAndReadPeer(peer peer.Peer) (err error) AddHandler(msgType syncproto.MessageType, h Handler) AddPeerIdToGroup(peerId, groupId string) (err error) RemovePeerIdFromGroup(peerId, groupId string) (err error) + DialAndAddPeer(ctx context.Context, id string) (peer.Peer, error) + GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) SendAndWaitResponse(ctx context.Context, id string, s *syncproto.Message) (resp *Message, err error) @@ -89,25 +91,29 @@ func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) { p.handlers[msgType] = append(p.handlers[msgType], h) } -func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) { +func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (peer.Peer, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { - return ErrPoolClosed + return nil, ErrPoolClosed } - if _, ok := p.peersById[peerId]; ok { - return nil + return p.dialAndAdd(ctx, peerId) +} + +func (p *pool) dialAndAdd(ctx context.Context, peerId string) (peer.Peer, error) { + if peer, ok := p.peersById[peerId]; ok { + return peer.peer, nil } peer, err := p.dialer.Dial(ctx, peerId) if err != nil { - return + return nil, err } p.peersById[peer.Id()] = &peerEntry{ peer: peer, } p.wg.Add(1) go p.readPeerLoop(peer) - return nil + return peer, nil } func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) { @@ -211,6 +217,38 @@ func (p *pool) SendAndWaitResponse(ctx context.Context, peerId string, msg *sync return } +func (p *pool) GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return nil, ErrPoolClosed + } + for _, peerId := range peerIds { + peer, ok := p.peersById[peerId] + if ok { + p.mu.RUnlock() + return peer.peer, nil + } + } + p.mu.RUnlock() + rand.Shuffle(len(peerIds), func(i, j int) { + peerIds[i], peerIds[j] = peerIds[j], peerIds[i] + }) + p.mu.Lock() + defer p.mu.Unlock() + var lastErr error + for _, peerId := range peerIds { + peer, err := p.dialAndAdd(ctx, peerId) + if err != nil { + lastErr = err + continue + } else { + return peer, nil + } + } + return nil, lastErr +} + func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) { //TODO implement me panic("implement me") diff --git a/service/space/service.go b/service/space/service.go index bab561f4..364ce91d 100644 --- a/service/space/service.go +++ b/service/space/service.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool/handler" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" @@ -29,17 +30,20 @@ type Service interface { } type service struct { - conf config.Space - cache ocache.OCache - pool pool.Pool + conf config.Space + cache ocache.OCache + pool pool.Pool + confService configuration.Service } func (s *service) Init(ctx context.Context, a *app.App) (err error) { s.conf = a.MustComponent(config.CName).(*config.Config).Space s.pool = a.MustComponent(pool.CName).(pool.Pool) + s.confService = a.MustComponent(configuration.CName).(configuration.Service) ttlSec := time.Second * time.Duration(s.conf.GCTTL) s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute)) s.pool.AddHandler(syncproto.MessageType_MessageTypeSpace, handler.Reply{ReplyHandler: s}.Handle) + return nil } @@ -53,7 +57,7 @@ func (s *service) Run(ctx context.Context) (err error) { func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) { // TODO: load from database here - sp := &space{s: s, id: id} + sp := &space{s: s, id: id, conf: s.confService.GetLast()} if err = sp.Run(ctx); err != nil { return nil, err } diff --git a/service/space/space.go b/service/space/space.go index e88c4c75..776df945 100644 --- a/service/space/space.go +++ b/service/space/space.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" "go.uber.org/zap" @@ -21,8 +22,8 @@ type Space interface { // type space struct { - id string - + id string + conf configuration.Configuration diff ldiff.Diff diffHandler func() syncCtx context.Context @@ -105,10 +106,47 @@ func (s *space) syncLoop() { } func (s *space) sync(ctx context.Context) error { - + peerIds, err := s.peerIds(ctx) + if err != nil { + return err + } + for _, peerId := range peerIds { + if err := s.syncWithPeer(ctx, peerId); err != nil { + log.Error("can't sync with peer", zap.String("peer", peerId), zap.Error(err)) + } + } return nil } +func (s *space) syncWithPeer(ctx context.Context, peerId string) (err error) { + rdiff := remotediff.NewRemoteDiff(s.s.pool, peerId, s.id) + newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff) + if err != nil { + return nil + } + log.Info("sync done:", zap.Strings("newIds", newIds), zap.Strings("changedIds", changedIds), zap.Strings("removedIds", removedIds)) + return +} + +func (s *space) peerIds(ctx context.Context) (peerIds []string, err error) { + if s.conf.IsResponsible(s.id) { + peers, err := s.conf.AllPeers(ctx, s.id) + if err != nil { + return nil, err + } + for _, p := range peers { + peerIds = append(peerIds, p.Id()) + } + } else { + peer, err := s.conf.OnePeer(ctx, s.id) + if err != nil { + return nil, err + } + peerIds = append(peerIds, peer.Id()) + } + return +} + func (s *space) Close() error { s.syncCancel() <-s.syncLoopDone diff --git a/service/sync/message/service.go b/service/sync/message/service.go index 8f71f597..efe76900 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -75,7 +75,7 @@ func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err err } func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) { - err = s.pool.DialAndAddPeer(context.Background(), peerId) + _, err = s.pool.DialAndAddPeer(context.Background(), peerId) if err != nil { return }