Start tree cache and document rewrite

This commit is contained in:
mcrakhman 2022-08-10 22:16:16 +02:00
parent 0e3d7cc8ca
commit e7956901de
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
8 changed files with 279 additions and 78 deletions

View File

@ -43,10 +43,9 @@ type docTree struct {
accountData *account.AccountData accountData *account.AccountData
updateListener TreeUpdateListener updateListener TreeUpdateListener
id string id string
header *treepb.TreeHeader header *treepb.TreeHeader
tree *Tree tree *Tree
aclState *ACLState
treeBuilder *treeBuilder treeBuilder *treeBuilder
validator DocTreeValidator validator DocTreeValidator
@ -62,7 +61,6 @@ func BuildDocTreeWithIdentity(t treestorage.TreeStorage, acc *account.AccountDat
treeStorage: t, treeStorage: t,
accountData: acc, accountData: acc,
tree: nil, tree: nil,
aclState: nil,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
validator: validator, validator: validator,
updateListener: listener, updateListener: listener,
@ -100,7 +98,6 @@ func BuildDocTree(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder, l
docTree := &docTree{ docTree := &docTree{
treeStorage: t, treeStorage: t,
tree: nil, tree: nil,
aclState: nil,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
validator: validator, validator: validator,
updateListener: listener, updateListener: listener,
@ -222,7 +219,7 @@ func (d *docTree) AddContent(ctx context.Context, aclTree ACLTree, content proto
// clearing tree, because we already fixed everything in the last snapshot // clearing tree, because we already fixed everything in the last snapshot
d.tree = &Tree{} d.tree = &Tree{}
} }
d.tree.AddFast(ch) d.tree.AddFast(ch) // TODO: Add head
rawCh := &aclpb.RawChange{ rawCh := &aclpb.RawChange{
Payload: fullMarshalledChange, Payload: fullMarshalledChange,
Signature: ch.Signature(), Signature: ch.Signature(),

129
pkg/acl/tree/treestorage.go Normal file
View File

@ -0,0 +1,129 @@
package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/gogo/protobuf/proto"
"time"
)
func CreateNewTreeStorageWithACL(
acc *account.AccountData,
build func(builder ACLChangeBuilder) error,
create treestorage.CreatorFunc) (treestorage.TreeStorage, error) {
bld := newACLChangeBuilder()
bld.Init(
newACLStateWithIdentity(acc.Identity, acc.EncKey, signingkey.NewEd25519PubKeyDecoder()),
&Tree{},
acc)
err := build(bld)
if err != nil {
return nil, err
}
change, payload, err := bld.BuildAndApply()
if err != nil {
return nil, err
}
rawChange := &aclpb.RawChange{
Payload: payload,
Signature: change.Signature(),
Id: change.CID(),
}
header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_ACLTree)
if err != nil {
return nil, err
}
thr, err := create(id, header, []*aclpb.RawChange{rawChange})
if err != nil {
return nil, err
}
err = thr.SetHeads([]string{change.CID()})
if err != nil {
return nil, err
}
return thr, nil
}
func CreateNewTreeStorage(
acc *account.AccountData,
aclTree ACLTree,
content proto.Marshaler,
create treestorage.CreatorFunc) (treestorage.TreeStorage, error) {
state := aclTree.ACLState()
change := &aclpb.Change{
AclHeadIds: aclTree.Heads(),
CurrentReadKeyHash: state.currentReadKeyHash,
Timestamp: int64(time.Now().Nanosecond()),
Identity: acc.Identity,
}
marshalledData, err := content.Marshal()
if err != nil {
return nil, err
}
encrypted, err := state.userReadKeys[state.currentReadKeyHash].Encrypt(marshalledData)
if err != nil {
return nil, err
}
change.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(change)
if err != nil {
return nil, err
}
signature, err := acc.SignKey.Sign(fullMarshalledChange)
if err != nil {
return nil, err
}
id, err := cid.NewCIDFromBytes(fullMarshalledChange)
if err != nil {
return nil, err
}
rawChange := &aclpb.RawChange{
Payload: fullMarshalledChange,
Signature: signature,
Id: id,
}
header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_DocTree)
if err != nil {
return nil, err
}
thr, err := create(id, header, []*aclpb.RawChange{rawChange})
if err != nil {
return nil, err
}
err = thr.SetHeads([]string{id})
if err != nil {
return nil, err
}
return thr, nil
}
func createTreeHeaderAndId(change *aclpb.RawChange, treeType treepb.TreeHeaderTreeType) (*treepb.TreeHeader, string, error) {
header := &treepb.TreeHeader{
FirstChangeId: change.Id,
Type: treeType,
}
marshalledHeader, err := proto.Marshal(header)
if err != nil {
return nil, "", err
}
treeId, err := cid.NewCIDFromBytes(marshalledHeader)
if err != nil {
return nil, "", err
}
return header, treeId, nil
}

View File

@ -6,6 +6,7 @@ message TreeHeader {
string firstChangeId = 1; string firstChangeId = 1;
bool isWorkspace = 2; bool isWorkspace = 2;
TreeType type = 3; TreeType type = 3;
string aclTreeId = 4;
// TODO: add user identity, signature and nano timestamp // TODO: add user identity, signature and nano timestamp
enum TreeType { enum TreeType {

View File

@ -51,6 +51,7 @@ type TreeHeader struct {
FirstChangeId string `protobuf:"bytes,1,opt,name=firstChangeId,proto3" json:"firstChangeId,omitempty"` FirstChangeId string `protobuf:"bytes,1,opt,name=firstChangeId,proto3" json:"firstChangeId,omitempty"`
IsWorkspace bool `protobuf:"varint,2,opt,name=isWorkspace,proto3" json:"isWorkspace,omitempty"` IsWorkspace bool `protobuf:"varint,2,opt,name=isWorkspace,proto3" json:"isWorkspace,omitempty"`
Type TreeHeaderTreeType `protobuf:"varint,3,opt,name=type,proto3,enum=tree.TreeHeaderTreeType" json:"type,omitempty"` Type TreeHeaderTreeType `protobuf:"varint,3,opt,name=type,proto3,enum=tree.TreeHeaderTreeType" json:"type,omitempty"`
AclTreeId string `protobuf:"bytes,4,opt,name=aclTreeId,proto3" json:"aclTreeId,omitempty"`
} }
func (m *TreeHeader) Reset() { *m = TreeHeader{} } func (m *TreeHeader) Reset() { *m = TreeHeader{} }
@ -107,6 +108,13 @@ func (m *TreeHeader) GetType() TreeHeaderTreeType {
return TreeHeader_ACLTree return TreeHeader_ACLTree
} }
func (m *TreeHeader) GetAclTreeId() string {
if m != nil {
return m.AclTreeId
}
return ""
}
func init() { func init() {
proto.RegisterEnum("tree.TreeHeaderTreeType", TreeHeaderTreeType_name, TreeHeaderTreeType_value) proto.RegisterEnum("tree.TreeHeaderTreeType", TreeHeaderTreeType_name, TreeHeaderTreeType_value)
proto.RegisterType((*TreeHeader)(nil), "tree.TreeHeader") proto.RegisterType((*TreeHeader)(nil), "tree.TreeHeader")
@ -117,21 +125,22 @@ func init() {
} }
var fileDescriptor_e7d760b855878644 = []byte{ var fileDescriptor_e7d760b855878644 = []byte{
// 220 bytes of a gzipped FileDescriptorProto // 237 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x29, 0xc8, 0x4e, 0xd7, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x29, 0xc8, 0x4e, 0xd7,
0x4f, 0x4c, 0xce, 0xd1, 0x2f, 0x29, 0x4a, 0x4d, 0x2d, 0x2e, 0xc9, 0x2f, 0x4a, 0x4c, 0x4f, 0x05, 0x4f, 0x4c, 0xce, 0xd1, 0x2f, 0x29, 0x4a, 0x4d, 0x2d, 0x2e, 0xc9, 0x2f, 0x4a, 0x4c, 0x4f, 0x05,
0xb3, 0x0b, 0x92, 0xf4, 0x0b, 0x8a, 0xf2, 0x4b, 0xf2, 0x8b, 0xc1, 0x3c, 0x3d, 0x30, 0x5b, 0x88, 0xb3, 0x0b, 0x92, 0xf4, 0x0b, 0x8a, 0xf2, 0x4b, 0xf2, 0x8b, 0xc1, 0x3c, 0x3d, 0x30, 0x5b, 0x88,
0x05, 0xc4, 0x56, 0x5a, 0xc9, 0xc8, 0xc5, 0x15, 0x52, 0x94, 0x9a, 0xea, 0x91, 0x9a, 0x98, 0x92, 0x05, 0xc4, 0x56, 0x3a, 0xce, 0xc8, 0xc5, 0x15, 0x52, 0x94, 0x9a, 0xea, 0x91, 0x9a, 0x98, 0x92,
0x5a, 0x24, 0xa4, 0xc2, 0xc5, 0x9b, 0x96, 0x59, 0x54, 0x5c, 0xe2, 0x9c, 0x91, 0x98, 0x97, 0x9e, 0x5a, 0x24, 0xa4, 0xc2, 0xc5, 0x9b, 0x96, 0x59, 0x54, 0x5c, 0xe2, 0x9c, 0x91, 0x98, 0x97, 0x9e,
0xea, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x84, 0x2a, 0x28, 0xa4, 0xc0, 0xc5, 0x9d, 0xea, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x84, 0x2a, 0x28, 0xa4, 0xc0, 0xc5, 0x9d,
0x59, 0x1c, 0x9e, 0x5f, 0x94, 0x5d, 0x5c, 0x90, 0x98, 0x9c, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x59, 0x1c, 0x9e, 0x5f, 0x94, 0x5d, 0x5c, 0x90, 0x98, 0x9c, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1,
0x11, 0x84, 0x2c, 0x24, 0xa4, 0xcb, 0xc5, 0x52, 0x52, 0x59, 0x90, 0x2a, 0xc1, 0xac, 0xc0, 0xa8, 0x11, 0x84, 0x2c, 0x24, 0xa4, 0xcb, 0xc5, 0x52, 0x52, 0x59, 0x90, 0x2a, 0xc1, 0xac, 0xc0, 0xa8,
0xc1, 0x67, 0x24, 0xa9, 0x07, 0xb6, 0x17, 0x61, 0x0f, 0x98, 0x19, 0x52, 0x59, 0x90, 0x1a, 0x04, 0xc1, 0x67, 0x24, 0xa9, 0x07, 0xb6, 0x17, 0x61, 0x0f, 0x98, 0x19, 0x52, 0x59, 0x90, 0x1a, 0x04,
0x56, 0xa6, 0xa4, 0xc2, 0xc5, 0x01, 0x13, 0x11, 0xe2, 0xe6, 0x62, 0x77, 0x74, 0xf6, 0x01, 0x71, 0x56, 0x26, 0x24, 0xc3, 0xc5, 0x99, 0x98, 0x9c, 0x03, 0x12, 0xf4, 0x4c, 0x91, 0x60, 0x01, 0x5b,
0x05, 0x18, 0x40, 0x1c, 0x97, 0xfc, 0x64, 0x30, 0x87, 0xd1, 0x49, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x89, 0x10, 0x50, 0x52, 0xe1, 0xe2, 0x80, 0xa9, 0x17, 0xe2, 0xe6, 0x62, 0x77, 0x74, 0xf6, 0x01,
0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x71, 0x05, 0x18, 0x40, 0x1c, 0x97, 0xfc, 0x64, 0x30, 0x87, 0xd1, 0x49, 0xe1, 0xc4, 0x23, 0x39,
0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0xbe, 0x4c, 0x62, 0x03, 0x7b, 0xcd, 0x18, 0x10, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63,
0x00, 0x00, 0xff, 0xff, 0x72, 0x29, 0xbc, 0x0e, 0x0a, 0x01, 0x00, 0x00, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0x61, 0x90, 0xc4, 0x06, 0xf6, 0xb8, 0x31,
0x20, 0x00, 0x00, 0xff, 0xff, 0x0c, 0x86, 0x4d, 0x81, 0x28, 0x01, 0x00, 0x00,
} }
func (m *TreeHeader) Marshal() (dAtA []byte, err error) { func (m *TreeHeader) Marshal() (dAtA []byte, err error) {
@ -154,6 +163,13 @@ func (m *TreeHeader) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.AclTreeId) > 0 {
i -= len(m.AclTreeId)
copy(dAtA[i:], m.AclTreeId)
i = encodeVarintTree(dAtA, i, uint64(len(m.AclTreeId)))
i--
dAtA[i] = 0x22
}
if m.Type != 0 { if m.Type != 0 {
i = encodeVarintTree(dAtA, i, uint64(m.Type)) i = encodeVarintTree(dAtA, i, uint64(m.Type))
i-- i--
@ -206,6 +222,10 @@ func (m *TreeHeader) Size() (n int) {
if m.Type != 0 { if m.Type != 0 {
n += 1 + sovTree(uint64(m.Type)) n += 1 + sovTree(uint64(m.Type))
} }
l = len(m.AclTreeId)
if l > 0 {
n += 1 + l + sovTree(uint64(l))
}
return n return n
} }
@ -315,6 +335,38 @@ func (m *TreeHeader) Unmarshal(dAtA []byte) error {
break break
} }
} }
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field AclTreeId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTree
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTree
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTree
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.AclTreeId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipTree(dAtA[iNdEx:]) skippy, err := skipTree(dAtA[iNdEx:])

View File

@ -79,7 +79,7 @@ func (s *service) treeDump(w http.ResponseWriter, req *http.Request) {
dump string dump string
err error err error
) )
err = s.treeCache.DoWrite(context.Background(), treeId, func(tree acltree.ACLTree) error { err = s.treeCache.Do(context.Background(), treeId, func(tree acltree.ACLTree) error {
dump, err = tree.DebugDump() dump, err = tree.DebugDump()
if err != nil { if err != nil {
return err return err

View File

@ -7,6 +7,8 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
@ -22,9 +24,10 @@ var CName = "DocumentService"
var log = logger.NewNamed("documentservice") var log = logger.NewNamed("documentservice")
type service struct { type service struct {
messageService message.Service messageService message.Service
treeCache treecache.Service treeCache treecache.Service
account account.Service account account.Service
treeStorageProvider treestorage.Provider
// to create new documents we need to know all nodes // to create new documents we need to know all nodes
nodes []*node.Node nodes []*node.Node
} }
@ -42,6 +45,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
s.account = a.MustComponent(account.CName).(account.Service) s.account = a.MustComponent(account.CName).(account.Service)
s.messageService = a.MustComponent(message.CName).(message.Service) s.messageService = a.MustComponent(message.CName).(message.Service)
s.treeCache = a.MustComponent(treecache.CName).(treecache.Service) s.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
// TODO: add TreeStorageProvider service
nodesService := a.MustComponent(node.CName).(node.Service) nodesService := a.MustComponent(node.CName).(node.Service)
s.nodes = nodesService.Nodes() s.nodes = nodesService.Nodes()
@ -71,7 +75,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
log.With(zap.String("id", id), zap.String("text", text)). log.With(zap.String("id", id), zap.String("text", text)).
Debug("updating document") Debug("updating document")
err = s.treeCache.DoWrite(ctx, id, func(tree acltree.ACLTree) error { err = s.treeCache.Do(ctx, id, func(tree acltree.ACLTree) error {
ch, err = tree.AddContent(ctx, func(builder acltree.ChangeBuilder) error { ch, err = tree.AddContent(ctx, func(builder acltree.ChangeBuilder) error {
builder.AddChangeContent( builder.AddChangeContent(
&testchangepb.PlainTextChangeData{ &testchangepb.PlainTextChangeData{
@ -109,7 +113,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
})) }))
} }
func (s *service) CreateDocument(ctx context.Context, text string) (id string, err error) { func (s *service) CreateACLTree(ctx context.Context) (id string, err error) {
acc := s.account.Account() acc := s.account.Account()
var ( var (
ch *aclpb.RawChange ch *aclpb.RawChange
@ -118,7 +122,7 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
heads []string heads []string
) )
err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error { t, err := tree.CreateNewTreeStorageWithACL(acc, func(builder tree.ACLChangeBuilder) error {
err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin) err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin)
if err != nil { if err != nil {
return err return err
@ -130,28 +134,81 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
return err return err
} }
} }
builder.AddChangeContent(createInitialChangeContent(text))
return nil return nil
}, func(tree acltree.ACLTree) error { }, s.treeStorageProvider.CreateTreeStorage)
id = tree.ID()
heads = tree.Heads() id, err = t.TreeID()
header = tree.Header() if err != nil {
snapshotPath = tree.SnapshotPath() return "", err
ch, err = tree.Storage().GetChange(ctx, heads[0]) }
header, err = t.Header()
if err != nil {
return "", err
}
heads = []string{header.FirstChangeId}
snapshotPath = []string{header.FirstChangeId}
ch, err = t.GetChange(ctx, header.FirstChangeId)
if err != nil {
return "", err
}
if err != nil {
return "", err
}
err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
Heads: heads,
Changes: []*aclpb.RawChange{ch},
TreeId: id,
SnapshotPath: snapshotPath,
TreeHeader: header,
}))
return id, nil
}
func (s *service) CreateDocumentTree(ctx context.Context, aclTreeId string, text string) (id string, err error) {
acc := s.account.Account()
var (
ch *aclpb.RawChange
header *treepb.TreeHeader
snapshotPath []string
heads []string
)
err = s.treeCache.Do(ctx, aclTreeId, func(t tree.ACLTree) error {
t.RLock()
defer t.RUnlock()
content := createInitialChangeContent(text)
doc, err := tree.CreateNewTreeStorage(acc, t, content, s.treeStorageProvider.CreateTreeStorage)
if err != nil { if err != nil {
return err return err
} }
log.With(
zap.String("id", id), id, err = doc.TreeID()
zap.Strings("heads", heads), if err != nil {
zap.String("header", header.String())). return err
Debug("document created in the database") }
header, err = doc.Header()
if err != nil {
return err
}
heads = []string{header.FirstChangeId}
snapshotPath = []string{header.FirstChangeId}
ch, err = doc.GetChange(ctx, header.FirstChangeId)
if err != nil {
return err
}
return nil return nil
}) })
if err != nil { if err != nil {
return "", err return "", err
} }
log.With(zap.String("id", id), zap.String("text", text)). log.With(zap.String("id", id), zap.String("text", text)).
Debug("creating document") Debug("creating document")

View File

@ -79,7 +79,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
Debug("processing head update") Debug("processing head update")
err = r.treeCache.DoWrite(ctx, update.TreeId, func(tree acltree.ACLTree) error { err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes // TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, update.Changes...) result, err = tree.AddRawChanges(ctx, update.Changes...)
if err != nil { if err != nil {
@ -133,7 +133,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
Debug("processing full sync request") Debug("processing full sync request")
err = r.treeCache.DoWrite(ctx, request.TreeId, func(tree acltree.ACLTree) error { err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes // TODO: check if we already have those changes
// if we have non-empty request // if we have non-empty request
if len(request.Heads) != 0 { if len(request.Heads) != 0 {
@ -177,7 +177,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
Debug("processing full sync response") Debug("processing full sync response")
err = r.treeCache.DoWrite(ctx, response.TreeId, func(tree acltree.ACLTree) error { err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes // TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, response.Changes...) result, err = tree.AddRawChanges(ctx, response.Changes...)
if err != nil { if err != nil {

View File

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
@ -16,16 +17,14 @@ import (
const CName = "treecache" const CName = "treecache"
// TODO: add context // TODO: add context
type ACLTreeFunc = func(tree acltree.ACLTree) error type ACLTreeFunc = func(tree tree.ACLTree) error
type ChangeBuildFunc = func(builder acltree.ChangeBuilder) error type ChangeBuildFunc = func(builder acltree.ChangeBuilder) error
var log = logger.NewNamed("treecache") var log = logger.NewNamed("treecache")
type Service interface { type Service interface {
DoWrite(ctx context.Context, treeId string, f ACLTreeFunc) error Do(ctx context.Context, treeId string, f ACLTreeFunc) error
DoRead(ctx context.Context, treeId string, f ACLTreeFunc) error
Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error
Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error
} }
type service struct { type service struct {
@ -38,51 +37,17 @@ func New() app.ComponentRunnable {
return &service{} return &service{}
} }
func (s *service) Create(ctx context.Context, build ChangeBuildFunc, f ACLTreeFunc) error { func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error {
acc := s.account.Account()
st, err := acltree.CreateNewTreeStorageWithACL(acc, build, s.treeProvider.CreateTreeStorage)
if err != nil {
return err
}
id, err := st.TreeID()
if err != nil {
return err
}
return s.DoWrite(ctx, id, f)
}
func (s *service) DoWrite(ctx context.Context, treeId string, f ACLTreeFunc) error {
log. log.
With(zap.String("treeId", treeId)). With(zap.String("treeId", treeId)).
Debug("requesting tree from cache to perform operation") Debug("requesting tree from cache to perform operation")
tree, err := s.cache.Get(ctx, treeId) t, err := s.cache.Get(ctx, treeId)
defer s.cache.Release(treeId) defer s.cache.Release(treeId)
if err != nil { if err != nil {
return err return err
} }
aclTree := tree.(acltree.ACLTree) return f(t.(tree.ACLTree))
aclTree.Lock()
defer aclTree.Unlock()
return f(tree.(acltree.ACLTree))
}
func (s *service) DoRead(ctx context.Context, treeId string, f ACLTreeFunc) error {
log.
With(zap.String("treeId", treeId)).
Debug("requesting tree from cache to perform operation")
tree, err := s.cache.Get(ctx, treeId)
defer s.cache.Release(treeId)
if err != nil {
return err
}
aclTree := tree.(acltree.ACLTree)
aclTree.RLock()
defer aclTree.RUnlock()
return f(tree.(acltree.ACLTree))
} }
func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error { func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error {