Start tree cache and document rewrite
This commit is contained in:
parent
e4ad5b6485
commit
e709febfb2
@ -43,10 +43,9 @@ type docTree struct {
|
||||
accountData *account.AccountData
|
||||
updateListener TreeUpdateListener
|
||||
|
||||
id string
|
||||
header *treepb.TreeHeader
|
||||
tree *Tree
|
||||
aclState *ACLState
|
||||
id string
|
||||
header *treepb.TreeHeader
|
||||
tree *Tree
|
||||
|
||||
treeBuilder *treeBuilder
|
||||
validator DocTreeValidator
|
||||
@ -62,7 +61,6 @@ func BuildDocTreeWithIdentity(t treestorage.TreeStorage, acc *account.AccountDat
|
||||
treeStorage: t,
|
||||
accountData: acc,
|
||||
tree: nil,
|
||||
aclState: nil,
|
||||
treeBuilder: treeBuilder,
|
||||
validator: validator,
|
||||
updateListener: listener,
|
||||
@ -100,7 +98,6 @@ func BuildDocTree(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder, l
|
||||
docTree := &docTree{
|
||||
treeStorage: t,
|
||||
tree: nil,
|
||||
aclState: nil,
|
||||
treeBuilder: treeBuilder,
|
||||
validator: validator,
|
||||
updateListener: listener,
|
||||
@ -222,7 +219,7 @@ func (d *docTree) AddContent(ctx context.Context, aclTree ACLTree, content proto
|
||||
// clearing tree, because we already fixed everything in the last snapshot
|
||||
d.tree = &Tree{}
|
||||
}
|
||||
d.tree.AddFast(ch)
|
||||
d.tree.AddFast(ch) // TODO: Add head
|
||||
rawCh := &aclpb.RawChange{
|
||||
Payload: fullMarshalledChange,
|
||||
Signature: ch.Signature(),
|
||||
|
||||
129
pkg/acl/tree/treestorage.go
Normal file
129
pkg/acl/tree/treestorage.go
Normal file
@ -0,0 +1,129 @@
|
||||
package tree
|
||||
|
||||
import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"time"
|
||||
)
|
||||
|
||||
func CreateNewTreeStorageWithACL(
|
||||
acc *account.AccountData,
|
||||
build func(builder ACLChangeBuilder) error,
|
||||
create treestorage.CreatorFunc) (treestorage.TreeStorage, error) {
|
||||
bld := newACLChangeBuilder()
|
||||
bld.Init(
|
||||
newACLStateWithIdentity(acc.Identity, acc.EncKey, signingkey.NewEd25519PubKeyDecoder()),
|
||||
&Tree{},
|
||||
acc)
|
||||
err := build(bld)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
change, payload, err := bld.BuildAndApply()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rawChange := &aclpb.RawChange{
|
||||
Payload: payload,
|
||||
Signature: change.Signature(),
|
||||
Id: change.CID(),
|
||||
}
|
||||
header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_ACLTree)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
thr, err := create(id, header, []*aclpb.RawChange{rawChange})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = thr.SetHeads([]string{change.CID()})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return thr, nil
|
||||
}
|
||||
|
||||
func CreateNewTreeStorage(
|
||||
acc *account.AccountData,
|
||||
aclTree ACLTree,
|
||||
content proto.Marshaler,
|
||||
create treestorage.CreatorFunc) (treestorage.TreeStorage, error) {
|
||||
|
||||
state := aclTree.ACLState()
|
||||
change := &aclpb.Change{
|
||||
AclHeadIds: aclTree.Heads(),
|
||||
CurrentReadKeyHash: state.currentReadKeyHash,
|
||||
Timestamp: int64(time.Now().Nanosecond()),
|
||||
Identity: acc.Identity,
|
||||
}
|
||||
|
||||
marshalledData, err := content.Marshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
encrypted, err := state.userReadKeys[state.currentReadKeyHash].Encrypt(marshalledData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
change.ChangesData = encrypted
|
||||
|
||||
fullMarshalledChange, err := proto.Marshal(change)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
signature, err := acc.SignKey.Sign(fullMarshalledChange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
id, err := cid.NewCIDFromBytes(fullMarshalledChange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rawChange := &aclpb.RawChange{
|
||||
Payload: fullMarshalledChange,
|
||||
Signature: signature,
|
||||
Id: id,
|
||||
}
|
||||
header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_DocTree)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
thr, err := create(id, header, []*aclpb.RawChange{rawChange})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = thr.SetHeads([]string{id})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return thr, nil
|
||||
}
|
||||
|
||||
func createTreeHeaderAndId(change *aclpb.RawChange, treeType treepb.TreeHeaderTreeType) (*treepb.TreeHeader, string, error) {
|
||||
header := &treepb.TreeHeader{
|
||||
FirstChangeId: change.Id,
|
||||
Type: treeType,
|
||||
}
|
||||
marshalledHeader, err := proto.Marshal(header)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
treeId, err := cid.NewCIDFromBytes(marshalledHeader)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return header, treeId, nil
|
||||
}
|
||||
@ -6,6 +6,7 @@ message TreeHeader {
|
||||
string firstChangeId = 1;
|
||||
bool isWorkspace = 2;
|
||||
TreeType type = 3;
|
||||
string aclTreeId = 4;
|
||||
// TODO: add user identity, signature and nano timestamp
|
||||
|
||||
enum TreeType {
|
||||
|
||||
@ -51,6 +51,7 @@ type TreeHeader struct {
|
||||
FirstChangeId string `protobuf:"bytes,1,opt,name=firstChangeId,proto3" json:"firstChangeId,omitempty"`
|
||||
IsWorkspace bool `protobuf:"varint,2,opt,name=isWorkspace,proto3" json:"isWorkspace,omitempty"`
|
||||
Type TreeHeaderTreeType `protobuf:"varint,3,opt,name=type,proto3,enum=tree.TreeHeaderTreeType" json:"type,omitempty"`
|
||||
AclTreeId string `protobuf:"bytes,4,opt,name=aclTreeId,proto3" json:"aclTreeId,omitempty"`
|
||||
}
|
||||
|
||||
func (m *TreeHeader) Reset() { *m = TreeHeader{} }
|
||||
@ -107,6 +108,13 @@ func (m *TreeHeader) GetType() TreeHeaderTreeType {
|
||||
return TreeHeader_ACLTree
|
||||
}
|
||||
|
||||
func (m *TreeHeader) GetAclTreeId() string {
|
||||
if m != nil {
|
||||
return m.AclTreeId
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("tree.TreeHeaderTreeType", TreeHeaderTreeType_name, TreeHeaderTreeType_value)
|
||||
proto.RegisterType((*TreeHeader)(nil), "tree.TreeHeader")
|
||||
@ -117,21 +125,22 @@ func init() {
|
||||
}
|
||||
|
||||
var fileDescriptor_e7d760b855878644 = []byte{
|
||||
// 220 bytes of a gzipped FileDescriptorProto
|
||||
// 237 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x29, 0xc8, 0x4e, 0xd7,
|
||||
0x4f, 0x4c, 0xce, 0xd1, 0x2f, 0x29, 0x4a, 0x4d, 0x2d, 0x2e, 0xc9, 0x2f, 0x4a, 0x4c, 0x4f, 0x05,
|
||||
0xb3, 0x0b, 0x92, 0xf4, 0x0b, 0x8a, 0xf2, 0x4b, 0xf2, 0x8b, 0xc1, 0x3c, 0x3d, 0x30, 0x5b, 0x88,
|
||||
0x05, 0xc4, 0x56, 0x5a, 0xc9, 0xc8, 0xc5, 0x15, 0x52, 0x94, 0x9a, 0xea, 0x91, 0x9a, 0x98, 0x92,
|
||||
0x05, 0xc4, 0x56, 0x3a, 0xce, 0xc8, 0xc5, 0x15, 0x52, 0x94, 0x9a, 0xea, 0x91, 0x9a, 0x98, 0x92,
|
||||
0x5a, 0x24, 0xa4, 0xc2, 0xc5, 0x9b, 0x96, 0x59, 0x54, 0x5c, 0xe2, 0x9c, 0x91, 0x98, 0x97, 0x9e,
|
||||
0xea, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x84, 0x2a, 0x28, 0xa4, 0xc0, 0xc5, 0x9d,
|
||||
0x59, 0x1c, 0x9e, 0x5f, 0x94, 0x5d, 0x5c, 0x90, 0x98, 0x9c, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1,
|
||||
0x11, 0x84, 0x2c, 0x24, 0xa4, 0xcb, 0xc5, 0x52, 0x52, 0x59, 0x90, 0x2a, 0xc1, 0xac, 0xc0, 0xa8,
|
||||
0xc1, 0x67, 0x24, 0xa9, 0x07, 0xb6, 0x17, 0x61, 0x0f, 0x98, 0x19, 0x52, 0x59, 0x90, 0x1a, 0x04,
|
||||
0x56, 0xa6, 0xa4, 0xc2, 0xc5, 0x01, 0x13, 0x11, 0xe2, 0xe6, 0x62, 0x77, 0x74, 0xf6, 0x01, 0x71,
|
||||
0x05, 0x18, 0x40, 0x1c, 0x97, 0xfc, 0x64, 0x30, 0x87, 0xd1, 0x49, 0xe1, 0xc4, 0x23, 0x39, 0xc6,
|
||||
0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39,
|
||||
0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0xbe, 0x4c, 0x62, 0x03, 0x7b, 0xcd, 0x18, 0x10,
|
||||
0x00, 0x00, 0xff, 0xff, 0x72, 0x29, 0xbc, 0x0e, 0x0a, 0x01, 0x00, 0x00,
|
||||
0x56, 0x26, 0x24, 0xc3, 0xc5, 0x99, 0x98, 0x9c, 0x03, 0x12, 0xf4, 0x4c, 0x91, 0x60, 0x01, 0x5b,
|
||||
0x89, 0x10, 0x50, 0x52, 0xe1, 0xe2, 0x80, 0xa9, 0x17, 0xe2, 0xe6, 0x62, 0x77, 0x74, 0xf6, 0x01,
|
||||
0x71, 0x05, 0x18, 0x40, 0x1c, 0x97, 0xfc, 0x64, 0x30, 0x87, 0xd1, 0x49, 0xe1, 0xc4, 0x23, 0x39,
|
||||
0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63,
|
||||
0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0x61, 0x90, 0xc4, 0x06, 0xf6, 0xb8, 0x31,
|
||||
0x20, 0x00, 0x00, 0xff, 0xff, 0x0c, 0x86, 0x4d, 0x81, 0x28, 0x01, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *TreeHeader) Marshal() (dAtA []byte, err error) {
|
||||
@ -154,6 +163,13 @@ func (m *TreeHeader) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if len(m.AclTreeId) > 0 {
|
||||
i -= len(m.AclTreeId)
|
||||
copy(dAtA[i:], m.AclTreeId)
|
||||
i = encodeVarintTree(dAtA, i, uint64(len(m.AclTreeId)))
|
||||
i--
|
||||
dAtA[i] = 0x22
|
||||
}
|
||||
if m.Type != 0 {
|
||||
i = encodeVarintTree(dAtA, i, uint64(m.Type))
|
||||
i--
|
||||
@ -206,6 +222,10 @@ func (m *TreeHeader) Size() (n int) {
|
||||
if m.Type != 0 {
|
||||
n += 1 + sovTree(uint64(m.Type))
|
||||
}
|
||||
l = len(m.AclTreeId)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovTree(uint64(l))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@ -315,6 +335,38 @@ func (m *TreeHeader) Unmarshal(dAtA []byte) error {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 4:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field AclTreeId", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTree
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthTree
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthTree
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.AclTreeId = string(dAtA[iNdEx:postIndex])
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipTree(dAtA[iNdEx:])
|
||||
|
||||
@ -79,7 +79,7 @@ func (s *service) treeDump(w http.ResponseWriter, req *http.Request) {
|
||||
dump string
|
||||
err error
|
||||
)
|
||||
err = s.treeCache.DoWrite(context.Background(), treeId, func(tree acltree.ACLTree) error {
|
||||
err = s.treeCache.Do(context.Background(), treeId, func(tree acltree.ACLTree) error {
|
||||
dump, err = tree.DebugDump()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -7,6 +7,8 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
|
||||
@ -22,9 +24,10 @@ var CName = "DocumentService"
|
||||
var log = logger.NewNamed("documentservice")
|
||||
|
||||
type service struct {
|
||||
messageService message.Service
|
||||
treeCache treecache.Service
|
||||
account account.Service
|
||||
messageService message.Service
|
||||
treeCache treecache.Service
|
||||
account account.Service
|
||||
treeStorageProvider treestorage.Provider
|
||||
// to create new documents we need to know all nodes
|
||||
nodes []*node.Node
|
||||
}
|
||||
@ -42,6 +45,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
||||
s.account = a.MustComponent(account.CName).(account.Service)
|
||||
s.messageService = a.MustComponent(message.CName).(message.Service)
|
||||
s.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
|
||||
// TODO: add TreeStorageProvider service
|
||||
|
||||
nodesService := a.MustComponent(node.CName).(node.Service)
|
||||
s.nodes = nodesService.Nodes()
|
||||
@ -71,7 +75,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
|
||||
log.With(zap.String("id", id), zap.String("text", text)).
|
||||
Debug("updating document")
|
||||
|
||||
err = s.treeCache.DoWrite(ctx, id, func(tree acltree.ACLTree) error {
|
||||
err = s.treeCache.Do(ctx, id, func(tree acltree.ACLTree) error {
|
||||
ch, err = tree.AddContent(ctx, func(builder acltree.ChangeBuilder) error {
|
||||
builder.AddChangeContent(
|
||||
&testchangepb.PlainTextChangeData{
|
||||
@ -109,7 +113,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
|
||||
}))
|
||||
}
|
||||
|
||||
func (s *service) CreateDocument(ctx context.Context, text string) (id string, err error) {
|
||||
func (s *service) CreateACLTree(ctx context.Context) (id string, err error) {
|
||||
acc := s.account.Account()
|
||||
var (
|
||||
ch *aclpb.RawChange
|
||||
@ -118,7 +122,7 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
|
||||
heads []string
|
||||
)
|
||||
|
||||
err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error {
|
||||
t, err := tree.CreateNewTreeStorageWithACL(acc, func(builder tree.ACLChangeBuilder) error {
|
||||
err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -130,28 +134,81 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
builder.AddChangeContent(createInitialChangeContent(text))
|
||||
return nil
|
||||
}, func(tree acltree.ACLTree) error {
|
||||
id = tree.ID()
|
||||
heads = tree.Heads()
|
||||
header = tree.Header()
|
||||
snapshotPath = tree.SnapshotPath()
|
||||
ch, err = tree.Storage().GetChange(ctx, heads[0])
|
||||
}, s.treeStorageProvider.CreateTreeStorage)
|
||||
|
||||
id, err = t.TreeID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
header, err = t.Header()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
heads = []string{header.FirstChangeId}
|
||||
snapshotPath = []string{header.FirstChangeId}
|
||||
ch, err = t.GetChange(ctx, header.FirstChangeId)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
Changes: []*aclpb.RawChange{ch},
|
||||
TreeId: id,
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeHeader: header,
|
||||
}))
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *service) CreateDocumentTree(ctx context.Context, aclTreeId string, text string) (id string, err error) {
|
||||
acc := s.account.Account()
|
||||
var (
|
||||
ch *aclpb.RawChange
|
||||
header *treepb.TreeHeader
|
||||
snapshotPath []string
|
||||
heads []string
|
||||
)
|
||||
err = s.treeCache.Do(ctx, aclTreeId, func(t tree.ACLTree) error {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
content := createInitialChangeContent(text)
|
||||
doc, err := tree.CreateNewTreeStorage(acc, t, content, s.treeStorageProvider.CreateTreeStorage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.With(
|
||||
zap.String("id", id),
|
||||
zap.Strings("heads", heads),
|
||||
zap.String("header", header.String())).
|
||||
Debug("document created in the database")
|
||||
|
||||
id, err = doc.TreeID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
header, err = doc.Header()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
heads = []string{header.FirstChangeId}
|
||||
snapshotPath = []string{header.FirstChangeId}
|
||||
ch, err = doc.GetChange(ctx, header.FirstChangeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
log.With(zap.String("id", id), zap.String("text", text)).
|
||||
Debug("creating document")
|
||||
|
||||
|
||||
@ -79,7 +79,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
|
||||
Debug("processing head update")
|
||||
|
||||
err = r.treeCache.DoWrite(ctx, update.TreeId, func(tree acltree.ACLTree) error {
|
||||
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
result, err = tree.AddRawChanges(ctx, update.Changes...)
|
||||
if err != nil {
|
||||
@ -133,7 +133,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
|
||||
Debug("processing full sync request")
|
||||
|
||||
err = r.treeCache.DoWrite(ctx, request.TreeId, func(tree acltree.ACLTree) error {
|
||||
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
// if we have non-empty request
|
||||
if len(request.Heads) != 0 {
|
||||
@ -177,7 +177,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
|
||||
Debug("processing full sync response")
|
||||
|
||||
err = r.treeCache.DoWrite(ctx, response.TreeId, func(tree acltree.ACLTree) error {
|
||||
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
result, err = tree.AddRawChanges(ctx, response.Changes...)
|
||||
if err != nil {
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||
@ -16,16 +17,14 @@ import (
|
||||
const CName = "treecache"
|
||||
|
||||
// TODO: add context
|
||||
type ACLTreeFunc = func(tree acltree.ACLTree) error
|
||||
type ACLTreeFunc = func(tree tree.ACLTree) error
|
||||
type ChangeBuildFunc = func(builder acltree.ChangeBuilder) error
|
||||
|
||||
var log = logger.NewNamed("treecache")
|
||||
|
||||
type Service interface {
|
||||
DoWrite(ctx context.Context, treeId string, f ACLTreeFunc) error
|
||||
DoRead(ctx context.Context, treeId string, f ACLTreeFunc) error
|
||||
Do(ctx context.Context, treeId string, f ACLTreeFunc) error
|
||||
Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error
|
||||
Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error
|
||||
}
|
||||
|
||||
type service struct {
|
||||
@ -38,51 +37,17 @@ func New() app.ComponentRunnable {
|
||||
return &service{}
|
||||
}
|
||||
|
||||
func (s *service) Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error {
|
||||
acc := s.account.Account()
|
||||
st, err := acltree.CreateNewTreeStorageWithACL(acc, build, s.treeProvider.CreateTreeStorage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := st.TreeID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.DoWrite(ctx, id, f)
|
||||
}
|
||||
|
||||
func (s *service) DoWrite(ctx context.Context, treeId string, f ACLTreeFunc) error {
|
||||
func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error {
|
||||
log.
|
||||
With(zap.String("treeId", treeId)).
|
||||
Debug("requesting tree from cache to perform operation")
|
||||
|
||||
tree, err := s.cache.Get(ctx, treeId)
|
||||
t, err := s.cache.Get(ctx, treeId)
|
||||
defer s.cache.Release(treeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aclTree := tree.(acltree.ACLTree)
|
||||
aclTree.Lock()
|
||||
defer aclTree.Unlock()
|
||||
return f(tree.(acltree.ACLTree))
|
||||
}
|
||||
|
||||
func (s *service) DoRead(ctx context.Context, treeId string, f ACLTreeFunc) error {
|
||||
log.
|
||||
With(zap.String("treeId", treeId)).
|
||||
Debug("requesting tree from cache to perform operation")
|
||||
|
||||
tree, err := s.cache.Get(ctx, treeId)
|
||||
defer s.cache.Release(treeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aclTree := tree.(acltree.ACLTree)
|
||||
aclTree.RLock()
|
||||
defer aclTree.RUnlock()
|
||||
return f(tree.(acltree.ACLTree))
|
||||
return f(t.(tree.ACLTree))
|
||||
}
|
||||
|
||||
func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user