diff --git a/commonspace/object/tree/synctree/syncclient_test.go b/commonspace/object/tree/synctree/syncclient_test.go index d7617987..7a76b71f 100644 --- a/commonspace/object/tree/synctree/syncclient_test.go +++ b/commonspace/object/tree/synctree/syncclient_test.go @@ -12,13 +12,16 @@ import ( "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/cheggaaa/mb/v3" + "github.com/stretchr/testify/require" "sync" + "testing" ) type processMsg struct { msg *spacesyncproto.ObjectSyncMessage senderId string receiverId string + userMsg *objecttree.RawChangesPayload } type messageLog struct { @@ -48,11 +51,19 @@ func (p *processSyncHandler) manager() *processPeerManager { return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) } +func (p *processSyncHandler) tree() *broadcastTree { + return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) +} + func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { return p.batcher.Add(ctx, msg) } -func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) { +func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { + p.batcher.Add(ctx, processMsg{userMsg: &changes}) +} + +func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() @@ -61,6 +72,14 @@ func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) { if err != nil { return } + if res.userMsg != nil { + p.tree().Lock() + userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg) + require.NoError(t, err) + fmt.Println("user add result", userRes.Heads) + p.tree().Unlock() + continue + } err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg) if err != nil { fmt.Println("error handling message", err.Error()) @@ -155,14 +174,62 @@ func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (obje return objecttree.BuildTestableTree(aclList, storage) } +type fixtureDeps struct { + aclList list.AclList + initStorage *treestorage.InMemoryTreeStorage + connectionMap map[string][]string +} + type processFixture struct { - handlers []*processSyncHandler + handlers map[string]*processSyncHandler log *messageLog wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc } +func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture { + var ( + handlers = map[string]*processSyncHandler{} + log = newMessageLog() + wg = sync.WaitGroup{} + ctx, cancel = context.WithCancel(context.Background()) + ) + + for peerId := range deps.connectionMap { + stCopy := deps.initStorage.Copy() + testTree, err := createTestTree(deps.aclList, stCopy) + require.NoError(t, err) + handler := createSyncHandler(peerId, spaceId, testTree, log) + handlers[peerId] = handler + } + for peerId, connectionMap := range deps.connectionMap { + handler := handlers[peerId] + manager := handler.manager() + for _, connectionId := range connectionMap { + manager.addHandler(connectionId, handlers[connectionId]) + } + } + return &processFixture{ + handlers: handlers, + log: log, + wg: &wg, + ctx: ctx, + cancel: cancel, + } +} + +func (p *processFixture) runProcessFixture(t *testing.T) { + for _, handler := range p.handlers { + handler.run(p.ctx, t, p.wg) + } +} + +func (p *processFixture) stop() { + p.cancel() + p.wg.Wait() +} + //func TestSyncProtocol(t *testing.T) { // aclList, err := createAclList() // require.NoError(t, err) diff --git a/commonspace/object/tree/treestorage/inmemory.go b/commonspace/object/tree/treestorage/inmemory.go index c5c5a5cc..0f22037e 100644 --- a/commonspace/object/tree/treestorage/inmemory.go +++ b/commonspace/object/tree/treestorage/inmemory.go @@ -7,7 +7,7 @@ import ( "sync" ) -type inMemoryTreeStorage struct { +type InMemoryTreeStorage struct { id string root *treechangeproto.RawTreeChangeWithId heads []string @@ -16,7 +16,7 @@ type inMemoryTreeStorage struct { sync.RWMutex } -func (t *inMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { +func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { t.RLock() defer t.RUnlock() @@ -37,46 +37,46 @@ func NewInMemoryTreeStorage( } allChanges[root.Id] = root - return &inMemoryTreeStorage{ + return &InMemoryTreeStorage{ id: root.Id, root: root, - heads: heads, + heads: append([]string(nil), heads...), changes: allChanges, RWMutex: sync.RWMutex{}, }, nil } -func (t *inMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) { +func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) { _, exists := t.changes[id] return exists, nil } -func (t *inMemoryTreeStorage) Id() string { +func (t *InMemoryTreeStorage) Id() string { t.RLock() defer t.RUnlock() return t.id } -func (t *inMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) { +func (t *InMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) { t.RLock() defer t.RUnlock() return t.root, nil } -func (t *inMemoryTreeStorage) Heads() ([]string, error) { +func (t *InMemoryTreeStorage) Heads() ([]string, error) { t.RLock() defer t.RUnlock() return t.heads, nil } -func (t *inMemoryTreeStorage) SetHeads(heads []string) error { +func (t *InMemoryTreeStorage) SetHeads(heads []string) error { t.Lock() defer t.Unlock() t.heads = append(t.heads[:0], heads...) return nil } -func (t *inMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChangeWithId) error { +func (t *InMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChangeWithId) error { t.Lock() defer t.Unlock() // TODO: better to do deep copy @@ -84,7 +84,7 @@ func (t *inMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChange return nil } -func (t *inMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) { +func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) { t.RLock() defer t.RUnlock() if res, exists := t.changes[changeId]; exists { @@ -93,6 +93,15 @@ func (t *inMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) return nil, fmt.Errorf("could not get change with id: %s", changeId) } -func (t *inMemoryTreeStorage) Delete() error { +func (t *InMemoryTreeStorage) Delete() error { return nil } + +func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage { + var changes []*treechangeproto.RawTreeChangeWithId + for _, ch := range t.changes { + changes = append(changes, ch) + } + other, _ := NewInMemoryTreeStorage(t.root, t.heads, changes) + return other.(*InMemoryTreeStorage) +}