Add two client script and fix common snapshot bug

This commit is contained in:
mcrakhman 2022-12-04 15:02:50 +01:00
parent cdfee1232e
commit 6ed6142129
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 63 additions and 7 deletions

View File

@ -108,7 +108,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
continue
}
}
newPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId, activeNodeIds)
newPeers, err := s.connector.DialInactiveResponsiblePeers(ctx, s.spaceId, activeNodeIds)
if err != nil {
s.log.Error("failed to dial peers", zap.Error(err))
return

View File

@ -10,7 +10,7 @@ import (
type ConfConnector interface {
Configuration() Configuration
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
}
type confConnector struct {
@ -30,7 +30,7 @@ func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string)
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf)
}
func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial, s.pool.DialOneOf)
}

View File

@ -137,23 +137,27 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
if !shouldVisit(entry.position, exists) {
continue
}
if id == commonSnapshot {
commonSnapshotVisited = true
continue
}
if !exists {
entry, err = r.loadEntry(id)
if err != nil {
continue
}
}
// setting the counter when we visit
r.cache[id] = visit(entry)
entry = visit(entry)
r.cache[id] = entry
for _, prev := range entry.change.PreviousIds {
if prev == commonSnapshot {
commonSnapshotVisited = true
break
}
entry, exists = r.cache[prev]
if !shouldVisit(entry.position, exists) {
prevEntry, exists := r.cache[prev]
if !shouldVisit(prevEntry.position, exists) {
continue
}
r.idStack = append(r.idStack, prev)

View File

@ -11,7 +11,10 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/client"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/node"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/peers"
"github.com/zeebo/errs"
"math/rand"
"strconv"
"sync"
)
const CName = "debug.api"
@ -379,4 +382,53 @@ func (s *service) registerScripts() {
}
return
}}
s.scripts["create-many-two-clients"] = Script{Cmd: func(params []string) (res string, err error) {
if len(params) != 6 {
err = ErrIncorrectParamsCount
return
}
peer1, err := s.peers.Get(params[0])
if err != nil {
return
}
peer2, err := s.peers.Get(params[1])
if err != nil {
return
}
last, err := strconv.Atoi(params[5])
if err != nil {
return
}
if last <= 0 {
err = fmt.Errorf("incorrect number of steps")
return
}
wg := &sync.WaitGroup{}
var mError errs.Group
createMany := func(peer peers.Peer) {
defer wg.Done()
for i := 0; i < last; i++ {
_, err := s.client.AddText(context.Background(), peer.Address, &clientproto.AddTextRequest{
SpaceId: params[2],
DocumentId: params[3],
Text: params[4],
IsSnapshot: rand.Int()%2 == 0,
})
if err != nil {
mError.Add(err)
return
}
}
}
for _, p := range []peers.Peer{peer1, peer2} {
wg.Add(1)
createMany(p)
}
wg.Wait()
if mError.Err() != nil {
err = mError.Err()
return
}
return
}}
}