From 0a118c489167f8bad094fea7b097020c7fa3daff Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 12 Jul 2022 00:30:24 +0200 Subject: [PATCH] Add inmemory thread and build logic --- acltree/threadutility.go | 32 ++++++++++ thread/inmemory.go | 112 ++++++++++++++++++++++++++++++++++ thread/pb/protos/thread.proto | 2 +- thread/pb/thread.pb.go | 24 ++++---- 4 files changed, 157 insertions(+), 13 deletions(-) create mode 100644 acltree/threadutility.go create mode 100644 thread/inmemory.go diff --git a/acltree/threadutility.go b/acltree/threadutility.go new file mode 100644 index 00000000..da35c74c --- /dev/null +++ b/acltree/threadutility.go @@ -0,0 +1,32 @@ +package acltree + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/thread" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys" +) + +func BuildThreadWithACL( + acc *account.AccountData, + build func(builder ChangeBuilder), + create func(change *thread.RawChange) (thread.Thread, error)) (thread.Thread, error) { + bld := newChangeBuilder() + bld.Init( + newACLState(acc.Identity, acc.EncKey, keys.NewEd25519Decoder()), + &Tree{}, + acc) + build(bld) + bld.SetMakeSnapshot(true) + + change, payload, err := bld.Build() + if err != nil { + return nil, err + } + + rawChange := &thread.RawChange{ + Payload: payload, + Signature: change.Signature(), + Id: change.CID(), + } + return create(rawChange) +} diff --git a/thread/inmemory.go b/thread/inmemory.go new file mode 100644 index 00000000..d5fa7478 --- /dev/null +++ b/thread/inmemory.go @@ -0,0 +1,112 @@ +package thread + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges" + "github.com/anytypeio/go-anytype-infrastructure-experiments/thread/pb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "github.com/gogo/protobuf/proto" + "sync" +) + +type inMemoryThread struct { + id string + header *pb.ThreadHeader + heads []string + orphans []string + changes map[string]*RawChange + + sync.Mutex +} + +func NewInMemoryThread(firstChange *RawChange) (Thread, error) { + header := &pb.ThreadHeader{ + FirstChangeId: firstChange.Id, + IsWorkspace: false, + } + marshalledHeader, err := proto.Marshal(header) + if err != nil { + return nil, err + } + threadId, err := cid.NewCIDFromBytes(marshalledHeader) + if err != nil { + return nil, err + } + + changes := make(map[string]*RawChange) + changes[firstChange.Id] = firstChange + + return &inMemoryThread{ + id: threadId, + header: header, + heads: []string{firstChange.Id}, + orphans: nil, + changes: changes, + Mutex: sync.Mutex{}, + }, nil +} + +func (t *inMemoryThread) ID() string { + return t.id +} + +func (t *inMemoryThread) Header() *pb.ThreadHeader { + return t.header +} + +func (t *inMemoryThread) Heads() []string { + return t.heads +} + +func (t *inMemoryThread) Orphans() []string { + return t.orphans +} + +func (t *inMemoryThread) SetHeads(heads []string) { + t.heads = t.heads[:0] + + for _, h := range heads { + t.heads = append(t.heads, h) + } +} + +func (t *inMemoryThread) RemoveOrphans(orphans ...string) { + t.orphans = slice.Difference(t.orphans, orphans) +} + +func (t *inMemoryThread) AddOrphans(orphans ...string) { + t.orphans = append(t.orphans, orphans...) +} + +func (t *inMemoryThread) AddRawChange(change *RawChange) error { + // TODO: better to do deep copy + t.changes[change.Id] = change + return nil +} + +func (t *inMemoryThread) AddChange(change aclchanges.Change) error { + signature := change.Signature() + id := change.CID() + aclChange := change.ProtoChange() + + fullMarshalledChange, err := proto.Marshal(aclChange) + if err != nil { + return err + } + rawChange := &RawChange{ + Payload: fullMarshalledChange, + Signature: signature, + Id: id, + } + t.changes[id] = rawChange + return nil +} + +func (t *inMemoryThread) GetChange(ctx context.Context, changeId string) (*RawChange, error) { + if res, exists := t.changes[changeId]; exists { + return res, nil + } + return nil, fmt.Errorf("could not get change with id: %s", changeId) +} diff --git a/thread/pb/protos/thread.proto b/thread/pb/protos/thread.proto index d3930e4c..fb2b15e4 100644 --- a/thread/pb/protos/thread.proto +++ b/thread/pb/protos/thread.proto @@ -3,6 +3,6 @@ package anytype; option go_package = "pb"; message ThreadHeader { - string firstRecordId = 1; + string firstChangeId = 1; bool isWorkspace = 2; } \ No newline at end of file diff --git a/thread/pb/thread.pb.go b/thread/pb/thread.pb.go index f6b0ef5c..e941b634 100644 --- a/thread/pb/thread.pb.go +++ b/thread/pb/thread.pb.go @@ -23,7 +23,7 @@ var _ = math.Inf const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type ThreadHeader struct { - FirstRecordId string `protobuf:"bytes,1,opt,name=firstRecordId,proto3" json:"firstRecordId,omitempty"` + FirstChangeId string `protobuf:"bytes,1,opt,name=firstChangeId,proto3" json:"firstChangeId,omitempty"` IsWorkspace bool `protobuf:"varint,2,opt,name=isWorkspace,proto3" json:"isWorkspace,omitempty"` } @@ -60,9 +60,9 @@ func (m *ThreadHeader) XXX_DiscardUnknown() { var xxx_messageInfo_ThreadHeader proto.InternalMessageInfo -func (m *ThreadHeader) GetFirstRecordId() string { +func (m *ThreadHeader) GetFirstChangeId() string { if m != nil { - return m.FirstRecordId + return m.FirstChangeId } return "" } @@ -86,12 +86,12 @@ var fileDescriptor_b228ffbfd554b168 = []byte{ 0x4d, 0x4c, 0xd1, 0x2f, 0x48, 0xd2, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x2f, 0xd6, 0x87, 0x08, 0xe8, 0x81, 0x79, 0x42, 0xec, 0x89, 0x79, 0x95, 0x25, 0x95, 0x05, 0xa9, 0x4a, 0x61, 0x5c, 0x3c, 0x21, 0x60, 0x09, 0x8f, 0xd4, 0xc4, 0x94, 0xd4, 0x22, 0x21, 0x15, 0x2e, 0xde, 0xb4, 0xcc, 0xa2, 0xe2, - 0x92, 0xa0, 0xd4, 0xe4, 0xfc, 0xa2, 0x14, 0xcf, 0x14, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20, + 0x12, 0xe7, 0x8c, 0xc4, 0xbc, 0xf4, 0x54, 0xcf, 0x14, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x54, 0x41, 0x21, 0x05, 0x2e, 0xee, 0xcc, 0xe2, 0xf0, 0xfc, 0xa2, 0xec, 0xe2, 0x82, 0xc4, 0xe4, 0x54, 0x09, 0x26, 0x05, 0x46, 0x0d, 0x8e, 0x20, 0x64, 0x21, 0x27, 0x99, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b, 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x88, 0x62, 0x2a, 0x48, 0x4a, 0x62, 0x03, 0xbb, 0xc2, 0x18, 0x10, - 0x00, 0x00, 0xff, 0xff, 0x05, 0x80, 0xf4, 0xa3, 0xa6, 0x00, 0x00, 0x00, + 0x00, 0x00, 0xff, 0xff, 0x2a, 0xae, 0x9d, 0xc2, 0xa6, 0x00, 0x00, 0x00, } func (m *ThreadHeader) Marshal() (dAtA []byte, err error) { @@ -124,10 +124,10 @@ func (m *ThreadHeader) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x10 } - if len(m.FirstRecordId) > 0 { - i -= len(m.FirstRecordId) - copy(dAtA[i:], m.FirstRecordId) - i = encodeVarintThread(dAtA, i, uint64(len(m.FirstRecordId))) + if len(m.FirstChangeId) > 0 { + i -= len(m.FirstChangeId) + copy(dAtA[i:], m.FirstChangeId) + i = encodeVarintThread(dAtA, i, uint64(len(m.FirstChangeId))) i-- dAtA[i] = 0xa } @@ -151,7 +151,7 @@ func (m *ThreadHeader) Size() (n int) { } var l int _ = l - l = len(m.FirstRecordId) + l = len(m.FirstChangeId) if l > 0 { n += 1 + l + sovThread(uint64(l)) } @@ -198,7 +198,7 @@ func (m *ThreadHeader) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field FirstRecordId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field FirstChangeId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -226,7 +226,7 @@ func (m *ThreadHeader) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.FirstRecordId = string(dAtA[iNdEx:postIndex]) + m.FirstChangeId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 0 {