Create fixture draft

This commit is contained in:
mcrakhman 2023-03-14 11:41:29 +01:00 committed by Mikhail Iudin
parent 24e400ad70
commit b27ca28bb3
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 90 additions and 14 deletions

View File

@ -12,13 +12,16 @@ import (
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/cheggaaa/mb/v3" "github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/require"
"sync" "sync"
"testing"
) )
type processMsg struct { type processMsg struct {
msg *spacesyncproto.ObjectSyncMessage msg *spacesyncproto.ObjectSyncMessage
senderId string senderId string
receiverId string receiverId string
userMsg *objecttree.RawChangesPayload
} }
type messageLog struct { type messageLog struct {
@ -48,11 +51,19 @@ func (p *processSyncHandler) manager() *processPeerManager {
return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager)
} }
func (p *processSyncHandler) tree() *broadcastTree {
return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree)
}
func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) {
return p.batcher.Add(ctx, msg) return p.batcher.Add(ctx, msg)
} }
func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) { func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) {
p.batcher.Add(ctx, processMsg{userMsg: &changes})
}
func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -61,6 +72,14 @@ func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) {
if err != nil { if err != nil {
return return
} }
if res.userMsg != nil {
p.tree().Lock()
userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg)
require.NoError(t, err)
fmt.Println("user add result", userRes.Heads)
p.tree().Unlock()
continue
}
err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg) err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg)
if err != nil { if err != nil {
fmt.Println("error handling message", err.Error()) fmt.Println("error handling message", err.Error())
@ -155,14 +174,62 @@ func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (obje
return objecttree.BuildTestableTree(aclList, storage) return objecttree.BuildTestableTree(aclList, storage)
} }
type fixtureDeps struct {
aclList list.AclList
initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string
}
type processFixture struct { type processFixture struct {
handlers []*processSyncHandler handlers map[string]*processSyncHandler
log *messageLog log *messageLog
wg *sync.WaitGroup wg *sync.WaitGroup
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture {
var (
handlers = map[string]*processSyncHandler{}
log = newMessageLog()
wg = sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
)
for peerId := range deps.connectionMap {
stCopy := deps.initStorage.Copy()
testTree, err := createTestTree(deps.aclList, stCopy)
require.NoError(t, err)
handler := createSyncHandler(peerId, spaceId, testTree, log)
handlers[peerId] = handler
}
for peerId, connectionMap := range deps.connectionMap {
handler := handlers[peerId]
manager := handler.manager()
for _, connectionId := range connectionMap {
manager.addHandler(connectionId, handlers[connectionId])
}
}
return &processFixture{
handlers: handlers,
log: log,
wg: &wg,
ctx: ctx,
cancel: cancel,
}
}
func (p *processFixture) runProcessFixture(t *testing.T) {
for _, handler := range p.handlers {
handler.run(p.ctx, t, p.wg)
}
}
func (p *processFixture) stop() {
p.cancel()
p.wg.Wait()
}
//func TestSyncProtocol(t *testing.T) { //func TestSyncProtocol(t *testing.T) {
// aclList, err := createAclList() // aclList, err := createAclList()
// require.NoError(t, err) // require.NoError(t, err)

View File

@ -7,7 +7,7 @@ import (
"sync" "sync"
) )
type inMemoryTreeStorage struct { type InMemoryTreeStorage struct {
id string id string
root *treechangeproto.RawTreeChangeWithId root *treechangeproto.RawTreeChangeWithId
heads []string heads []string
@ -16,7 +16,7 @@ type inMemoryTreeStorage struct {
sync.RWMutex sync.RWMutex
} }
func (t *inMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
@ -37,46 +37,46 @@ func NewInMemoryTreeStorage(
} }
allChanges[root.Id] = root allChanges[root.Id] = root
return &inMemoryTreeStorage{ return &InMemoryTreeStorage{
id: root.Id, id: root.Id,
root: root, root: root,
heads: heads, heads: append([]string(nil), heads...),
changes: allChanges, changes: allChanges,
RWMutex: sync.RWMutex{}, RWMutex: sync.RWMutex{},
}, nil }, nil
} }
func (t *inMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) { func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) {
_, exists := t.changes[id] _, exists := t.changes[id]
return exists, nil return exists, nil
} }
func (t *inMemoryTreeStorage) Id() string { func (t *InMemoryTreeStorage) Id() string {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
return t.id return t.id
} }
func (t *inMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) { func (t *InMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
return t.root, nil return t.root, nil
} }
func (t *inMemoryTreeStorage) Heads() ([]string, error) { func (t *InMemoryTreeStorage) Heads() ([]string, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
return t.heads, nil return t.heads, nil
} }
func (t *inMemoryTreeStorage) SetHeads(heads []string) error { func (t *InMemoryTreeStorage) SetHeads(heads []string) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
t.heads = append(t.heads[:0], heads...) t.heads = append(t.heads[:0], heads...)
return nil return nil
} }
func (t *inMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChangeWithId) error { func (t *InMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChangeWithId) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
// TODO: better to do deep copy // TODO: better to do deep copy
@ -84,7 +84,7 @@ func (t *inMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChange
return nil return nil
} }
func (t *inMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) { func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if res, exists := t.changes[changeId]; exists { if res, exists := t.changes[changeId]; exists {
@ -93,6 +93,15 @@ func (t *inMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string)
return nil, fmt.Errorf("could not get change with id: %s", changeId) return nil, fmt.Errorf("could not get change with id: %s", changeId)
} }
func (t *inMemoryTreeStorage) Delete() error { func (t *InMemoryTreeStorage) Delete() error {
return nil return nil
} }
func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage {
var changes []*treechangeproto.RawTreeChangeWithId
for _, ch := range t.changes {
changes = append(changes, ch)
}
other, _ := NewInMemoryTreeStorage(t.root, t.heads, changes)
return other.(*InMemoryTreeStorage)
}