diff --git a/data/aclstate.go b/data/aclstate.go new file mode 100644 index 00000000..934e29c5 --- /dev/null +++ b/data/aclstate.go @@ -0,0 +1,351 @@ +package data + +import ( + "bytes" + "errors" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/textileio/go-threads/crypto/symmetric" + "hash/fnv" +) + +var ErrNoSuchUser = errors.New("no such user") +var ErrFailedToDecrypt = errors.New("failed to decrypt key") +var ErrUserRemoved = errors.New("user was removed from the document") +var ErrDocumentForbidden = errors.New("your user was forbidden access to the document") +var ErrUserAlreadyExists = errors.New("user already exists") + +type ACLState struct { + currentReadKeyHash uint64 + userReadKeys map[uint64]*symmetric.Key + userStates map[string]*pb.ACLChangeUserState + userInvites map[string]*pb.ACLChangeUserInvite + signingPubKeyDecoder threadmodels.SigningPubKeyDecoder + encryptionKey threadmodels.EncryptionPrivKey + identity string +} + +func NewACLStateFromSnapshot( + snapshot *pb.ACLChangeACLSnapshot, + identity string, + encryptionKey threadmodels.EncryptionPrivKey, + signingPubKeyDecoder threadmodels.SigningPubKeyDecoder) (*ACLState, error) { + st := &ACLState{ + identity: identity, + encryptionKey: encryptionKey, + userReadKeys: make(map[uint64]*symmetric.Key), + userStates: make(map[string]*pb.ACLChangeUserState), + userInvites: make(map[string]*pb.ACLChangeUserInvite), + signingPubKeyDecoder: signingPubKeyDecoder, + } + err := st.recreateFromSnapshot(snapshot) + if err != nil { + return nil, err + } + return st, nil +} + +func (st *ACLState) recreateFromSnapshot(snapshot *pb.ACLChangeACLSnapshot) error { + state := snapshot.AclState + for _, userState := range state.UserStates { + st.userStates[userState.Identity] = userState + } + + userState, exists := st.userStates[st.identity] + if !exists { + return ErrNoSuchUser + } + var lastKeyHash uint64 + for _, key := range userState.EncryptedReadKeys { + key, hash, err := st.decryptReadKeyAndHash(key) + if err != nil { + return ErrFailedToDecrypt + } + + st.userReadKeys[hash] = key + lastKeyHash = hash + } + st.currentReadKeyHash = lastKeyHash + if snapshot.GetAclState().GetInvites() != nil { + st.userInvites = snapshot.GetAclState().GetInvites() + } + return nil +} + +func (st *ACLState) ApplyChange(changeId string, change *pb.ACLChange) error { + // we can't check this for the user which is joining, because it will not be in our list + if !st.isUserJoin(change) { + // we check signature when we add this to the tree, so no need to do it here + if _, exists := st.userStates[change.Identity]; !exists { + return ErrNoSuchUser + } + + if !st.HasPermission(change.Identity, pb.ACLChange_Admin) { + return fmt.Errorf("user %s must have admin permissions", change.Identity) + } + } + + for _, ch := range change.GetAclData().GetAclContent() { + if err := st.applyChange(changeId, ch); err != nil { + log.Infof("error while applying changes: %v; ignore", err) + return err + } + } + + return nil +} + +// TODO: remove changeId, because it is not needed +func (st *ACLState) applyChange(changeId string, ch *pb.ACLChangeACLContentValue) error { + switch { + case ch.GetUserPermissionChange() != nil: + return st.applyUserPermissionChange(ch.GetUserPermissionChange()) + case ch.GetUserAdd() != nil: + return st.applyUserAdd(ch.GetUserAdd()) + case ch.GetUserRemove() != nil: + return st.applyUserRemove(ch.GetUserRemove()) + case ch.GetUserInvite() != nil: + return st.applyUserInvite(changeId, ch.GetUserInvite()) + case ch.GetUserJoin() != nil: + return st.applyUserJoin(ch.GetUserJoin()) + case ch.GetUserConfirm() != nil: + return st.applyUserConfirm(ch.GetUserConfirm()) + default: + return fmt.Errorf("unexpected change type: %v", ch) + } +} + +func (st *ACLState) applyUserPermissionChange(ch *pb.ACLChangeUserPermissionChange) error { + if _, exists := st.userStates[ch.Identity]; !exists { + return ErrNoSuchUser + } + + st.userStates[ch.Identity].Permissions = ch.Permissions + return nil +} + +func (st *ACLState) applyUserInvite(changeId string, ch *pb.ACLChangeUserInvite) error { + st.userInvites[changeId] = ch + return nil +} + +func (st *ACLState) applyUserJoin(ch *pb.ACLChangeUserJoin) error { + invite, exists := st.userInvites[ch.UserInviteChangeId] + if !exists { + return fmt.Errorf("no such invite with id %s", ch.UserInviteChangeId) + } + + if _, exists = st.userStates[ch.Identity]; exists { + return ErrUserAlreadyExists + } + + // validating signature + signature := ch.GetAcceptSignature() + verificationKey, err := st.signingPubKeyDecoder.DecodeFromBytes(invite.AcceptPublicKey) + if err != nil { + return fmt.Errorf("public key verifying invite accepts is given in incorrect format: %v", err) + } + + rawSignedId, err := st.signingPubKeyDecoder.DecodeFromStringIntoBytes(ch.Identity) + if err != nil { + return fmt.Errorf("failed to decode signing identity as bytes") + } + + res, err := verificationKey.Verify(rawSignedId, signature) + if err != nil { + return fmt.Errorf("verification returned error: %w", err) + } + if !res { + return fmt.Errorf("signature is invalid") + } + + // if ourselves -> we need to decrypt the read keys + if st.identity == ch.Identity { + var lastKeyHash uint64 + for _, key := range ch.EncryptedReadKeys { + key, hash, err := st.decryptReadKeyAndHash(key) + if err != nil { + return ErrFailedToDecrypt + } + + st.userReadKeys[hash] = key + lastKeyHash = hash + } + st.currentReadKeyHash = lastKeyHash + } + + // adding user to the list + userState := &pb.ACLChangeUserState{ + Identity: ch.Identity, + EncryptionKey: ch.EncryptionKey, + EncryptedReadKeys: ch.EncryptedReadKeys, + Permissions: invite.Permissions, + IsConfirmed: true, + } + st.userStates[ch.Identity] = userState + return nil +} + +func (st *ACLState) applyUserAdd(ch *pb.ACLChangeUserAdd) error { + if _, exists := st.userStates[ch.Identity]; exists { + return ErrUserAlreadyExists + } + + st.userStates[ch.Identity] = &pb.ACLChangeUserState{ + Identity: ch.Identity, + EncryptionKey: ch.EncryptionKey, + Permissions: ch.Permissions, + EncryptedReadKeys: ch.EncryptedReadKeys, + } + + return nil +} + +func (st *ACLState) applyUserRemove(ch *pb.ACLChangeUserRemove) error { + if ch.Identity == st.identity { + return ErrDocumentForbidden + } + + if _, exists := st.userStates[ch.Identity]; !exists { + return ErrNoSuchUser + } + + delete(st.userStates, ch.Identity) + + for _, replace := range ch.ReadKeyReplaces { + userState, exists := st.userStates[replace.Identity] + if !exists { + continue + } + + userState.EncryptedReadKeys = append(userState.EncryptedReadKeys, replace.EncryptedReadKey) + // if this is our identity then we have to decrypt the key + if replace.Identity == st.identity { + key, hash, err := st.decryptReadKeyAndHash(replace.EncryptedReadKey) + if err != nil { + return ErrFailedToDecrypt + } + + st.currentReadKeyHash = hash + st.userReadKeys[st.currentReadKeyHash] = key + } + } + return nil +} + +func (st *ACLState) applyUserConfirm(ch *pb.ACLChangeUserConfirm) error { + if _, exists := st.userStates[ch.Identity]; !exists { + return ErrNoSuchUser + } + + userState := st.userStates[ch.Identity] + userState.IsConfirmed = true + return nil +} + +func (st *ACLState) decryptReadKeyAndHash(msg []byte) (*symmetric.Key, uint64, error) { + decrypted, err := st.encryptionKey.Decrypt(msg) + if err != nil { + return nil, 0, ErrFailedToDecrypt + } + + key, err := symmetric.FromBytes(decrypted) + if err != nil { + return nil, 0, ErrFailedToDecrypt + } + + hasher := fnv.New64() + hasher.Write(decrypted) + return key, hasher.Sum64(), nil +} + +func (st *ACLState) HasPermission(identity string, permission pb.ACLChangeUserPermissions) bool { + state, exists := st.userStates[identity] + if !exists { + return false + } + + return state.Permissions == permission +} + +func (st *ACLState) isUserJoin(ch *pb.ACLChange) bool { + // if we have a UserJoin, then it should always be the first one applied + return ch.AclData.GetAclContent() != nil && ch.AclData.GetAclContent()[0].GetUserJoin() != nil +} + +func (st *ACLState) GetPermissionDecreasedUsers(ch *pb.ACLChange) (identities []*pb.ACLChangeUserPermissionChange) { + // this should be called after general checks are completed + if ch.GetAclData().GetAclContent() == nil { + return nil + } + + contents := ch.GetAclData().GetAclContent() + for _, c := range contents { + if c.GetUserPermissionChange() != nil { + content := c.GetUserPermissionChange() + + currentState := st.userStates[content.Identity] + // the comparison works in different direction :-) + if content.Permissions > currentState.Permissions { + identities = append(identities, &pb.ACLChangeUserPermissionChange{ + Identity: content.Identity, + Permissions: content.Permissions, + }) + } + } + if c.GetUserRemove() != nil { + content := c.GetUserRemove() + identities = append(identities, &pb.ACLChangeUserPermissionChange{ + Identity: content.Identity, + Permissions: pb.ACLChange_Removed, + }) + } + } + + return identities +} + +func (st *ACLState) Equal(other *ACLState) bool { + if st == nil && other == nil { + return true + } + + if st == nil || other == nil { + return false + } + + if st.currentReadKeyHash != other.currentReadKeyHash { + return false + } + + if st.identity != other.identity { + return false + } + + if len(st.userStates) != len(other.userStates) { + return false + } + + for _, st := range st.userStates { + otherSt, exists := other.userStates[st.Identity] + if !exists { + return false + } + + if st.Permissions != otherSt.Permissions { + return false + } + + if bytes.Compare(st.EncryptionKey, otherSt.EncryptionKey) != 0 { + return false + } + } + + if len(st.userInvites) != len(other.userInvites) { + return false + } + + // TODO: add detailed user invites comparison + compare other stuff + return true +} diff --git a/data/aclstatebuilder.go b/data/aclstatebuilder.go new file mode 100644 index 00000000..ee1e1d9f --- /dev/null +++ b/data/aclstatebuilder.go @@ -0,0 +1,181 @@ +package data + +import ( + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" +) + +type ACLStateBuilder struct { + tree *Tree + aclState *ACLState + identity string + key threadmodels.EncryptionPrivKey +} + +type decreasedPermissionsParameters struct { + users []*pb.ACLChangeUserPermissionChange + startChange string +} + +func NewACLStateBuilder( + tree *Tree, + identity string, + key threadmodels.EncryptionPrivKey, + decoder threadmodels.SigningPubKeyDecoder) (*ACLStateBuilder, error) { + root := tree.Root() + if !root.IsSnapshot { + return nil, fmt.Errorf("root should always be a snapshot") + } + + snapshot := root.Content.GetAclData().GetAclSnapshot() + state, err := NewACLStateFromSnapshot(snapshot, identity, key, decoder) + if err != nil { + return nil, fmt.Errorf("could not build aclState from snapshot: %w", err) + } + + return &ACLStateBuilder{ + tree: tree, + aclState: state, + identity: identity, + key: key, + }, nil +} + +func (sb *ACLStateBuilder) Build() (*ACLState, error) { + state, _, err := sb.BuildBefore("") + return state, err +} + +// TODO: we can probably have only one state builder, because we can build both at the same time +func (sb *ACLStateBuilder) BuildBefore(beforeId string) (*ACLState, bool, error) { + var ( + err error + startChange = sb.tree.root + foundId bool + idSeenMap = make(map[string][]*Change) + decreasedPermissions *decreasedPermissionsParameters + ) + idSeenMap[startChange.Content.Identity] = append(idSeenMap[startChange.Content.Identity], startChange) + + if startChange.Content.GetChangesData() != nil { + key, exists := sb.aclState.userReadKeys[startChange.Content.CurrentReadKeyHash] + if !exists { + return nil, false, fmt.Errorf("no first snapshot") + } + + err = startChange.DecryptContents(key) + if err != nil { + return nil, false, fmt.Errorf("failed to decrypt contents of first snapshot") + } + } + + if beforeId == startChange.Id { + return sb.aclState, true, nil + } + + for { + // TODO: we should optimize this method to just remember last state of iterator and not iterate from the start and skip if nothing was removed from the tree + sb.tree.iterateSkip(sb.tree.root, startChange, func(c *Change) (isContinue bool) { + defer func() { + if err == nil { + startChange = c + } else if err != ErrDocumentForbidden { + log.Errorf("marking change %s as invalid: %v", c.Id, err) + sb.tree.RemoveInvalidChange(c.Id) + } + }() + + // not applying root change + if c.Id == startChange.Id { + return true + } + + idSeenMap[c.Content.Identity] = append(idSeenMap[c.Content.Identity], c) + if c.Content.GetAclData() != nil { + err = sb.aclState.ApplyChange(c.Id, c.Content) + if err != nil { + return false + } + + // if we have some users who have less permissions now + users := sb.aclState.GetPermissionDecreasedUsers(c.Content) + if len(users) > 0 { + decreasedPermissions = &decreasedPermissionsParameters{ + users: users, + startChange: c.Id, + } + return false + } + } + + // the user can't make changes + if !sb.aclState.HasPermission(c.Content.Identity, pb.ACLChange_Writer) && !sb.aclState.HasPermission(c.Content.Identity, pb.ACLChange_Admin) { + err = fmt.Errorf("user %s cannot make changes", c.Content.Identity) + return false + } + + // decrypting contents on the fly + if c.Content.GetChangesData() != nil { + key, exists := sb.aclState.userReadKeys[c.Content.CurrentReadKeyHash] + if !exists { + err = fmt.Errorf("failed to find key with hash: %d", c.Content.CurrentReadKeyHash) + return false + } + + err = c.DecryptContents(key) + if err != nil { + err = fmt.Errorf("failed to decrypt contents for hash: %d", c.Content.CurrentReadKeyHash) + return false + } + } + + if c.Id == beforeId { + foundId = true + return false + } + + return true + }) + + // if we have users with decreased permissions + if decreasedPermissions != nil { + var removed bool + validChanges := sb.tree.dfs(decreasedPermissions.startChange) + + for _, permChange := range decreasedPermissions.users { + seenChanges := idSeenMap[permChange.Identity] + + for _, seen := range seenChanges { + // if we find some invalid changes + if _, exists := validChanges[seen.Id]; !exists { + // if the user didn't have enough permission to make changes + if seen.IsACLChange() || permChange.Permissions > pb.ACLChange_Writer { + removed = true + sb.tree.RemoveInvalidChange(seen.Id) + } + } + } + } + + decreasedPermissions = nil + if removed { + // starting from the beginning but with updated tree + return sb.BuildBefore(beforeId) + } + } else if err == nil { + // we can finish the acl state building process + break + } + + // the user is forbidden to access the document + if err == ErrDocumentForbidden { + return nil, foundId, err + } + + // otherwise we have to continue from the change which we had + err = nil + } + + return sb.aclState, foundId, err +} diff --git a/data/aclstatebuilder_test.go b/data/aclstatebuilder_test.go new file mode 100644 index 00000000..c8f68756 --- /dev/null +++ b/data/aclstatebuilder_test.go @@ -0,0 +1,164 @@ +package data + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadbuilder" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestACLStateBuilder_UserJoinBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userjoinexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createACLStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder(), + false) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + aclState := ctx.ACLState + //fmt.Println(ctx.Tree.Graph()) + aId := keychain.GeneratedIdentities["A"] + bId := keychain.GeneratedIdentities["B"] + cId := keychain.GeneratedIdentities["C"] + + assert.Equal(t, aclState.identity, aId) + assert.Equal(t, aclState.userStates[aId].Permissions, pb.ACLChange_Admin) + assert.Equal(t, aclState.userStates[bId].Permissions, pb.ACLChange_Writer) + assert.Equal(t, aclState.userStates[cId].Permissions, pb.ACLChange_Reader) + + var changeIds []string + ctx.Tree.iterate(ctx.Tree.root, func(c *Change) (isContinue bool) { + changeIds = append(changeIds, c.Id) + return true + }) + assert.Equal(t, changeIds, []string{"A.1.1", "A.1.2", "B.1.1", "B.1.2"}) +} + +func TestACLStateBuilder_UserRemoveBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userremoveexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createACLStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder(), + false) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + aclState := ctx.ACLState + //fmt.Println(ctx.Tree.Graph()) + aId := keychain.GeneratedIdentities["A"] + + assert.Equal(t, aclState.identity, aId) + assert.Equal(t, aclState.userStates[aId].Permissions, pb.ACLChange_Admin) + + var changeIds []string + ctx.Tree.iterate(ctx.Tree.root, func(c *Change) (isContinue bool) { + changeIds = append(changeIds, c.Id) + return true + }) + assert.Equal(t, changeIds, []string{"A.1.1", "A.1.2", "B.1.1", "A.1.3", "A.1.4"}) +} + +func TestACLStateBuilder_UserRemoveBeforeBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userremovebeforeexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createACLStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder(), + false) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + aclState := ctx.ACLState + //fmt.Println(ctx.Tree.Graph()) + for _, s := range []string{"A", "C", "E"} { + assert.Equal(t, aclState.userStates[keychain.GetIdentity(s)].Permissions, pb.ACLChange_Admin) + } + assert.Equal(t, aclState.identity, keychain.GetIdentity("A")) + assert.Nil(t, aclState.userStates[keychain.GetIdentity("B")]) + + var changeIds []string + ctx.Tree.iterate(ctx.Tree.root, func(c *Change) (isContinue bool) { + changeIds = append(changeIds, c.Id) + return true + }) + assert.Equal(t, changeIds, []string{"A.1.1", "B.1.1", "A.1.2", "A.1.3"}) +} + +func TestACLStateBuilder_InvalidSnapshotBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/invalidsnapshotexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createACLStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder(), + false) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + aclState := ctx.ACLState + //fmt.Println(ctx.Tree.Graph()) + for _, s := range []string{"A", "B", "C", "D", "E", "F"} { + assert.Equal(t, aclState.userStates[keychain.GetIdentity(s)].Permissions, pb.ACLChange_Admin) + } + assert.Equal(t, aclState.identity, keychain.GetIdentity("A")) + + var changeIds []string + ctx.Tree.iterate(ctx.Tree.root, func(c *Change) (isContinue bool) { + changeIds = append(changeIds, c.Id) + return true + }) + assert.Equal(t, []string{"A.1.1", "B.1.1", "A.1.2", "A.1.3", "B.1.2"}, changeIds) +} + +func TestACLStateBuilder_ValidSnapshotBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/validsnapshotexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createACLStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder(), + false) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + aclState := ctx.ACLState + //fmt.Println(ctx.Tree.Graph()) + for _, s := range []string{"A", "B", "C", "D", "E", "F"} { + assert.Equal(t, aclState.userStates[keychain.GetIdentity(s)].Permissions, pb.ACLChange_Admin) + } + assert.Equal(t, aclState.identity, keychain.GetIdentity("A")) + + var changeIds []string + ctx.Tree.iterate(ctx.Tree.root, func(c *Change) (isContinue bool) { + changeIds = append(changeIds, c.Id) + return true + }) + assert.Equal(t, []string{"A.1.2", "A.1.3", "B.1.2"}, changeIds) +} diff --git a/data/acltreebuilder.go b/data/acltreebuilder.go new file mode 100644 index 00000000..1583eed7 --- /dev/null +++ b/data/acltreebuilder.go @@ -0,0 +1,237 @@ +package data + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "github.com/gogo/protobuf/proto" + "github.com/textileio/go-threads/core/thread" + "sort" + "time" +) + +type ACLTreeBuilder struct { + cache map[string]*Change + logHeads map[string]*Change + identityKeys map[string]threadmodels.SigningPubKey + signingPubKeyDecoder threadmodels.SigningPubKeyDecoder + tree *Tree + thread threadmodels.Thread +} + +func NewACLTreeBuilder(t threadmodels.Thread, decoder threadmodels.SigningPubKeyDecoder) *ACLTreeBuilder { + return &ACLTreeBuilder{ + cache: make(map[string]*Change), + logHeads: make(map[string]*Change), + identityKeys: make(map[string]threadmodels.SigningPubKey), + signingPubKeyDecoder: decoder, + tree: &Tree{}, // TODO: add NewTree method + thread: t, + } +} + +func (tb *ACLTreeBuilder) loadChange(id string) (ch *Change, err error) { + if ch, ok := tb.cache[id]; ok { + return ch, nil + } + + // TODO: Add virtual changes logic + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + record, err := tb.thread.GetRecord(ctx, id) + if err != nil { + return nil, err + } + + aclChange := new(pb.ACLChange) + + // TODO: think what should we do with such cases, because this can be used by attacker to break our tree + if err = proto.Unmarshal(record.Signed.Payload, aclChange); err != nil { + return + } + var verified bool + verified, err = tb.verify(aclChange.Identity, record.Signed.Payload, record.Signed.Signature) + if err != nil { + return + } + if !verified { + err = fmt.Errorf("the signature of the payload cannot be verified") + return + } + + ch, err = NewACLChange(id, aclChange) + tb.cache[id] = ch + + return ch, nil +} + +func (tb *ACLTreeBuilder) verify(identity string, payload, signature []byte) (isVerified bool, err error) { + identityKey, exists := tb.identityKeys[identity] + if !exists { + identityKey, err = tb.signingPubKeyDecoder.DecodeFromString(identity) + if err != nil { + return + } + tb.identityKeys[identity] = identityKey + } + return identityKey.Verify(payload, signature) +} + +func (tb *ACLTreeBuilder) getLogs() (logs []threadmodels.ThreadLog, err error) { + // TODO: Add beforeId building logic + logs, err = tb.thread.GetLogs() + if err != nil { + return nil, fmt.Errorf("GetLogs error: %w", err) + } + + log.Debugf("build tree: logs: %v", logs) + if len(logs) == 0 || len(logs) == 1 && len(logs[0].Head) <= 1 { + return nil, ErrEmpty + } + var nonEmptyLogs = logs[:0] + for _, l := range logs { + if len(l.Head) == 0 { + continue + } + if ch, err := tb.loadChange(l.Head); err != nil { + log.Errorf("loading head %s of the log %s failed: %v", l.Head, l.ID, err) + } else { + tb.logHeads[l.ID] = ch + } + nonEmptyLogs = append(nonEmptyLogs, l) + } + return nonEmptyLogs, nil +} + +func (tb *ACLTreeBuilder) Build() (*Tree, error) { + logs, err := tb.getLogs() + if err != nil { + return nil, err + } + + heads, err := tb.getACLHeads(logs) + if err != nil { + return nil, fmt.Errorf("get acl heads error: %v", err) + } + + if err = tb.buildTreeFromStart(heads); err != nil { + return nil, fmt.Errorf("buildTree error: %v", err) + } + tb.cache = nil + + return tb.tree, nil +} + +func (tb *ACLTreeBuilder) buildTreeFromStart(heads []string) (err error) { + changes, possibleRoots, err := tb.dfsFromStart(heads) + if len(possibleRoots) == 0 { + return fmt.Errorf("cannot have tree without root") + } + root, err := tb.getRoot(possibleRoots) + if err != nil { + return err + } + + tb.tree.AddFast(root) + tb.tree.AddFast(changes...) + return +} + +func (tb *ACLTreeBuilder) dfsFromStart(stack []string) (buf []*Change, possibleRoots []*Change, err error) { + buf = make([]*Change, 0, len(stack)*2) + uniqMap := make(map[string]struct{}) + + for len(stack) > 0 { + id := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, exists := uniqMap[id]; exists { + continue + } + + ch, err := tb.loadChange(id) + if err != nil { + continue + } + + uniqMap[id] = struct{}{} + buf = append(buf, ch) + + for _, prev := range ch.PreviousIds { + stack = append(stack, prev) + } + if len(ch.PreviousIds) == 0 { + possibleRoots = append(possibleRoots, ch) + } + } + return buf, possibleRoots, nil +} + +func (tb *ACLTreeBuilder) getPrecedingACLHeads(head string) ([]string, error) { + headChange, err := tb.loadChange(head) + if err != nil { + return nil, err + } + + if headChange.Content.GetAclData() != nil { + return []string{head}, nil + } else { + return headChange.PreviousIds, nil + } +} + +func (tb *ACLTreeBuilder) getACLHeads(logs []threadmodels.ThreadLog) (aclTreeHeads []string, err error) { + sort.Slice(logs, func(i, j int) bool { + return logs[i].ID < logs[j].ID + }) + + // get acl tree heads from log heads + for _, l := range logs { + if slice.FindPos(aclTreeHeads, l.Head) != -1 { // do not scan known heads + continue + } + precedingHeads, err := tb.getPrecedingACLHeads(l.Head) + if err != nil { + return nil, err + } + + for _, head := range precedingHeads { + if slice.FindPos(aclTreeHeads, l.Head) != -1 { + continue + } + aclTreeHeads = append(aclTreeHeads, head) + } + } + + if len(aclTreeHeads) == 0 { + return nil, fmt.Errorf("no usable ACL heads in thread") + } + return aclTreeHeads, nil +} + +func (tb *ACLTreeBuilder) getRoot(possibleRoots []*Change) (*Change, error) { + threadId, err := thread.Decode(tb.thread.ID()) + if err != nil { + return nil, err + } + + for _, r := range possibleRoots { + id := r.Content.Identity + sk, err := tb.signingPubKeyDecoder.DecodeFromString(id) + if err != nil { + continue + } + + res, err := threadmodels.VerifyACLThreadID(sk, threadId) + if err != nil { + continue + } + + if res { + return r, nil + } + } + return nil, fmt.Errorf("could not find any root") +} diff --git a/data/change.go b/data/change.go new file mode 100644 index 00000000..1d33e57b --- /dev/null +++ b/data/change.go @@ -0,0 +1,70 @@ +package data + +import ( + "fmt" + + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/gogo/protobuf/proto" + "github.com/textileio/go-threads/crypto/symmetric" +) + +// Change is an abstract type for all types of changes +type Change struct { + Next []*Change + Unattached []*Change + PreviousIds []string + Id string + SnapshotId string + LogHeads map[string]string + IsSnapshot bool + DecryptedDocumentChange *pb.ACLChangeChangeData + + Content *pb.ACLChange +} + +func (ch *Change) DecryptContents(key *symmetric.Key) error { + if ch.Content.ChangesData == nil { + return nil + } + + var changesData pb.ACLChangeChangeData + decrypted, err := key.Decrypt(ch.Content.ChangesData) + if err != nil { + return fmt.Errorf("failed to decrypt changes data: %w", err) + } + + err = proto.Unmarshal(decrypted, &changesData) + if err != nil { + return fmt.Errorf("failed to umarshall into ChangesData: %w", err) + } + ch.DecryptedDocumentChange = &changesData + return nil +} + +func (ch *Change) IsACLChange() bool { + return ch.Content.GetAclData() != nil +} + +func NewChange(id string, ch *pb.ACLChange) (*Change, error) { + return &Change{ + Next: nil, + PreviousIds: ch.TreeHeadIds, + Id: id, + Content: ch, + SnapshotId: ch.SnapshotBaseId, + IsSnapshot: ch.GetAclData().GetAclSnapshot() != nil, + LogHeads: ch.GetLogHeads(), + }, nil +} + +func NewACLChange(id string, ch *pb.ACLChange) (*Change, error) { + return &Change{ + Next: nil, + PreviousIds: ch.AclHeadIds, + Id: id, + Content: ch, + SnapshotId: ch.SnapshotBaseId, + IsSnapshot: ch.GetAclData().GetAclSnapshot() != nil, + LogHeads: ch.GetLogHeads(), + }, nil +} diff --git a/data/documentstatebuilder.go b/data/documentstatebuilder.go new file mode 100644 index 00000000..ed6d1852 --- /dev/null +++ b/data/documentstatebuilder.go @@ -0,0 +1,72 @@ +package data + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/core/block/editor/state" + "time" +) + +type documentStateBuilder struct { + tree *Tree + aclState *ACLState // TODO: decide if this is needed or not +} + +func newDocumentStateBuilder(tree *Tree, state *ACLState) *documentStateBuilder { + return &documentStateBuilder{ + tree: tree, + aclState: state, + } +} + +// TODO: we should probably merge the two builders into one +func (d *documentStateBuilder) build() (s *state.State, err error) { + var ( + startId string + applyRoot bool + st = time.Now() + lastChange *Change + count int + ) + rootChange := d.tree.Root() + root := state.NewDocFromSnapshot("root", rootChange.DecryptedDocumentChange.Snapshot).(*state.State) + root.SetChangeId(rootChange.Id) + + t := d.tree + if startId = root.ChangeId(); startId == "" { + startId = t.RootId() + applyRoot = true + } + + t.Iterate(startId, func(c *Change) (isContinue bool) { + count++ + lastChange = c + if startId == c.Id { + s = root.NewState() + if applyRoot && c.DecryptedDocumentChange != nil { + s.ApplyChangeIgnoreErr(c.DecryptedDocumentChange.Content...) + s.SetChangeId(c.Id) + s.AddFileKeys(c.DecryptedDocumentChange.FileKeys...) + } + return true + } + if c.DecryptedDocumentChange != nil { + ns := s.NewState() + ns.ApplyChangeIgnoreErr(c.DecryptedDocumentChange.Content...) + ns.SetChangeId(c.Id) + ns.AddFileKeys(c.DecryptedDocumentChange.FileKeys...) + _, _, err = state.ApplyStateFastOne(ns) + if err != nil { + return false + } + } + return true + }) + if err != nil { + return nil, err + } + if lastChange != nil { + s.SetLastModified(lastChange.Content.Timestamp, lastChange.Content.Identity) + } + + log.Infof("build state (crdt): changes: %d; dur: %v;", count, time.Since(st)) + return s, err +} diff --git a/data/documentstatebuilder_test.go b/data/documentstatebuilder_test.go new file mode 100644 index 00000000..e84b3805 --- /dev/null +++ b/data/documentstatebuilder_test.go @@ -0,0 +1,62 @@ +package data + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadbuilder" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestDocumentStateBuilder_UserJoinBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userjoinexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createDocumentStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder()) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + + st := ctx.DocState + allIds := make(map[string]bool) + for _, b := range st.Blocks() { + allIds[b.Id] = true + } + if err != nil { + t.Fatalf("iterate should not return error: %v", err) + } + assert.True(t, allIds["root"]) + assert.True(t, allIds["first"]) +} + +func TestDocumentStateBuilder_UserRemoveBuild(t *testing.T) { + thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userremoveexample.yml") + if err != nil { + t.Fatal(err) + } + keychain := thread.GetKeychain() + ctx, err := createDocumentStateFromThread( + thread, + keychain.GetIdentity("A"), + keychain.EncryptionKeys["A"], + threadmodels.NewEd25519Decoder()) + if err != nil { + t.Fatalf("should build acl aclState without err: %v", err) + } + + st := ctx.DocState + allIds := make(map[string]bool) + for _, b := range st.Blocks() { + allIds[b.Id] = true + } + if err != nil { + t.Fatalf("iterate should not return error: %v", err) + } + assert.True(t, allIds["root"]) + assert.True(t, allIds["second"]) +} diff --git a/data/pb/protos/aclchanges.proto b/data/pb/protos/aclchanges.proto new file mode 100644 index 00000000..d0c6d955 --- /dev/null +++ b/data/pb/protos/aclchanges.proto @@ -0,0 +1,113 @@ +syntax = "proto3"; +package anytype; +option go_package = "pb"; + +import "pb/protos/changes.proto"; + +// the element of change tree used to store and internal apply smartBlock history +message ACLChange { + repeated string treeHeadIds = 1; + repeated string aclHeadIds = 2; + string snapshotBaseId = 3; // we will only have one base snapshot for both + ACLData aclData = 4; + // the data is encoded with read key and should be read in ChangesData format + bytes changesData = 5; + uint64 currentReadKeyHash = 6; + int64 timestamp = 7; + string identity = 8; + map logHeads = 9; + + message ACLContentValue { + oneof value { + UserAdd userAdd = 1; + UserRemove userRemove = 2; + UserPermissionChange userPermissionChange = 3; + UserInvite userInvite = 4; + UserJoin userJoin = 5; + UserConfirm userConfirm = 6; + } + } + + message ChangeData { + anytype.Change.Snapshot snapshot = 1; + repeated anytype.Change.Content content = 2; + repeated anytype.Change.FileKeys fileKeys = 3; + } + + message ACLData { + ACLSnapshot aclSnapshot = 1; + repeated ACLContentValue aclContent = 2; + } + + message ACLSnapshot { + // We don't need ACLState as a separate message now, because we simplified the snapshot model + ACLState aclState = 1; + } + + message ACLState { + repeated uint64 readKeyHashes = 1; + repeated UserState userStates = 2; + map invites = 3; // TODO: later + // repeated string unconfirmedUsers = 4; // TODO: later + } + + message UserState { + string identity = 1; + bytes encryptionKey = 2; + repeated bytes encryptedReadKeys = 3; // all read keys that we know + UserPermissions permissions = 4; + bool IsConfirmed = 5; + } + + // we already know identity and encryptionKey + message UserAdd { + string identity = 1; // public signing key + bytes encryptionKey = 2; // public encryption key + repeated bytes encryptedReadKeys = 3; // all read keys that we know for the user + UserPermissions permissions = 4; + } + + // TODO: this is not used as of now + message UserConfirm { // not needed for read permissions + string identity = 1; // not needed + string userAddId = 2; + } + + message UserInvite { + bytes acceptPublicKey = 1; + bytes encryptPublicKey = 2; + repeated bytes encryptedReadKeys = 3; // all read keys that we know for the user + UserPermissions permissions = 4; + } + + message UserJoin { + string identity = 1; + bytes encryptionKey = 2; + bytes acceptSignature = 3; // sign acceptPublicKey + string userInviteChangeId = 4; + repeated bytes encryptedReadKeys = 5; // the idea is that user should itself reencrypt the keys with the pub key + } + + message UserRemove { + string identity = 1; + repeated ReadKeyReplace readKeyReplaces = 3; // new read key encrypted for all users + } + + message ReadKeyReplace { + string identity = 1; + bytes encryptionKey = 2; + bytes encryptedReadKey = 3; + } + + message UserPermissionChange { + string identity = 1; + UserPermissions permissions = 2; + } + + enum UserPermissions { + Admin = 0; + Writer = 1; + Reader = 2; + Removed = 3; + } +} diff --git a/data/snapshotvalidator.go b/data/snapshotvalidator.go new file mode 100644 index 00000000..29a7ee43 --- /dev/null +++ b/data/snapshotvalidator.go @@ -0,0 +1,49 @@ +package data + +import ( + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" +) + +type SnapshotValidator struct { + aclTree *Tree + identity string + key threadmodels.EncryptionPrivKey + decoder threadmodels.SigningPubKeyDecoder +} + +func NewSnapshotValidator( + aclTree *Tree, + identity string, + key threadmodels.EncryptionPrivKey, + decoder threadmodels.SigningPubKeyDecoder) *SnapshotValidator { + return &SnapshotValidator{ + aclTree: aclTree, + identity: identity, + key: key, + decoder: decoder, + } +} + +func (s *SnapshotValidator) ValidateSnapshot(ch *Change) (bool, error) { + stateBuilder, err := NewACLStateBuilder(s.aclTree, s.identity, s.key, s.decoder) + if err != nil { + return false, err + } + + st, found, err := stateBuilder.BuildBefore(ch.Id) + if err != nil { + return false, err + } + + if !found { + return false, fmt.Errorf("didn't find snapshot in ACL tree") + } + + otherSt, err := NewACLStateFromSnapshot(ch.Content.GetAclData().GetAclSnapshot(), s.identity, s.key, s.decoder) + if err != nil { + return false, err + } + + return st.Equal(otherSt), nil +} diff --git a/data/threadbuilder/invalidsnapshotexample.yml b/data/threadbuilder/invalidsnapshotexample.yml new file mode 100644 index 00000000..366a9ced --- /dev/null +++ b/data/threadbuilder/invalidsnapshotexample.yml @@ -0,0 +1,122 @@ +thread: + author: A +logs: + - id: A.1 + identity: A + records: + - id: A.1.1 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: B + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: A + permission: admin + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + - userAdd: + identity: B + permission: admin + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: A.1.2 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: B + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: D + encryptionKey: key.Enc.D + encryptedReadKeys: [ key.Read.1 ] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: D + permission: admin + encryptionKey: key.Enc.D + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: A.1.3 + aclChanges: + - userAdd: + identity: E + permission: admin + encryptionKey: key.Enc.E + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: B.1 + identity: B + records: + - id: B.1.1 + aclChanges: + - userAdd: + identity: C + permission: admin + encryptionKey: key.Enc.C + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 + - id: B.1.2 + aclChanges: + - userAdd: + identity: F + permission: admin + encryptionKey: key.Enc.F + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 +keys: + Enc: + - A + - B + - C + - D + - E + - F + Sign: + - A + - B + - C + - D + - E + - F + Read: + - 1 + - 2 +graph: + - id: A.1.1 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + - id: A.1.2 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: B.1.1 + baseSnapshot: A.1.1 + aclHeads: [A.1.1] + treeHeads: [A.1.1] + - id: B.1.2 + baseSnapshot: A.1.2 + aclHeads: [A.1.2] + treeHeads: [A.1.2] + - id: A.1.3 + baseSnapshot: A.1.2 + aclHeads: [A.1.2] + treeHeads: [A.1.2] diff --git a/data/threadbuilder/keychain.go b/data/threadbuilder/keychain.go new file mode 100644 index 00000000..f048d1d6 --- /dev/null +++ b/data/threadbuilder/keychain.go @@ -0,0 +1,138 @@ +package threadbuilder + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/textileio/go-threads/crypto/symmetric" + "hash/fnv" + "strings" +) + +type SymKey struct { + Hash uint64 + Key *symmetric.Key +} + +type Keychain struct { + SigningKeys map[string]threadmodels.SigningPrivKey + EncryptionKeys map[string]threadmodels.EncryptionPrivKey + ReadKeys map[string]*SymKey + GeneratedIdentities map[string]string + coder *threadmodels.Ed25519SigningPubKeyDecoder +} + +func NewKeychain() *Keychain { + return &Keychain{ + SigningKeys: map[string]threadmodels.SigningPrivKey{}, + EncryptionKeys: map[string]threadmodels.EncryptionPrivKey{}, + GeneratedIdentities: map[string]string{}, + ReadKeys: map[string]*SymKey{}, + coder: threadmodels.NewEd25519Decoder(), + } +} + +func (k *Keychain) ParseKeys(keys *Keys) { + for _, encKey := range keys.Enc { + k.AddEncryptionKey(encKey) + } + + for _, signKey := range keys.Sign { + k.AddSigningKey(signKey) + } + + for _, readKey := range keys.Read { + k.AddReadKey(readKey) + } +} + +func (k *Keychain) AddEncryptionKey(name string) { + if _, exists := k.EncryptionKeys[name]; exists { + return + } + newPrivKey, _, err := threadmodels.GenerateRandomRSAKeyPair(2048) + if err != nil { + panic(err) + } + + k.EncryptionKeys[name] = newPrivKey +} + +func (k *Keychain) AddSigningKey(name string) { + if _, exists := k.SigningKeys[name]; exists { + return + } + newPrivKey, pubKey, err := threadmodels.GenerateRandomEd25519KeyPair() + if err != nil { + panic(err) + } + + k.SigningKeys[name] = newPrivKey + res, err := k.coder.EncodeToString(pubKey) + if err != nil { + panic(err) + } + k.GeneratedIdentities[name] = res +} + +func (k *Keychain) AddReadKey(name string) { + if _, exists := k.ReadKeys[name]; exists { + return + } + key, _ := symmetric.NewRandom() + + hasher := fnv.New64() + hasher.Write(key.Bytes()) + + k.ReadKeys[name] = &SymKey{ + Hash: hasher.Sum64(), + Key: key, + } +} + +func (k *Keychain) AddKey(key string) { + parts := strings.Split(key, ".") + if len(parts) != 3 { + panic("cannot parse a key") + } + name := parts[2] + + switch parts[1] { + case "Sign": + k.AddSigningKey(name) + case "Enc": + k.AddEncryptionKey(name) + case "Read": + k.AddReadKey(name) + default: + panic("incorrect format") + } +} + +func (k *Keychain) GetKey(key string) interface{} { + parts := strings.Split(key, ".") + if len(parts) != 3 { + panic("cannot parse a key") + } + name := parts[2] + + switch parts[1] { + case "Sign": + if key, exists := k.SigningKeys[name]; exists { + return key + } + case "Enc": + if key, exists := k.EncryptionKeys[name]; exists { + return key + } + case "Read": + if key, exists := k.ReadKeys[name]; exists { + return key + } + default: + panic("incorrect format") + } + return nil +} + +func (k *Keychain) GetIdentity(name string) string { + return k.GeneratedIdentities[name] +} diff --git a/data/threadbuilder/threadbuilder.go b/data/threadbuilder/threadbuilder.go new file mode 100644 index 00000000..f391e8ad --- /dev/null +++ b/data/threadbuilder/threadbuilder.go @@ -0,0 +1,444 @@ +package threadbuilder + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/core/smartblock" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/pb/model" + "github.com/gogo/protobuf/proto" + "io/ioutil" + "sort" +) + +type threadRecord struct { + *pb.ACLChange + id string + logId string + readKey *SymKey + signKey threadmodels.SigningPrivKey + + prevRecord *threadRecord + changesData *pb.ACLChangeChangeData +} + +type threadLog struct { + id string + owner string + records []*threadRecord +} + +type ThreadBuilder struct { + threadId string + logs map[string]*threadLog + allRecords map[string]*threadRecord + keychain *Keychain +} + +func NewThreadBuilder(keychain *Keychain) *ThreadBuilder { + return &ThreadBuilder{ + logs: make(map[string]*threadLog), + allRecords: make(map[string]*threadRecord), + keychain: keychain, + } +} + +func NewThreadBuilderFromFile(file string) (*ThreadBuilder, error) { + content, err := ioutil.ReadFile(file) + if err != nil { + return nil, err + } + + thread := YMLThread{} + err = yaml.Unmarshal(content, &thread) + if err != nil { + return nil, err + } + + tb := NewThreadBuilder(NewKeychain()) + tb.Parse(&thread) + + return tb, nil +} + +func (t *ThreadBuilder) ID() string { + return t.threadId +} + +func (t *ThreadBuilder) GetKeychain() *Keychain { + return t.keychain +} + +func (t *ThreadBuilder) GetLogs() ([]threadmodels.ThreadLog, error) { + var logs []threadmodels.ThreadLog + for _, l := range t.logs { + logs = append(logs, threadmodels.ThreadLog{ + ID: l.id, + Head: l.records[len(l.records)-1].id, + Counter: int64(len(l.records)), + }) + } + sort.Slice(logs, func(i, j int) bool { + return logs[i].ID < logs[j].ID + }) + return logs, nil +} + +func (t *ThreadBuilder) GetRecord(ctx context.Context, recordID string) (*threadmodels.ThreadRecord, error) { + rec := t.allRecords[recordID] + prevId := "" + if rec.prevRecord != nil { + prevId = rec.prevRecord.id + } + + var encrypted []byte + if rec.changesData != nil { + m, err := proto.Marshal(rec.changesData) + if err != nil { + panic("should be able to marshal data!") + } + + encrypted, err = rec.readKey.Key.Encrypt(m) + if err != nil { + panic("should be able to encrypt data with read key!") + } + + rec.ChangesData = encrypted + } + + aclMarshaled, err := proto.Marshal(rec.ACLChange) + if err != nil { + panic("should be able to marshal final acl message!") + } + + signature, err := rec.signKey.Sign(aclMarshaled) + if err != nil { + panic("should be able to sign final acl message!") + } + + transformedRec := &threadmodels.ThreadRecord{ + PrevId: prevId, + Id: rec.id, + LogId: rec.logId, + Signed: &threadmodels.SignedPayload{ + Payload: aclMarshaled, + Signature: signature, + }, + } + return transformedRec, nil +} + +func (t *ThreadBuilder) PushRecord(payload proto.Marshaler) (id string, err error) { + panic("implement me") +} + +func (t *ThreadBuilder) Parse(thread *YMLThread) { + // Just to clarify - we are generating new identities for the ones that + // are specified in the yml file, because our identities should be Ed25519 + // the same thing is happening for the encryption keys + t.keychain.ParseKeys(&thread.Keys) + t.threadId = t.parseThreadId(thread.Description) + for _, l := range thread.Logs { + newLog := &threadLog{ + id: l.Id, + owner: t.keychain.GetIdentity(l.Identity), + } + var records []*threadRecord + for _, r := range l.Records { + newRecord := &threadRecord{ + id: r.Id, + logId: newLog.id, + } + if len(records) > 0 { + newRecord.prevRecord = records[len(records)-1] + } + k := t.keychain.GetKey(r.ReadKey).(*SymKey) + newRecord.readKey = k + newRecord.signKey = t.keychain.SigningKeys[l.Identity] + + aclChange := &pb.ACLChange{} + aclChange.Identity = newLog.owner + if len(r.AclChanges) > 0 || r.AclSnapshot != nil { + aclChange.AclData = &pb.ACLChangeACLData{} + if r.AclSnapshot != nil { + aclChange.AclData.AclSnapshot = t.parseACLSnapshot(r.AclSnapshot) + } + if r.AclChanges != nil { + var aclChangeContents []*pb.ACLChangeACLContentValue + for _, ch := range r.AclChanges { + aclChangeContent := t.parseACLChange(ch) + aclChangeContents = append(aclChangeContents, aclChangeContent) + } + aclChange.AclData.AclContent = aclChangeContents + } + } + if len(r.Changes) > 0 || r.Snapshot != nil { + newRecord.changesData = &pb.ACLChangeChangeData{} + if r.Snapshot != nil { + newRecord.changesData.Snapshot = t.parseChangeSnapshot(r.Snapshot) + } + if len(r.Changes) > 0 { + var changeContents []*pb.ChangeContent + for _, ch := range r.Changes { + aclChangeContent := t.parseDocumentChange(ch) + changeContents = append(changeContents, aclChangeContent) + } + newRecord.changesData.Content = changeContents + } + } + aclChange.CurrentReadKeyHash = k.Hash + newRecord.ACLChange = aclChange + t.allRecords[newRecord.id] = newRecord + records = append(records, newRecord) + } + newLog.records = records + t.logs[newLog.id] = newLog + } + t.parseGraph(thread) +} + +func (t *ThreadBuilder) parseThreadId(description *ThreadDescription) string { + if description == nil { + panic("no author in thread") + } + key := t.keychain.SigningKeys[description.Author] + id, err := threadmodels.CreateACLThreadID(key.GetPublic(), smartblock.SmartBlockTypeWorkspace) + if err != nil { + panic(err) + } + + return id.String() +} + +func (t *ThreadBuilder) parseChangeSnapshot(s *ChangeSnapshot) *pb.ChangeSnapshot { + data := &model.SmartBlockSnapshotBase{} + var blocks []*model.Block + for _, b := range s.Blocks { + modelBlock := &model.Block{Id: b.Id, ChildrenIds: b.ChildrenIds} + blocks = append(blocks, modelBlock) + } + data.Blocks = blocks + return &pb.ChangeSnapshot{ + Data: data, + } +} + +func (t *ThreadBuilder) parseACLSnapshot(s *ACLSnapshot) *pb.ACLChangeACLSnapshot { + newState := &pb.ACLChangeACLState{} + for _, state := range s.UserStates { + aclUserState := &pb.ACLChangeUserState{} + aclUserState.Identity = t.keychain.GetIdentity(state.Identity) + + encKey := t.keychain. + GetKey(state.EncryptionKey).(threadmodels.EncryptionPrivKey) + rawKey, _ := encKey.GetPublic().Raw() + aclUserState.EncryptionKey = rawKey + + aclUserState.EncryptedReadKeys = t.encryptReadKeys(state.EncryptedReadKeys, encKey) + aclUserState.Permissions = t.convertPermission(state.Permissions) + newState.UserStates = append(newState.UserStates, aclUserState) + } + return &pb.ACLChangeACLSnapshot{ + AclState: newState, + } +} + +func (t *ThreadBuilder) parseDocumentChange(ch *DocumentChange) (convCh *pb.ChangeContent) { + switch { + case ch.BlockAdd != nil: + blockAdd := ch.BlockAdd + + convCh = &pb.ChangeContent{ + Value: &pb.ChangeContentValueOfBlockCreate{ + BlockCreate: &pb.ChangeBlockCreate{ + TargetId: blockAdd.TargetId, + Position: model.Block_Inner, + Blocks: []*model.Block{&model.Block{Id: blockAdd.Id}}, + }}} + } + if convCh == nil { + panic("cannot have empty document change") + } + + return convCh +} + +func (t *ThreadBuilder) parseACLChange(ch *ACLChange) (convCh *pb.ACLChangeACLContentValue) { + switch { + case ch.UserAdd != nil: + add := ch.UserAdd + + encKey := t.keychain. + GetKey(add.EncryptionKey).(threadmodels.EncryptionPrivKey) + rawKey, _ := encKey.GetPublic().Raw() + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserAdd{ + UserAdd: &pb.ACLChangeUserAdd{ + Identity: t.keychain.GetIdentity(add.Identity), + EncryptionKey: rawKey, + EncryptedReadKeys: t.encryptReadKeys(add.EncryptedReadKeys, encKey), + Permissions: t.convertPermission(add.Permission), + }, + }, + } + case ch.UserJoin != nil: + join := ch.UserJoin + + encKey := t.keychain. + GetKey(join.EncryptionKey).(threadmodels.EncryptionPrivKey) + rawKey, _ := encKey.GetPublic().Raw() + + idKey, _ := t.keychain.SigningKeys[join.Identity].GetPublic().Raw() + signKey := t.keychain.GetKey(join.AcceptSignature).(threadmodels.SigningPrivKey) + signature, err := signKey.Sign(idKey) + if err != nil { + panic(err) + } + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserJoin{ + UserJoin: &pb.ACLChangeUserJoin{ + Identity: t.keychain.GetIdentity(join.Identity), + EncryptionKey: rawKey, + AcceptSignature: signature, + UserInviteChangeId: join.InviteId, + EncryptedReadKeys: t.encryptReadKeys(join.EncryptedReadKeys, encKey), + }, + }, + } + case ch.UserInvite != nil: + invite := ch.UserInvite + rawAcceptKey, _ := t.keychain.GetKey(invite.AcceptKey).(threadmodels.SigningPrivKey).GetPublic().Raw() + encKey := t.keychain. + GetKey(invite.EncryptionKey).(threadmodels.EncryptionPrivKey) + rawEncKey, _ := encKey.GetPublic().Raw() + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserInvite{ + UserInvite: &pb.ACLChangeUserInvite{ + AcceptPublicKey: rawAcceptKey, + EncryptPublicKey: rawEncKey, + EncryptedReadKeys: t.encryptReadKeys(invite.EncryptedReadKeys, encKey), + Permissions: t.convertPermission(invite.Permissions), + }, + }, + } + case ch.UserConfirm != nil: + confirm := ch.UserConfirm + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserConfirm{ + UserConfirm: &pb.ACLChangeUserConfirm{ + Identity: t.keychain.GetIdentity(confirm.Identity), + UserAddId: confirm.UserAddId, + }, + }, + } + case ch.UserPermissionChange != nil: + permissionChange := ch.UserPermissionChange + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserPermissionChange{ + UserPermissionChange: &pb.ACLChangeUserPermissionChange{ + Identity: t.keychain.GetIdentity(permissionChange.Identity), + Permissions: t.convertPermission(permissionChange.Permission), + }, + }, + } + case ch.UserRemove != nil: + remove := ch.UserRemove + + newReadKey := t.keychain.GetKey(remove.NewReadKey).(*SymKey) + + var replaces []*pb.ACLChangeReadKeyReplace + for _, id := range remove.IdentitiesLeft { + identity := t.keychain.GetIdentity(id) + encKey := t.keychain.EncryptionKeys[id] + rawEncKey, _ := encKey.GetPublic().Raw() + encReadKey, err := encKey.GetPublic().Encrypt(newReadKey.Key.Bytes()) + if err != nil { + panic(err) + } + replaces = append(replaces, &pb.ACLChangeReadKeyReplace{ + Identity: identity, + EncryptionKey: rawEncKey, + EncryptedReadKey: encReadKey, + }) + } + + convCh = &pb.ACLChangeACLContentValue{ + Value: &pb.ACLChangeACLContentValueValueOfUserRemove{ + UserRemove: &pb.ACLChangeUserRemove{ + Identity: t.keychain.GetIdentity(remove.RemovedIdentity), + ReadKeyReplaces: replaces, + }, + }, + } + } + if convCh == nil { + panic("cannot have empty acl change") + } + + return convCh +} + +func (t *ThreadBuilder) encryptReadKeys(keys []string, encKey threadmodels.EncryptionPrivKey) (enc [][]byte) { + for _, k := range keys { + realKey := t.keychain.GetKey(k).(*SymKey).Key.Bytes() + res, err := encKey.GetPublic().Encrypt(realKey) + if err != nil { + panic(err) + } + + enc = append(enc, res) + } + return +} + +func (t *ThreadBuilder) convertPermission(perm string) pb.ACLChangeUserPermissions { + switch perm { + case "admin": + return pb.ACLChange_Admin + case "writer": + return pb.ACLChange_Writer + case "reader": + return pb.ACLChange_Reader + default: + panic(fmt.Sprintf("incorrect permission: %s", perm)) + } +} + +func (t *ThreadBuilder) traverseFromHeads(f func(t *threadRecord) error) error { + allLogs, err := t.GetLogs() + if err != nil { + return err + } + + for _, log := range allLogs { + head := t.allRecords[log.Head] + err = f(head) + if err != nil { + return err + } + + for head.prevRecord != nil { + head = head.prevRecord + err = f(head) + if err != nil { + return err + } + } + } + return nil +} + +func (t *ThreadBuilder) parseGraph(thread *YMLThread) { + for _, node := range thread.Graph { + rec := t.allRecords[node.Id] + rec.AclHeadIds = node.ACLHeads + rec.TreeHeadIds = node.TreeHeads + rec.SnapshotBaseId = node.BaseSnapshot + } +} diff --git a/data/threadbuilder/threadbuildergraph.go b/data/threadbuilder/threadbuildergraph.go new file mode 100644 index 00000000..edb98971 --- /dev/null +++ b/data/threadbuilder/threadbuildergraph.go @@ -0,0 +1,11 @@ +//go:build ((!linux && !darwin) || android || ios || nographviz) && !amd64 +// +build !linux,!darwin android ios nographviz +// +build !amd64 + +package threadbuilder + +import "fmt" + +func (t *ThreadBuilder) Graph() (string, error) { + return "", fmt.Errorf("building graphs is not supported") +} diff --git a/data/threadbuilder/threadbuildergraph_nix.go b/data/threadbuilder/threadbuildergraph_nix.go new file mode 100644 index 00000000..75c7e06d --- /dev/null +++ b/data/threadbuilder/threadbuildergraph_nix.go @@ -0,0 +1,153 @@ +//go:build (linux || darwin) && !android && !ios && !nographviz && (amd64 || arm64) +// +build linux darwin +// +build !android +// +build !ios +// +build !nographviz +// +build amd64 arm64 + +package threadbuilder + +import ( + "fmt" + "strings" + "unicode" + + "github.com/awalterschulze/gographviz" +) + +// To quickly look at visualized string you can use https://dreampuf.github.io/GraphvizOnline + +type EdgeParameters struct { + style string + color string + label string +} + +func (t *ThreadBuilder) Graph() (string, error) { + // TODO: check updates on https://github.com/goccy/go-graphviz/issues/52 or make a fix yourself to use better library here + graph := gographviz.NewGraph() + graph.SetName("G") + graph.SetDir(true) + var nodes = make(map[string]struct{}) + + var addNodes = func(r *threadRecord) error { + // TODO: revisit function after checking + + style := "solid" + if r.GetAclData() != nil { + style = "filled" + } else if r.changesData != nil { + style = "dashed" + } + + var chSymbs []string + if r.changesData != nil { + for _, chc := range r.changesData.Content { + tp := fmt.Sprintf("%T", chc.Value) + tp = strings.Replace(tp, "ChangeContentValueOf", "", 1) + res := "" + for _, ts := range tp { + if unicode.IsUpper(ts) { + res += string(ts) + } + } + chSymbs = append(chSymbs, res) + } + } + if r.GetAclData() != nil { + for _, chc := range r.GetAclData().AclContent { + tp := fmt.Sprintf("%T", chc.Value) + tp = strings.Replace(tp, "ACLChangeACLContentValueValueOf", "", 1) + res := "" + for _, ts := range tp { + if unicode.IsUpper(ts) { + res += string(ts) + } + } + chSymbs = append(chSymbs, res) + } + } + + shortId := r.id + label := fmt.Sprintf("Id: %s\nChanges: %s\n", + shortId, + strings.Join(chSymbs, ","), + ) + e := graph.AddNode("G", "\""+r.id+"\"", map[string]string{ + "label": "\"" + label + "\"", + "style": "\"" + style + "\"", + }) + if e != nil { + return e + } + nodes[r.id] = struct{}{} + return nil + } + + var createEdge = func(firstId, secondId string, params EdgeParameters) error { + _, exists := nodes[firstId] + if !exists { + return fmt.Errorf("no such node") + } + _, exists = nodes[secondId] + if !exists { + return fmt.Errorf("no previous node") + } + + err := graph.AddEdge("\""+firstId+"\"", "\""+secondId+"\"", true, map[string]string{ + "color": params.color, + "style": params.style, + }) + if err != nil { + return err + } + + return nil + } + + var addLinks = func(t *threadRecord) error { + for _, prevId := range t.AclHeadIds { + err := createEdge(t.id, prevId, EdgeParameters{ + style: "dashed", + color: "red", + }) + if err != nil { + return err + } + } + + for _, prevId := range t.TreeHeadIds { + err := createEdge(t.id, prevId, EdgeParameters{ + style: "dashed", + color: "blue", + }) + if err != nil { + return err + } + } + + if t.SnapshotBaseId != "" { + err := createEdge(t.id, t.SnapshotBaseId, EdgeParameters{ + style: "bold", + color: "blue", + }) + if err != nil { + return err + } + } + + return nil + } + + err := t.traverseFromHeads(addNodes) + if err != nil { + return "", err + } + + err = t.traverseFromHeads(addLinks) + if err != nil { + return "", err + } + + return graph.String(), nil +} diff --git a/data/threadbuilder/userjoinexample.yml b/data/threadbuilder/userjoinexample.yml new file mode 100644 index 00000000..e70c9fc3 --- /dev/null +++ b/data/threadbuilder/userjoinexample.yml @@ -0,0 +1,109 @@ +thread: + author: A +logs: + - id: A.1 + identity: A + records: + - id: A.1.1 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: A + permission: admin + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + changes: + - blockAdd: + id: root + readKey: key.Read.1 + - id: A.1.2 + aclChanges: + - userInvite: + acceptKey: key.Sign.Onetime1 + encryptionKey: key.Enc.Onetime1 + encryptedReadKeys: [key.Read.1] + permissions: writer + - userAdd: + identity: C + permission: reader + encryptionKey: key.Enc.C + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 + - id: A.1.3 + changes: + - blockAdd: + id: second + targetId: root + readKey: key.Read.1 + - id: B.1 + identity: B + records: + - id: B.1.1 + aclChanges: + - userJoin: + identity: B + encryptionKey: key.Enc.B + acceptSignature: key.Sign.Onetime1 + inviteId: A.1.2 + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: B.1.2 + changes: + - blockAdd: + id: first + targetId: root + readKey: key.Read.1 + - id: C.1 + identity: C + records: + - id: C.1.1 + changes: + - blockAdd: + id: third + targetId: root + readKey: key.Read.1 +keys: + Enc: + - A + - B + - C + - Onetime1 + Sign: + - A + - B + - C + - Onetime1 + Read: + - 1 +graph: + - id: A.1.1 + baseSnapshot: A.1.1 + - id: A.1.2 + baseSnapshot: A.1.1 + aclHeads: [A.1.1] + treeHeads: [A.1.1] + - id: B.1.1 + baseSnapshot: A.1.1 + aclHeads: [A.1.2] + treeHeads: [A.1.2] + - id: B.1.2 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: A.1.3 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.2, C.1.1] + - id: C.1.1 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + diff --git a/data/threadbuilder/userremovebeforeexample.yml b/data/threadbuilder/userremovebeforeexample.yml new file mode 100644 index 00000000..a5abfd13 --- /dev/null +++ b/data/threadbuilder/userremovebeforeexample.yml @@ -0,0 +1,105 @@ +thread: + author: A +logs: + - id: A.1 + identity: A + records: + - id: A.1.1 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: B + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: A + permission: admin + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + - userAdd: + identity: B + permission: admin + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + changes: + - blockAdd: + id: root + readKey: key.Read.1 + - id: A.1.2 + aclChanges: + - userRemove: + removedIdentity: B + newReadKey: key.Read.2 + identitiesLeft: [A, C] + readKey: key.Read.2 + - id: A.1.3 + aclChanges: + - userAdd: + identity: E + permission: admin + encryptionKey: key.Enc.E + encryptedReadKeys: [key.Read.1, key.Read.2] + readKey: key.Read.2 + - id: B.1 + identity: B + records: + - id: B.1.1 + aclChanges: + - userAdd: + identity: C + permission: admin + encryptionKey: key.Enc.C + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 + - id: B.1.2 + aclChanges: + - userAdd: + identity: D + permission: admin + encryptionKey: key.Enc.D + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 +keys: + Enc: + - A + - B + - C + - D + - E + Sign: + - A + - B + - C + - D + - E + Read: + - 1 + - 2 +graph: + - id: A.1.1 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + - id: A.1.2 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: B.1.1 + baseSnapshot: A.1.1 + aclHeads: [A.1.1] + treeHeads: [A.1.1] + - id: B.1.2 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: A.1.3 + baseSnapshot: A.1.1 + aclHeads: [A.1.2] + treeHeads: [A.1.2] diff --git a/data/threadbuilder/userremoveexample.yml b/data/threadbuilder/userremoveexample.yml new file mode 100644 index 00000000..6176b0ac --- /dev/null +++ b/data/threadbuilder/userremoveexample.yml @@ -0,0 +1,106 @@ +thread: + author: A +logs: + - id: A.1 + identity: A + records: + - id: A.1.1 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: A + permission: admin + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + changes: + - blockAdd: + id: root + readKey: key.Read.1 + - id: A.1.2 + aclChanges: + - userInvite: + acceptKey: key.Sign.Onetime1 + encryptionKey: key.Enc.Onetime1 + encryptedReadKeys: [key.Read.1] + permissions: writer + readKey: key.Read.1 + - id: A.1.3 + aclChanges: + - userRemove: + removedIdentity: B + newReadKey: key.Read.2 + identitiesLeft: [A] + readKey: key.Read.2 + - id: A.1.4 + changes: + - blockAdd: + id: second + targetId: root + readKey: key.Read.2 + - id: B.1 + identity: B + records: + - id: B.1.1 + aclChanges: + - userJoin: + identity: B + encryptionKey: key.Enc.B + acceptSignature: key.Sign.Onetime1 + inviteId: A.1.2 + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: B.1.2 + changes: + - blockAdd: + id: first + targetId: root + readKey: key.Read.1 +keys: + Enc: + - A + - B + - Onetime1 + Sign: + - A + - B + - Onetime1 + Read: + - 1 + - 2 +graph: + - id: A.1.1 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + - id: A.1.2 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + aclHeads: [A.1.1] + treeHeads: [A.1.1] + - id: B.1.1 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + aclHeads: [A.1.2] + treeHeads: [A.1.2] + - id: B.1.2 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: A.1.3 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: A.1.4 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + aclHeads: [A.1.3] + treeHeads: [A.1.3] diff --git a/data/threadbuilder/validsnapshotexample.yml b/data/threadbuilder/validsnapshotexample.yml new file mode 100644 index 00000000..abdcd093 --- /dev/null +++ b/data/threadbuilder/validsnapshotexample.yml @@ -0,0 +1,126 @@ +thread: + author: A +logs: + - id: A.1 + identity: A + records: + - id: A.1.1 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: B + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: A + permission: admin + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + - userAdd: + identity: B + permission: admin + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: A.1.2 + aclSnapshot: + userStates: + - identity: A + encryptionKey: key.Enc.A + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: B + encryptionKey: key.Enc.B + encryptedReadKeys: [key.Read.1] + permission: admin + - identity: C + encryptionKey: key.Enc.C + encryptedReadKeys: [ key.Read.1 ] + permission: admin + - identity: D + encryptionKey: key.Enc.D + encryptedReadKeys: [ key.Read.1 ] + permission: admin + snapshot: + blocks: + - id: root + aclChanges: + - userAdd: + identity: D + permission: admin + encryptionKey: key.Enc.D + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: A.1.3 + aclChanges: + - userAdd: + identity: E + permission: admin + encryptionKey: key.Enc.E + encryptedReadKeys: [key.Read.1] + readKey: key.Read.1 + - id: B.1 + identity: B + records: + - id: B.1.1 + aclChanges: + - userAdd: + identity: C + permission: admin + encryptionKey: key.Enc.C + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 + - id: B.1.2 + aclChanges: + - userAdd: + identity: F + permission: admin + encryptionKey: key.Enc.F + encryptedReadKeys: [ key.Read.1 ] + readKey: key.Read.1 +keys: + Enc: + - A + - B + - C + - D + - E + - F + Sign: + - A + - B + - C + - D + - E + - F + Read: + - 1 + - 2 +graph: + - id: A.1.1 + baseSnapshot: A.1.1 + aclSnapshot: A.1.1 + - id: A.1.2 + baseSnapshot: A.1.1 + aclHeads: [B.1.1] + treeHeads: [B.1.1] + - id: B.1.1 + baseSnapshot: A.1.1 + aclHeads: [A.1.1] + treeHeads: [A.1.1] + - id: B.1.2 + baseSnapshot: A.1.2 + aclHeads: [A.1.2] + treeHeads: [A.1.2] + - id: A.1.3 + baseSnapshot: A.1.2 + aclHeads: [A.1.2] + treeHeads: [A.1.2] diff --git a/data/threadbuilder/ymlentities.go b/data/threadbuilder/ymlentities.go new file mode 100644 index 00000000..771bbf27 --- /dev/null +++ b/data/threadbuilder/ymlentities.go @@ -0,0 +1,103 @@ +package threadbuilder + +type ThreadDescription struct { + Author string `yaml:"author"` +} + +type Keys struct { + Enc []string `yaml:"Enc"` + Sign []string `yaml:"Sign"` + Read []string `yaml:"Read"` +} + +type ACLSnapshot struct { + UserStates []struct { + Identity string `yaml:"identity"` + EncryptionKey string `yaml:"encryptionKey"` + EncryptedReadKeys []string `yaml:"encryptedReadKeys"` + Permissions string `yaml:"permission"` + IsConfirmed bool `yaml:"isConfirmed"` + } `yaml:"userStates"` +} + +type ChangeSnapshot struct { + Blocks []struct { + Id string `yaml:"id"` + ChildrenIds []string `yaml:"childrenIds"` + } `yaml:"blocks"` +} + +type ACLChange struct { + UserAdd *struct { + Identity string `yaml:"identity"` + EncryptionKey string `yaml:"encryptionKey"` + EncryptedReadKeys []string `yaml:"encryptedReadKeys"` + Permission string `yaml:"permission"` + } `yaml:"userAdd"` + + UserJoin *struct { + Identity string `yaml:"identity"` + EncryptionKey string `yaml:"encryptionKey"` + AcceptSignature string `yaml:"acceptSignature"` + InviteId string `yaml:"inviteId"` + EncryptedReadKeys []string `yaml:"encryptedReadKeys"` + } `yaml:"userJoin"` + + UserInvite *struct { + AcceptKey string `yaml:"acceptKey"` + EncryptionKey string `yaml:"encryptionKey"` + EncryptedReadKeys []string `yaml:"encryptedReadKeys"` + Permissions string `yaml:"permissions"` + } `yaml:"userInvite"` + + UserConfirm *struct { + Identity string `yaml:"identity"` + UserAddId string `yaml:"UserAddId"` + } `yaml:"userConfirm"` + + UserRemove *struct { + RemovedIdentity string `yaml:"removedIdentity"` + NewReadKey string `yaml:"newReadKey"` + IdentitiesLeft []string `yaml:"identitiesLeft"` + } `yaml:"userRemove"` + + UserPermissionChange *struct { + Identity string `yaml:"identity"` + Permission string `yaml:"permission"` + } +} + +type DocumentChange struct { + BlockAdd *struct { + Id string `yaml:"id"` + TargetId string `yaml:"targetId"` + } `yaml:"blockAdd"` +} + +type YMLThread struct { + Description *ThreadDescription `yaml:"thread"` + Logs []struct { + Id string `yaml:"id"` + Identity string `yaml:"identity"` + Records []struct { + Id string `yaml:"id"` + + AclSnapshot *ACLSnapshot `yaml:"aclSnapshot"` + Snapshot *ChangeSnapshot `yaml:"snapshot"` + AclChanges []*ACLChange `yaml:"aclChanges"` + Changes []*DocumentChange `yaml:"changes"` + + ReadKey string `yaml:"readKey"` + } `yaml:"records"` + } `yaml:"logs"` + + Keys Keys `yaml:"keys"` + + Graph []struct { + Id string `yaml:"id"` + BaseSnapshot string `yaml:"baseSnapshot"` + AclSnapshot string `yaml:"aclSnapshot"` + ACLHeads []string `yaml:"aclHeads"` + TreeHeads []string `yaml:"treeHeads"` + } `yaml:"graph"` +} diff --git a/data/threadbuilder/ymlentities_test.go b/data/threadbuilder/ymlentities_test.go new file mode 100644 index 00000000..d2420eae --- /dev/null +++ b/data/threadbuilder/ymlentities_test.go @@ -0,0 +1,12 @@ +package threadbuilder + +import ( + "fmt" + "testing" +) + +func Test_YamlParse(t *testing.T) { + tb, _ := NewThreadBuilderFromFile("userjoinexample.yml") + gr, _ := tb.Graph() + fmt.Println(gr) +} diff --git a/data/threadhelpers.go b/data/threadhelpers.go new file mode 100644 index 00000000..78b8df3d --- /dev/null +++ b/data/threadhelpers.go @@ -0,0 +1,81 @@ +package data + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/core/block/editor/state" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" +) + +type ACLContext struct { + Tree *Tree + ACLState *ACLState + DocState *state.State +} + +func createTreeFromThread(t threadmodels.Thread, fromStart bool) (*Tree, error) { + treeBuilder := NewTreeBuilder(t, threadmodels.NewEd25519Decoder()) + return treeBuilder.Build(fromStart) +} + +func createACLStateFromThread( + t threadmodels.Thread, + identity string, + key threadmodels.EncryptionPrivKey, + decoder threadmodels.SigningPubKeyDecoder, + fromStart bool) (*ACLContext, error) { + tree, err := createTreeFromThread(t, fromStart) + if err != nil { + return nil, err + } + + aclTreeBuilder := NewACLTreeBuilder(t, decoder) + aclTree, err := aclTreeBuilder.Build() + if err != nil { + return nil, err + } + + if !fromStart { + snapshotValidator := NewSnapshotValidator(aclTree, identity, key, decoder) + valid, err := snapshotValidator.ValidateSnapshot(tree.root) + if err != nil { + return nil, err + } + if !valid { + // TODO: think about what to do if the snapshot is invalid - should we rebuild the tree without it + return createACLStateFromThread(t, identity, key, decoder, true) + } + } + + aclBuilder, err := NewACLStateBuilder(tree, identity, key, decoder) + if err != nil { + return nil, err + } + + aclState, err := aclBuilder.Build() + if err != nil { + return nil, err + } + return &ACLContext{ + Tree: tree, + ACLState: aclState, + }, nil +} + +func createDocumentStateFromThread( + t threadmodels.Thread, + identity string, + key threadmodels.EncryptionPrivKey, + decoder threadmodels.SigningPubKeyDecoder) (*ACLContext, error) { + context, err := createACLStateFromThread(t, identity, key, decoder, false) + if err != nil { + return nil, err + } + + docStateBuilder := newDocumentStateBuilder(context.Tree, context.ACLState) + docState, err := docStateBuilder.build() + if err != nil { + return nil, err + } + context.DocState = docState + + return context, nil +} diff --git a/data/threadmodels/keys.go b/data/threadmodels/keys.go new file mode 100644 index 00000000..95a7254b --- /dev/null +++ b/data/threadmodels/keys.go @@ -0,0 +1,255 @@ +package threadmodels + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/sha512" + "crypto/subtle" + "crypto/x509" + "errors" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/strkey" + "github.com/libp2p/go-libp2p-core/crypto" + crypto_pb "github.com/libp2p/go-libp2p-core/crypto/pb" + "io" +) + +type SigningPubKey crypto.PubKey +type SigningPrivKey crypto.PrivKey + +var MinRsaKeyBits = 2048 + +var ErrKeyLengthTooSmall = errors.New("error key length too small") + +type Key interface { + Equals(Key) bool + + Raw() ([]byte, error) +} + +type EncryptionPrivKey interface { + Key + + Decrypt([]byte) ([]byte, error) + GetPublic() EncryptionPubKey +} + +type EncryptionPubKey interface { + Key + + Encrypt(data []byte) ([]byte, error) +} + +type EncryptionRsaPrivKey struct { + privKey rsa.PrivateKey +} + +type EncryptionRsaPubKey struct { + pubKey rsa.PublicKey +} + +func (e *EncryptionRsaPubKey) Equals(key Key) bool { + other, ok := (key).(*EncryptionRsaPubKey) + if !ok { + return keyEquals(e, key) + } + + return e.pubKey.N.Cmp(other.pubKey.N) == 0 && e.pubKey.E == other.pubKey.E +} + +func (e *EncryptionRsaPubKey) Raw() ([]byte, error) { + return x509.MarshalPKIXPublicKey(&e.pubKey) +} + +func (e *EncryptionRsaPubKey) Encrypt(data []byte) ([]byte, error) { + hash := sha512.New() + return rsa.EncryptOAEP(hash, rand.Reader, &e.pubKey, data, nil) +} + +func (e *EncryptionRsaPrivKey) Equals(key Key) bool { + other, ok := (key).(*EncryptionRsaPrivKey) + if !ok { + return keyEquals(e, key) + } + + return e.privKey.N.Cmp(other.privKey.N) == 0 && e.privKey.E == other.privKey.E +} + +func (e *EncryptionRsaPrivKey) Raw() ([]byte, error) { + b := x509.MarshalPKCS1PrivateKey(&e.privKey) + return b, nil +} + +func (e *EncryptionRsaPrivKey) Decrypt(bytes []byte) ([]byte, error) { + hash := sha512.New() + return rsa.DecryptOAEP(hash, rand.Reader, &e.privKey, bytes, nil) +} + +func (e *EncryptionRsaPrivKey) GetPublic() EncryptionPubKey { + return &EncryptionRsaPubKey{pubKey: e.privKey.PublicKey} +} + +func GenerateRandomRSAKeyPair(bits int) (EncryptionPrivKey, EncryptionPubKey, error) { + return GenerateRSAKeyPair(bits, rand.Reader) +} + +func GenerateRSAKeyPair(bits int, src io.Reader) (EncryptionPrivKey, EncryptionPubKey, error) { + if bits < MinRsaKeyBits { + return nil, nil, ErrKeyLengthTooSmall + } + priv, err := rsa.GenerateKey(src, bits) + if err != nil { + return nil, nil, err + } + pk := priv.PublicKey + return &EncryptionRsaPrivKey{privKey: *priv}, &EncryptionRsaPubKey{pubKey: pk}, nil +} + +func NewEncryptionRsaPrivKeyFromBytes(bytes []byte) (EncryptionPrivKey, error) { + sk, err := x509.ParsePKCS1PrivateKey(bytes) + if err != nil { + return nil, err + } + if sk.N.BitLen() < MinRsaKeyBits { + return nil, ErrKeyLengthTooSmall + } + return &EncryptionRsaPrivKey{privKey: *sk}, nil +} + +func NewEncryptionRsaPubKeyFromBytes(bytes []byte) (EncryptionPubKey, error) { + pub, err := x509.ParsePKIXPublicKey(bytes) + if err != nil { + return nil, err + } + pk, ok := pub.(*rsa.PublicKey) + if !ok { + return nil, errors.New("not actually an rsa public key") + } + if pk.N.BitLen() < MinRsaKeyBits { + return nil, ErrKeyLengthTooSmall + } + + return &EncryptionRsaPubKey{pubKey: *pk}, nil +} + +func NewSigningEd25519PubKeyFromBytes(bytes []byte) (SigningPubKey, error) { + return crypto.UnmarshalEd25519PublicKey(bytes) +} + +func GenerateRandomEd25519KeyPair() (SigningPrivKey, SigningPubKey, error) { + return crypto.GenerateEd25519Key(rand.Reader) +} + +func keyEquals(k1, k2 Key) bool { + a, err := k1.Raw() + if err != nil { + return false + } + b, err := k2.Raw() + if err != nil { + return false + } + return subtle.ConstantTimeCompare(a, b) == 1 +} + +type Ed25519SigningPubKeyDecoder struct{} + +func NewEd25519Decoder() *Ed25519SigningPubKeyDecoder { + return &Ed25519SigningPubKeyDecoder{} +} + +func (e *Ed25519SigningPubKeyDecoder) DecodeFromBytes(bytes []byte) (SigningPubKey, error) { + return NewSigningEd25519PubKeyFromBytes(bytes) +} + +func (e *Ed25519SigningPubKeyDecoder) DecodeFromString(identity string) (SigningPubKey, error) { + pubKeyRaw, err := strkey.Decode(0x5b, identity) + if err != nil { + return nil, err + } + + return e.DecodeFromBytes(pubKeyRaw) +} + +func (e *Ed25519SigningPubKeyDecoder) DecodeFromStringIntoBytes(identity string) ([]byte, error) { + return strkey.Decode(0x5b, identity) +} + +func (e *Ed25519SigningPubKeyDecoder) EncodeToString(pubkey crypto.PubKey) (string, error) { + raw, err := pubkey.Raw() + if err != nil { + return "", err + } + return strkey.Encode(0x5b, raw) +} + +type SigningPubKeyDecoder interface { + DecodeFromBytes(bytes []byte) (SigningPubKey, error) + DecodeFromString(identity string) (SigningPubKey, error) + DecodeFromStringIntoBytes(identity string) ([]byte, error) +} + +// Below keys are required for testing and mocking purposes + +type EmptyRecorderEncryptionKey struct { + recordedEncrypted [][]byte + recordedDecrypted [][]byte +} + +func (f *EmptyRecorderEncryptionKey) Equals(key Key) bool { + return true +} + +func (f *EmptyRecorderEncryptionKey) Raw() ([]byte, error) { + panic("can't get bytes from this key") +} + +func (f *EmptyRecorderEncryptionKey) GetPublic() EncryptionPubKey { + panic("this key doesn't have a public key") +} + +func (f *EmptyRecorderEncryptionKey) Encrypt(msg []byte) ([]byte, error) { + f.recordedEncrypted = append(f.recordedEncrypted, msg) + return msg, nil +} + +func (f *EmptyRecorderEncryptionKey) Decrypt(msg []byte) ([]byte, error) { + f.recordedDecrypted = append(f.recordedDecrypted, msg) + return msg, nil +} + +type SignatureVerificationPayload struct { + message []byte + signature []byte +} + +type EmptyRecorderVerificationKey struct { + verifications []SignatureVerificationPayload +} + +func (e *EmptyRecorderVerificationKey) Bytes() ([]byte, error) { + panic("can't get bytes from this key") +} + +func (e *EmptyRecorderVerificationKey) Equals(key crypto.Key) bool { + return true +} + +func (e *EmptyRecorderVerificationKey) Raw() ([]byte, error) { + panic("can't get bytes from this key") +} + +func (e *EmptyRecorderVerificationKey) Type() crypto_pb.KeyType { + panic("can't get type from this key") +} + +func (e *EmptyRecorderVerificationKey) Verify(data []byte, sig []byte) (bool, error) { + e.verifications = append(e.verifications, SignatureVerificationPayload{ + message: data, + signature: sig, + }) + return true, nil +} + +func NewMockSigningPubKeyFromBytes(bytes []byte) (SigningPubKey, error) { + return &EmptyRecorderVerificationKey{}, nil +} diff --git a/data/threadmodels/models.go b/data/threadmodels/models.go new file mode 100644 index 00000000..a9ef39cc --- /dev/null +++ b/data/threadmodels/models.go @@ -0,0 +1,34 @@ +package threadmodels + +import ( + "context" + + "github.com/gogo/protobuf/proto" +) + +type Thread interface { + ID() string + GetLogs() ([]ThreadLog, error) + GetRecord(ctx context.Context, recordID string) (*ThreadRecord, error) + PushRecord(payload proto.Marshaler) (id string, err error) + + // SubscribeForRecords() +} + +type SignedPayload struct { + Payload []byte + Signature []byte +} + +type ThreadRecord struct { + PrevId string + Id string + LogId string + Signed *SignedPayload +} + +type ThreadLog struct { + ID string + Head string + Counter int64 +} diff --git a/data/threadmodels/threadid.go b/data/threadmodels/threadid.go new file mode 100644 index 00000000..4d2f0392 --- /dev/null +++ b/data/threadmodels/threadid.go @@ -0,0 +1,71 @@ +package threadmodels + +import ( + "crypto/rand" + "encoding/binary" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/core/smartblock" + "github.com/textileio/go-threads/core/thread" + "hash/fnv" +) + +func CreateACLThreadID(k SigningPubKey, blockType smartblock.SmartBlockType) (thread.ID, error) { + rndlen := 32 + buf := make([]byte, 8+rndlen) + + // adding random bytes in the end + _, err := rand.Read(buf[8 : 8+rndlen]) + if err != nil { + panic("random read failed") + } + + keyBytes, err := k.Bytes() + if err != nil { + return thread.Undef, err + } + + hasher := fnv.New64() + hasher.Write(keyBytes) + res := hasher.Sum64() + + // putting hash of the pubkey in the beginning + binary.LittleEndian.PutUint64(buf[:8], res) + + return threadIDFromBytes(blockType, buf) +} + +func VerifyACLThreadID(k SigningPubKey, threadId thread.ID) (bool, error) { + bytes := threadId.Bytes() + pubKeyBytes := threadId.Bytes()[len(bytes)-40 : len(bytes)-32] + hash := binary.LittleEndian.Uint64(pubKeyBytes) + + keyBytes, err := k.Bytes() + if err != nil { + return false, err + } + + hasher := fnv.New64() + hasher.Write(keyBytes) + realHash := hasher.Sum64() + + return hash == realHash, nil +} + +func threadIDFromBytes( + blockType smartblock.SmartBlockType, + b []byte) (thread.ID, error) { + blen := len(b) + + // two 8 bytes (max) numbers plus num + buf := make([]byte, 2*binary.MaxVarintLen64+blen) + n := binary.PutUvarint(buf, thread.V1) + n += binary.PutUvarint(buf[n:], uint64(thread.AccessControlled)) + n += binary.PutUvarint(buf[n:], uint64(blockType)) + + cn := copy(buf[n:], b) + if cn != blen { + return thread.Undef, fmt.Errorf("copy length is inconsistent") + } + + return thread.Cast(buf[:n+blen]) +} diff --git a/data/threadmodels/threadid_test.go b/data/threadmodels/threadid_test.go new file mode 100644 index 00000000..c6516aae --- /dev/null +++ b/data/threadmodels/threadid_test.go @@ -0,0 +1,27 @@ +package threadmodels + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/core/smartblock" + "testing" +) + +func TestCreateACLThreadIDVerify(t *testing.T) { + _, pubKey, err := GenerateRandomEd25519KeyPair() + if err != nil { + t.Fatalf("should not return error after generating key pair: %v", err) + } + + thread, err := CreateACLThreadID(pubKey, smartblock.SmartBlockTypeWorkspace) + if err != nil { + t.Fatalf("should not return error after generating thread: %v", err) + } + + verified, err := VerifyACLThreadID(pubKey, thread) + if err != nil { + t.Fatalf("verification should not return error: %v", err) + } + + if !verified { + t.Fatalf("the thread should be verified") + } +} diff --git a/data/tree.go b/data/tree.go new file mode 100644 index 00000000..0e58823d --- /dev/null +++ b/data/tree.go @@ -0,0 +1,375 @@ +package data + +import ( + "bytes" + "crypto/md5" + "fmt" + "sort" +) + +type Mode int + +const ( + Append Mode = iota + Rebuild + Nothing +) + +type Tree struct { + root *Change + headIds []string + metaHeadIds []string + attached map[string]*Change + unAttached map[string]*Change + // missed id -> list of dependency ids + waitList map[string][]string + invalidChanges map[string]struct{} + + // bufs + iterCompBuf []*Change + iterQueue []*Change + + duplicateEvents int +} + +func (t *Tree) RootId() string { + if t.root != nil { + return t.root.Id + } + return "" +} + +func (t *Tree) Root() *Change { + return t.root +} + +func (t *Tree) AddFast(changes ...*Change) { + for _, c := range changes { + // ignore existing + if _, ok := t.attached[c.Id]; ok { + continue + } else if _, ok := t.unAttached[c.Id]; ok { + continue + } + t.add(c) + } + t.updateHeads() +} + +func (t *Tree) Add(changes ...*Change) (mode Mode) { + var beforeHeadIds = t.headIds + var attached bool + var empty = t.Len() == 0 + for _, c := range changes { + // ignore existing + if _, ok := t.attached[c.Id]; ok { + continue + } else if _, ok := t.unAttached[c.Id]; ok { + continue + } + if t.add(c) { + attached = true + } + } + if !attached { + return Nothing + } + t.updateHeads() + if empty { + return Rebuild + } + for _, hid := range beforeHeadIds { + for _, newCh := range changes { + if _, ok := t.attached[newCh.Id]; ok { + if !t.after(newCh.Id, hid) { + return Rebuild + } + } + } + } + return Append +} + +func (t *Tree) RemoveInvalidChange(id string) { + stack := []string{id} + // removing all children of this id (either next or unattached) + for len(stack) > 0 { + var exists bool + top := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + if _, exists = t.invalidChanges[top]; exists { + continue + } + + var rem *Change + t.invalidChanges[top] = struct{}{} + if rem, exists = t.unAttached[top]; exists { + delete(t.unAttached, top) + } else if rem, exists = t.attached[top]; exists { + // remove from all prev changes + for _, id := range rem.PreviousIds { + prev, exists := t.attached[id] + if !exists { + continue + } + for i, next := range prev.Next { + if next.Id == top { + prev.Next[i] = nil + prev.Next = append(prev.Next[:i], prev.Next[i+1:]...) + break + } + } + } + delete(t.attached, top) + } + for _, el := range rem.Unattached { + stack = append(stack, el.Id) + } + for _, el := range rem.Next { + stack = append(stack, el.Id) + } + } +} + +func (t *Tree) add(c *Change) (attached bool) { + if c == nil { + return false + } + if _, exists := t.invalidChanges[c.Id]; exists { + return false + } + + if t.root == nil { // first element + t.root = c + t.attached = map[string]*Change{ + c.Id: c, + } + t.unAttached = make(map[string]*Change) + t.waitList = make(map[string][]string) + t.invalidChanges = make(map[string]struct{}) + return true + } + if len(c.PreviousIds) > 1 { + sort.Strings(c.PreviousIds) + } + // attaching only if all prev ids are attached + attached = true + for _, pid := range c.PreviousIds { + if prev, ok := t.attached[pid]; ok { + prev.Unattached = append(prev.Unattached, c) + continue + } + attached = false + if prev, ok := t.unAttached[pid]; ok { + prev.Unattached = append(prev.Unattached, c) + continue + } + wl := t.waitList[pid] + wl = append(wl, c.Id) + t.waitList[pid] = wl + } + if attached { + t.attach(c, true) + } else { + // clearing wait list + for _, wid := range t.waitList[c.Id] { + c.Unattached = append(c.Unattached, t.unAttached[wid]) + } + delete(t.waitList, c.Id) + t.unAttached[c.Id] = c + } + return +} + +func (t *Tree) canAttach(c *Change) (attach bool) { + if c == nil { + return false + } + attach = true + for _, id := range c.PreviousIds { + if _, exists := t.attached[id]; !exists { + attach = false + } + } + return +} + +func (t *Tree) attach(c *Change, newEl bool) { + t.attached[c.Id] = c + if !newEl { + delete(t.unAttached, c.Id) + } + + // add next to all prev changes + for _, id := range c.PreviousIds { + // prev id must be attached if we attach this id + prev := t.attached[id] + prev.Next = append(prev.Next, c) + if len(prev.Next) > 1 { + sort.Sort(sortChanges(prev.Next)) + } + for i, next := range prev.Unattached { + if next.Id == c.Id { + prev.Unattached[i] = nil + prev.Unattached = append(prev.Unattached[:i], prev.Unattached[i+1:]...) + break + } + } + } + + // clearing wait list + if waitIds, ok := t.waitList[c.Id]; ok { + for _, wid := range waitIds { + next := t.unAttached[wid] + if t.canAttach(next) { + t.attach(next, false) + } + } + delete(t.waitList, c.Id) + } + + for _, next := range c.Unattached { + if t.canAttach(next) { + t.attach(next, false) + } + } +} + +func (t *Tree) after(id1, id2 string) (found bool) { + t.iterate(t.attached[id2], func(c *Change) (isContinue bool) { + if c.Id == id1 { + found = true + return false + } + return true + }) + return +} + +func (t *Tree) dfs(startChange string) (uniqMap map[string]*Change) { + stack := make([]*Change, 0, 10) + stack = append(stack, t.attached[startChange]) + uniqMap = map[string]*Change{} + + for len(stack) > 0 { + ch := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, exists := uniqMap[ch.Id]; exists { + continue + } + + uniqMap[ch.Id] = ch + + for _, prev := range ch.PreviousIds { + stack = append(stack, t.attached[prev]) + } + } + return uniqMap +} + +func (t *Tree) updateHeads() { + var newHeadIds, newMetaHeadIds []string + t.iterate(t.root, func(c *Change) (isContinue bool) { + if len(c.Next) == 0 { + newHeadIds = append(newHeadIds, c.Id) + } + return true + }) + t.headIds = newHeadIds + t.metaHeadIds = newMetaHeadIds + sort.Strings(t.headIds) + sort.Strings(t.metaHeadIds) +} + +func (t *Tree) iterate(start *Change, f func(c *Change) (isContinue bool)) { + it := newIterator() + defer freeIterator(it) + it.iterate(start, f) +} + +func (t *Tree) iterateSkip(start *Change, skipBefore *Change, f func(c *Change) (isContinue bool)) { + it := newIterator() + defer freeIterator(it) + it.iterateSkip(start, skipBefore, f) +} + +func (t *Tree) Iterate(startId string, f func(c *Change) (isContinue bool)) { + t.iterate(t.attached[startId], f) +} + +func (t *Tree) IterateBranching(startId string, f func(c *Change, branchLevel int) (isContinue bool)) { + // branchLevel indicates the number of parallel branches + var bc int + t.iterate(t.attached[startId], func(c *Change) (isContinue bool) { + if pl := len(c.PreviousIds); pl > 1 { + bc -= pl - 1 + } + bl := bc + if nl := len(c.Next); nl > 1 { + bc += nl - 1 + } + return f(c, bl) + }) +} + +func (t *Tree) Hash() string { + h := md5.New() + n := 0 + t.iterate(t.root, func(c *Change) (isContinue bool) { + n++ + fmt.Fprintf(h, "-%s", c.Id) + return true + }) + return fmt.Sprintf("%d-%x", n, h.Sum(nil)) +} + +func (t *Tree) GetDuplicateEvents() int { + return t.duplicateEvents +} + +func (t *Tree) ResetDuplicateEvents() { + t.duplicateEvents = 0 +} + +func (t *Tree) Len() int { + return len(t.attached) +} + +func (t *Tree) Heads() []string { + return t.headIds +} + +func (t *Tree) String() string { + var buf = bytes.NewBuffer(nil) + t.Iterate(t.RootId(), func(c *Change) (isContinue bool) { + buf.WriteString(c.Id) + if len(c.Next) > 1 { + buf.WriteString("-<") + } else if len(c.Next) > 0 { + buf.WriteString("->") + } else { + buf.WriteString("-|") + } + return true + }) + return buf.String() +} + +func (t *Tree) Get(id string) *Change { + return t.attached[id] +} + +type sortChanges []*Change + +func (s sortChanges) Len() int { + return len(s) +} + +func (s sortChanges) Less(i, j int) bool { + return s[i].Id < s[j].Id +} + +func (s sortChanges) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/data/treebuilder.go b/data/treebuilder.go new file mode 100644 index 00000000..0a226d70 --- /dev/null +++ b/data/treebuilder.go @@ -0,0 +1,437 @@ +package data + +import ( + "context" + "errors" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/data/threadmodels" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/logging" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "github.com/gogo/protobuf/proto" + "github.com/textileio/go-threads/core/thread" + "sort" + "time" +) + +var ( + log = logging.Logger("anytype-data") + ErrEmpty = errors.New("logs empty") +) + +type TreeBuilder struct { + cache map[string]*Change + logHeads map[string]*Change + identityKeys map[string]threadmodels.SigningPubKey + signingPubKeyDecoder threadmodels.SigningPubKeyDecoder + tree *Tree + thread threadmodels.Thread +} + +func NewTreeBuilder(t threadmodels.Thread, decoder threadmodels.SigningPubKeyDecoder) *TreeBuilder { + return &TreeBuilder{ + cache: make(map[string]*Change), + logHeads: make(map[string]*Change), + identityKeys: make(map[string]threadmodels.SigningPubKey), + signingPubKeyDecoder: decoder, + tree: &Tree{}, // TODO: add NewTree method + thread: t, + } +} + +func (tb *TreeBuilder) loadChange(id string) (ch *Change, err error) { + if ch, ok := tb.cache[id]; ok { + return ch, nil + } + + // TODO: Add virtual changes logic + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + record, err := tb.thread.GetRecord(ctx, id) + if err != nil { + return nil, err + } + + aclChange := new(pb.ACLChange) + + // TODO: think what should we do with such cases, because this can be used by attacker to break our tree + if err = proto.Unmarshal(record.Signed.Payload, aclChange); err != nil { + return + } + var verified bool + verified, err = tb.verify(aclChange.Identity, record.Signed.Payload, record.Signed.Signature) + if err != nil { + return + } + if !verified { + err = fmt.Errorf("the signature of the payload cannot be verified") + return + } + + ch, err = NewChange(id, aclChange) + tb.cache[id] = ch + + return ch, nil +} + +func (tb *TreeBuilder) verify(identity string, payload, signature []byte) (isVerified bool, err error) { + identityKey, exists := tb.identityKeys[identity] + if !exists { + identityKey, err = tb.signingPubKeyDecoder.DecodeFromString(identity) + if err != nil { + return + } + tb.identityKeys[identity] = identityKey + } + return identityKey.Verify(payload, signature) +} + +func (tb *TreeBuilder) getLogs() (logs []threadmodels.ThreadLog, err error) { + // TODO: Add beforeId building logic + logs, err = tb.thread.GetLogs() + if err != nil { + return nil, fmt.Errorf("GetLogs error: %w", err) + } + + log.Debugf("build tree: logs: %v", logs) + if len(logs) == 0 || len(logs) == 1 && len(logs[0].Head) <= 1 { + return nil, ErrEmpty + } + var nonEmptyLogs = logs[:0] + for _, l := range logs { + if len(l.Head) == 0 { + continue + } + if ch, err := tb.loadChange(l.Head); err != nil { + log.Errorf("loading head %s of the log %s failed: %v", l.Head, l.ID, err) + } else { + tb.logHeads[l.ID] = ch + } + nonEmptyLogs = append(nonEmptyLogs, l) + } + return nonEmptyLogs, nil +} + +func (tb *TreeBuilder) Build(fromStart bool) (*Tree, error) { + logs, err := tb.getLogs() + if err != nil { + return nil, err + } + + // TODO: check if this should be changed if we are building from start + heads, err := tb.getActualHeads(logs) + if err != nil { + return nil, fmt.Errorf("get acl heads error: %v", err) + } + + if fromStart { + if err = tb.buildTreeFromStart(heads); err != nil { + return nil, fmt.Errorf("buildTree error: %v", err) + } + } else { + breakpoint, err := tb.findBreakpoint(heads) + if err != nil { + return nil, fmt.Errorf("findBreakpoint error: %v", err) + } + + if err = tb.buildTree(heads, breakpoint); err != nil { + return nil, fmt.Errorf("buildTree error: %v", err) + } + } + + tb.cache = nil + + return tb.tree, nil +} + +func (tb *TreeBuilder) buildTreeFromStart(heads []string) (err error) { + changes, possibleRoots, err := tb.dfsFromStart(heads) + if len(possibleRoots) == 0 { + return fmt.Errorf("cannot have tree without root") + } + root, err := tb.getRoot(possibleRoots) + if err != nil { + return err + } + + tb.tree.AddFast(root) + tb.tree.AddFast(changes...) + return +} + +func (tb *TreeBuilder) dfsFromStart(stack []string) (buf []*Change, possibleRoots []*Change, err error) { + buf = make([]*Change, 0, len(stack)*2) + uniqMap := make(map[string]struct{}) + + for len(stack) > 0 { + id := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, exists := uniqMap[id]; exists { + continue + } + + ch, err := tb.loadChange(id) + if err != nil { + continue + } + + uniqMap[id] = struct{}{} + buf = append(buf, ch) + + for _, prev := range ch.PreviousIds { + stack = append(stack, prev) + } + if len(ch.PreviousIds) == 0 { + possibleRoots = append(possibleRoots, ch) + } + } + return buf, possibleRoots, nil +} + +func (tb *TreeBuilder) buildTree(heads []string, breakpoint string) (err error) { + ch, err := tb.loadChange(breakpoint) + if err != nil { + return + } + tb.tree.AddFast(ch) + changes, err := tb.dfs(heads, breakpoint) + + tb.tree.AddFast(changes...) + return +} + +func (tb *TreeBuilder) dfs(stack []string, breakpoint string) (buf []*Change, err error) { + buf = make([]*Change, 0, len(stack)*2) + uniqMap := map[string]struct{}{breakpoint: {}} + for len(stack) > 0 { + id := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, exists := uniqMap[id]; exists { + continue + } + + ch, err := tb.loadChange(id) + if err != nil { + continue + } + + uniqMap[id] = struct{}{} + buf = append(buf, ch) + + for _, prev := range ch.PreviousIds { + stack = append(stack, prev) + } + } + return buf, nil +} + +func (tb *TreeBuilder) getActualHeads(logs []threadmodels.ThreadLog) (heads []string, err error) { + sort.Slice(logs, func(i, j int) bool { + return logs[i].ID < logs[j].ID + }) + var knownHeads []string + var validLogs = logs[:0] + for _, l := range logs { + if slice.FindPos(knownHeads, l.Head) != -1 { // do not scan known heads + continue + } + sh, err := tb.getNearSnapshot(l.Head) + if err != nil { + log.Warnf("can't get near snapshot: %v; ignore", err) + continue + } + if sh.LogHeads != nil { + for _, headId := range sh.LogHeads { + knownHeads = append(knownHeads, headId) + } + } + validLogs = append(validLogs, l) + } + for _, l := range validLogs { + if slice.FindPos(knownHeads, l.Head) != -1 { // do not scan known heads + continue + } else { + heads = append(heads, l.Head) + } + } + if len(heads) == 0 { + return nil, fmt.Errorf("no usable logs in head") + } + return +} + +func (tb *TreeBuilder) getNearSnapshot(id string) (sh *Change, err error) { + ch, err := tb.loadChange(id) + if err != nil { + return + } + + if ch.IsSnapshot { + sh = ch + } else { + sh, err = tb.loadChange(ch.SnapshotId) + if err != nil { + return nil, err + } + } + + return sh, nil +} + +func (tb *TreeBuilder) findBreakpoint(heads []string) (breakpoint string, err error) { + var ( + ch *Change + snapshotIds []string + ) + for _, head := range heads { + if ch, err = tb.loadChange(head); err != nil { + return + } + shId := ch.SnapshotId + if slice.FindPos(snapshotIds, shId) == -1 { + snapshotIds = append(snapshotIds, shId) + } + } + return tb.findCommonSnapshot(snapshotIds) +} + +func (tb *TreeBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId string, err error) { + if len(snapshotIds) == 1 { + return snapshotIds[0], nil + } else if len(snapshotIds) == 0 { + return "", fmt.Errorf("snapshots not found") + } + + for len(snapshotIds) > 1 { + l := len(snapshotIds) + shId, e := tb.findCommonForTwoSnapshots(snapshotIds[l-2], snapshotIds[l-1]) + if e != nil { + return "", e + } + snapshotIds[l-2] = shId + snapshotIds = snapshotIds[:l-1] + } + return snapshotIds[0], nil +} + +func (tb *TreeBuilder) findCommonForTwoSnapshots(s1, s2 string) (s string, err error) { + // fast cases + if s1 == s2 { + return s1, nil + } + ch1, err := tb.loadChange(s1) + if err != nil { + return "", err + } + if ch1.SnapshotId == s2 { + return s2, nil + } + ch2, err := tb.loadChange(s2) + if err != nil { + return "", err + } + if ch2.SnapshotId == s1 { + return s1, nil + } + if ch1.SnapshotId == ch2.SnapshotId && ch1.SnapshotId != "" { + return ch1.SnapshotId, nil + } + // traverse + var t1 = make([]string, 0, 5) + var t2 = make([]string, 0, 5) + t1 = append(t1, ch1.Id, ch1.SnapshotId) + t2 = append(t2, ch2.Id, ch2.SnapshotId) + for { + lid1 := t1[len(t1)-1] + if lid1 != "" { + l1, e := tb.loadChange(lid1) + if e != nil { + return "", e + } + if l1.SnapshotId != "" { + if slice.FindPos(t2, l1.SnapshotId) != -1 { + return l1.SnapshotId, nil + } + } + t1 = append(t1, l1.SnapshotId) + } + lid2 := t2[len(t2)-1] + if lid2 != "" { + l2, e := tb.loadChange(t2[len(t2)-1]) + if e != nil { + return "", e + } + if l2.SnapshotId != "" { + if slice.FindPos(t1, l2.SnapshotId) != -1 { + return l2.SnapshotId, nil + } + } + t2 = append(t2, l2.SnapshotId) + } + if lid1 == "" && lid2 == "" { + break + } + } + + log.Warnf("changes build tree: possible versions split") + + // prefer not first snapshot + if len(ch1.PreviousIds) == 0 && len(ch2.PreviousIds) > 0 { + log.Warnf("changes build tree: prefer %s(%d prevIds) over %s(%d prevIds)", s2, len(ch2.PreviousIds), s1, len(ch1.PreviousIds)) + return s2, nil + } else if len(ch1.PreviousIds) > 0 && len(ch2.PreviousIds) == 0 { + log.Warnf("changes build tree: prefer %s(%d prevIds) over %s(%d prevIds)", s1, len(ch1.PreviousIds), s2, len(ch2.PreviousIds)) + return s1, nil + } + + isEmptySnapshot := func(ch *Change) bool { + // TODO: add more sophisticated checks in Change for snapshots + return !ch.IsSnapshot + } + + // TODO: can we even have empty snapshots? + // prefer not empty snapshot + if isEmptySnapshot(ch1) && !isEmptySnapshot(ch2) { + log.Warnf("changes build tree: prefer %s(not empty) over %s(empty)", s2, s1) + return s2, nil + } else if isEmptySnapshot(ch2) && !isEmptySnapshot(ch1) { + log.Warnf("changes build tree: prefer %s(not empty) over %s(empty)", s1, s2) + return s1, nil + } + + // TODO: add virtual change mechanics + // unexpected behavior - just return lesser id + if s1 < s2 { + log.Warnf("changes build tree: prefer %s (%s<%s)", s1, s1, s2) + return s1, nil + } + log.Warnf("changes build tree: prefer %s (%s<%s)", s2, s2, s1) + + return s2, nil +} + +func (tb *TreeBuilder) getRoot(possibleRoots []*Change) (*Change, error) { + threadId, err := thread.Decode(tb.thread.ID()) + if err != nil { + return nil, err + } + + for _, r := range possibleRoots { + id := r.Content.Identity + sk, err := tb.signingPubKeyDecoder.DecodeFromString(id) + if err != nil { + continue + } + + res, err := threadmodels.VerifyACLThreadID(sk, threadId) + if err != nil { + continue + } + + if res { + return r, nil + } + } + return nil, fmt.Errorf("could not find any root") +} diff --git a/data/treebuilder_test.go b/data/treebuilder_test.go new file mode 100644 index 00000000..fa4a8971 --- /dev/null +++ b/data/treebuilder_test.go @@ -0,0 +1,58 @@ +package data + +//func TestACLTreeBuilder_UserJoinCorrectHeadsAndLen(t *testing.T) { +// thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userjoinexample.yml") +// if err != nil { +// t.Fatal(err) +// } +// +// res, err := createTreeFromThread(thread) +// if err != nil { +// t.Fatalf("build tree should not result in an error: %v", res) +// } +// +// assert.Equal(t, res.Heads(), []string{"C.1.1"}) +// assert.Equal(t, res.Len(), 4) +//} +// +//func TestTreeBuilder_UserJoinTestTreeIterate(t *testing.T) { +// thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userjoinexample.yml") +// if err != nil { +// t.Fatal(err) +// } +// +// res, err := createTreeFromThread(thread) +// if err != nil { +// t.Fatalf("build tree should not result in an error: %v", res) +// } +// +// assert.Equal(t, res.Heads(), []string{"C.1.1"}) +// assert.Equal(t, res.Len(), 4) +// var changeIds []string +// res.iterate(res.root, func(c *Change) (isContinue bool) { +// changeIds = append(changeIds, c.Id) +// return true +// }) +// assert.Equal(t, changeIds, []string{"A.1.1", "A.1.2", "B.1.1", "C.1.1"}) +//} +// +//func TestTreeBuilder_UserRemoveTestTreeIterate(t *testing.T) { +// thread, err := threadbuilder.NewThreadBuilderFromFile("threadbuilder/userremoveexample.yml") +// if err != nil { +// t.Fatal(err) +// } +// +// res, err := createTreeFromThread(thread) +// if err != nil { +// t.Fatalf("build tree should not result in an error: %v", res) +// } +// +// assert.Equal(t, res.Heads(), []string{"A.1.3"}) +// assert.Equal(t, res.Len(), 4) +// var changeIds []string +// res.iterate(res.root, func(c *Change) (isContinue bool) { +// changeIds = append(changeIds, c.Id) +// return true +// }) +// assert.Equal(t, changeIds, []string{"A.1.1", "A.1.2", "B.1.1", "A.1.3"}) +//} diff --git a/data/treegraph.go b/data/treegraph.go new file mode 100644 index 00000000..60dc9967 --- /dev/null +++ b/data/treegraph.go @@ -0,0 +1,11 @@ +//go:build ((!linux && !darwin) || android || ios || nographviz) && !amd64 +// +build !linux,!darwin android ios nographviz +// +build !amd64 + +package data + +import "fmt" + +func (t *Tree) Graph() (data string, err error) { + return "", fmt.Errorf("not supported") +} diff --git a/data/treegraph_nix.go b/data/treegraph_nix.go new file mode 100644 index 00000000..b06f7435 --- /dev/null +++ b/data/treegraph_nix.go @@ -0,0 +1,149 @@ +//go:build (linux || darwin) && !android && !ios && !nographviz && (amd64 || arm64) +// +build linux darwin +// +build !android +// +build !ios +// +build !nographviz +// +build amd64 arm64 + +package data + +import ( + "bytes" + "fmt" + "github.com/goccy/go-graphviz" + "github.com/goccy/go-graphviz/cgraph" + "strings" + "time" + "unicode" +) + +func (t *Tree) Graph() (data string, err error) { + var order = make(map[string]string) + var seq = 0 + t.Iterate(t.RootId(), func(c *Change) (isContinue bool) { + v := order[c.Id] + if v == "" { + order[c.Id] = fmt.Sprint(seq) + } else { + order[c.Id] = fmt.Sprintf("%s,%d", v, seq) + } + seq++ + return true + }) + g := graphviz.New() + defer g.Close() + graph, err := g.Graph() + if err != nil { + return + } + defer func() { + err = graph.Close() + }() + var nodes = make(map[string]*cgraph.Node) + var addChange = func(c *Change) error { + n, e := graph.CreateNode(c.Id) + if e != nil { + return e + } + if c.Content.GetAclData() != nil { + n.SetStyle(cgraph.FilledNodeStyle) + } else if c.IsSnapshot { + n.SetStyle(cgraph.DashedNodeStyle) + } + nodes[c.Id] = n + ord := order[c.Id] + if ord == "" { + ord = "miss" + } + var chSymbs []string + if c.Content.AclData != nil { + for _, chc := range c.Content.AclData.AclContent { + tp := fmt.Sprintf("%T", chc.Value) + tp = strings.Replace(tp, "ACLChangeACLContentValueValueOf", "", 1) + res := "" + for _, ts := range tp { + if unicode.IsUpper(ts) { + res += string(ts) + } + } + chSymbs = append(chSymbs, res) + } + } + if c.DecryptedDocumentChange != nil { + for _, chc := range c.DecryptedDocumentChange.Content { + tp := fmt.Sprintf("%T", chc.Value) + tp = strings.Replace(tp, "ChangeContentValueOf", "", 1) + res := "" + for _, ts := range tp { + if unicode.IsUpper(ts) { + res += string(ts) + } + } + chSymbs = append(chSymbs, res) + } + } + + shortId := c.Id + label := fmt.Sprintf("Id: %s\nOrd: %s\nTime: %s\nChanges: %s\n", + shortId, + ord, + time.Unix(c.Content.Timestamp, 0).Format("02.01.06 15:04:05"), + strings.Join(chSymbs, ","), + ) + n.SetLabel(label) + return nil + } + for _, c := range t.attached { + if err = addChange(c); err != nil { + return + } + } + for _, c := range t.unAttached { + if err = addChange(c); err != nil { + return + } + } + var getNode = func(id string) (*cgraph.Node, error) { + if n, ok := nodes[id]; ok { + return n, nil + } + n, err := graph.CreateNode(fmt.Sprintf("%s: not in tree", id)) + if err != nil { + return nil, err + } + nodes[id] = n + return n, nil + } + var addLinks = func(c *Change) error { + for _, prevId := range c.PreviousIds { + self, e := getNode(c.Id) + if e != nil { + return e + } + prev, e := getNode(prevId) + if e != nil { + return e + } + _, e = graph.CreateEdge("", self, prev) + if e != nil { + return e + } + } + return nil + } + for _, c := range t.attached { + if err = addLinks(c); err != nil { + return + } + } + for _, c := range t.unAttached { + if err = addLinks(c); err != nil { + return + } + } + var buf bytes.Buffer + if err = g.Render(graph, "dot", &buf); err != nil { + return + } + return buf.String(), nil +} diff --git a/data/treeiterator.go b/data/treeiterator.go new file mode 100644 index 00000000..dd08b6df --- /dev/null +++ b/data/treeiterator.go @@ -0,0 +1,150 @@ +package data + +import "sync" + +var itPool = &sync.Pool{ + New: func() interface{} { + return &iterator{} + }, +} + +func newIterator() *iterator { + return itPool.Get().(*iterator) +} + +func freeIterator(i *iterator) { + itPool.Put(i) +} + +type iterator struct { + compBuf []*Change + queue []*Change + doneMap map[*Change]struct{} + breakpoint *Change + f func(c *Change) bool +} + +func (i *iterator) iterateSkip(start *Change, skipBefore *Change, f func(c *Change) (isContinue bool)) { + skipping := true + i.iterate(start, func(c *Change) (isContinue bool) { + if skipping && c != skipBefore { + return true + } + skipping = false + return f(c) + }) +} + +func (i *iterator) iterate(start *Change, f func(c *Change) (isContinue bool)) { + if start == nil { + return + } + // reset + i.queue = i.queue[:0] + i.compBuf = i.compBuf[:0] + i.doneMap = make(map[*Change]struct{}) + i.queue = append(i.queue, start) + i.breakpoint = nil + i.f = f + + for len(i.queue) > 0 { + c := i.queue[0] + i.queue = i.queue[1:] + nl := len(c.Next) + if nl == 1 { + if !i.iterateLin(c) { + return + } + if i.breakpoint != nil { + i.toQueue(i.breakpoint) + i.breakpoint = nil + } + } else { + _, done := i.doneMap[c] + if !done { + if !f(c) { + return + } + i.doneMap[c] = struct{}{} + } + if nl != 0 { + for _, next := range c.Next { + i.toQueue(next) + } + } + } + } +} + +func (i *iterator) iterateLin(c *Change) bool { + for len(c.Next) == 1 { + _, done := i.doneMap[c] + if !done { + if !i.f(c) { + return false + } + i.doneMap[c] = struct{}{} + } + + c = c.Next[0] + if len(c.PreviousIds) > 1 { + break + } + } + i.breakpoint = c + return true +} + +func (i *iterator) comp(c1, c2 *Change) uint8 { + if c1.Id == c2.Id { + return 0 + } + i.compBuf = i.compBuf[:0] + i.compBuf = append(i.compBuf, c1.Next...) + var uniq = make(map[*Change]struct{}) + var appendUniqueToBuf = func(next []*Change) { + for _, n := range next { + if _, ok := uniq[n]; !ok { + i.compBuf = append(i.compBuf, n) + uniq[n] = struct{}{} + } + } + } + var used int + for len(i.compBuf)-used > 0 { + l := len(i.compBuf) - used + for _, n := range i.compBuf[used:] { + delete(uniq, n) + if n.Id == c2.Id { + return 1 + } else { + appendUniqueToBuf(n.Next) + } + } + used += l + } + return 2 +} + +func (i *iterator) toQueue(c *Change) { + var pos = -1 +For: + for idx, qc := range i.queue { + switch i.comp(c, qc) { + // exists + case 0: + return + // + case 1: + pos = idx + break For + } + } + if pos == -1 { + i.queue = append(i.queue, c) + } else if pos == 0 { + i.queue = append([]*Change{c}, i.queue...) + } else { + i.queue = append(i.queue[:pos], append([]*Change{c}, i.queue[pos:]...)...) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..dd9e3e9e --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/anytypeio/go-anytype-infrastructure-experiments + +go 1.18 + +require ( + github.com/textileio/go-threads v1.0.2-0.20210304072541-d0f91da84404 +) + +replace github.com/textileio/go-threads => github.com/anytypeio/go-threads v1.1.0-rc1.0.20220223104843-a67245cee80e