Add two client script and fix common snapshot bug
This commit is contained in:
parent
82eb76c1ea
commit
2b3ae0fb94
@ -108,7 +108,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
newPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId, activeNodeIds)
|
newPeers, err := s.connector.DialInactiveResponsiblePeers(ctx, s.spaceId, activeNodeIds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("failed to dial peers", zap.Error(err))
|
s.log.Error("failed to dial peers", zap.Error(err))
|
||||||
return
|
return
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import (
|
|||||||
type ConfConnector interface {
|
type ConfConnector interface {
|
||||||
Configuration() Configuration
|
Configuration() Configuration
|
||||||
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
|
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
|
||||||
DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
|
DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type confConnector struct {
|
type confConnector struct {
|
||||||
@ -30,7 +30,7 @@ func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string)
|
|||||||
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf)
|
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
|
func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) {
|
||||||
return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial, s.pool.DialOneOf)
|
return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial, s.pool.DialOneOf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -137,23 +137,27 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
|
|||||||
if !shouldVisit(entry.position, exists) {
|
if !shouldVisit(entry.position, exists) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if id == commonSnapshot {
|
||||||
|
commonSnapshotVisited = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
entry, err = r.loadEntry(id)
|
entry, err = r.loadEntry(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setting the counter when we visit
|
// setting the counter when we visit
|
||||||
r.cache[id] = visit(entry)
|
entry = visit(entry)
|
||||||
|
r.cache[id] = entry
|
||||||
|
|
||||||
for _, prev := range entry.change.PreviousIds {
|
for _, prev := range entry.change.PreviousIds {
|
||||||
if prev == commonSnapshot {
|
if prev == commonSnapshot {
|
||||||
commonSnapshotVisited = true
|
commonSnapshotVisited = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
entry, exists = r.cache[prev]
|
prevEntry, exists := r.cache[prev]
|
||||||
if !shouldVisit(entry.position, exists) {
|
if !shouldVisit(prevEntry.position, exists) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.idStack = append(r.idStack, prev)
|
r.idStack = append(r.idStack, prev)
|
||||||
|
|||||||
@ -11,7 +11,10 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/client"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/client"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/node"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api/node"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/peers"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/peers"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CName = "debug.api"
|
const CName = "debug.api"
|
||||||
@ -379,4 +382,53 @@ func (s *service) registerScripts() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}}
|
}}
|
||||||
|
s.scripts["create-many-two-clients"] = Script{Cmd: func(params []string) (res string, err error) {
|
||||||
|
if len(params) != 6 {
|
||||||
|
err = ErrIncorrectParamsCount
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peer1, err := s.peers.Get(params[0])
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peer2, err := s.peers.Get(params[1])
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
last, err := strconv.Atoi(params[5])
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if last <= 0 {
|
||||||
|
err = fmt.Errorf("incorrect number of steps")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
var mError errs.Group
|
||||||
|
createMany := func(peer peers.Peer) {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < last; i++ {
|
||||||
|
_, err := s.client.AddText(context.Background(), peer.Address, &clientproto.AddTextRequest{
|
||||||
|
SpaceId: params[2],
|
||||||
|
DocumentId: params[3],
|
||||||
|
Text: params[4],
|
||||||
|
IsSnapshot: rand.Int()%2 == 0,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
mError.Add(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, p := range []peers.Peer{peer1, peer2} {
|
||||||
|
wg.Add(1)
|
||||||
|
createMany(p)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if mError.Err() != nil {
|
||||||
|
err = mError.Err()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user