Add message to syncproto, fix request handler

This commit is contained in:
mcrakhman 2022-08-05 13:44:23 +02:00 committed by Mikhail Iudin
parent ebfdbdd508
commit 26704a72e9
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
9 changed files with 1930 additions and 2325 deletions

View File

@ -21,7 +21,7 @@ protos-go:
@$(eval P_TREE_STORAGE_PATH_PB := $(ROOT_PKG)/acl/treestorage/treepb) @$(eval P_TREE_STORAGE_PATH_PB := $(ROOT_PKG)/acl/treestorage/treepb)
@$(eval P_ACL_CHANGES_PATH_PB := $(ROOT_PKG)/acl/aclchanges/aclpb) @$(eval P_ACL_CHANGES_PATH_PB := $(ROOT_PKG)/acl/aclchanges/aclpb)
@$(eval P_PLAINTEXT_CHANGES_PATH_PB := $(ROOT_PKG)/acl/testutils/testchanges/testchangepb) @$(eval P_PLAINTEXT_CHANGES_PATH_PB := $(ROOT_PKG)/acl/testutils/testchanges/testchangepb)
@$(eval P_SYNC_CHANGES_PATH_PB := service/sync/syncpb) @$(eval P_SYNC_CHANGES_PATH_PB := syncproto)
@$(eval P_TIMESTAMP := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types) @$(eval P_TIMESTAMP := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types)
@$(eval P_STRUCT := Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types) @$(eval P_STRUCT := Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types)
@$(eval P_ACL_CHANGES := M$(P_ACL_CHANGES_PATH_PB)/protos/aclchanges.proto=github.com/anytypeio/go-anytype-infrastructure-experiments/$(P_ACL_CHANGES_PATH_PB)) @$(eval P_ACL_CHANGES := M$(P_ACL_CHANGES_PATH_PB)/protos/aclchanges.proto=github.com/anytypeio/go-anytype-infrastructure-experiments/$(P_ACL_CHANGES_PATH_PB))
@ -32,11 +32,7 @@ protos-go:
$(GOGO_START) protoc --gogofaster_out=:. $(P_TREE_STORAGE_PATH_PB)/protos/*.proto; mv $(P_TREE_STORAGE_PATH_PB)/protos/*.go $(P_TREE_STORAGE_PATH_PB) $(GOGO_START) protoc --gogofaster_out=:. $(P_TREE_STORAGE_PATH_PB)/protos/*.proto; mv $(P_TREE_STORAGE_PATH_PB)/protos/*.go $(P_TREE_STORAGE_PATH_PB)
$(GOGO_START) protoc --gogofaster_out=:. $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.proto; mv $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.go $(P_PLAINTEXT_CHANGES_PATH_PB) $(GOGO_START) protoc --gogofaster_out=:. $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.proto; mv $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.go $(P_PLAINTEXT_CHANGES_PATH_PB)
$(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES)) $(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES))
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. $(P_SYNC_CHANGES_PATH_PB)/protos/*.proto; mv $(P_SYNC_CHANGES_PATH_PB)/protos/*.go $(P_SYNC_CHANGES_PATH_PB) $(GOGO_START) protoc --gogofaster_out=$(PKGMAP),plugins=grpc:. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto
protos:
GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1 protoc --gogofaster_out=plugins=grpc:. syncproto/proto/*.proto
build: build:
@$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app)) @$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app))

View File

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec" "github.com/libp2p/go-libp2p-core/sec"
@ -35,8 +36,9 @@ type service struct {
} }
func (s *service) Init(ctx context.Context, a *app.App) (err error) { func (s *service) Init(ctx context.Context, a *app.App) (err error) {
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList account := a.MustComponent(config.CName).(*config.Config).Account
pkb, err := crypto.ConfigDecodeKey(peerConf.MyId.PrivKey) decoder := signingkey.NewEDPrivKeyDecoder()
pkb, err := decoder.DecodeFromStringIntoBytes(account.SigningKey)
if err != nil { if err != nil {
return return
} }
@ -44,7 +46,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
return return
} }
pid, err := peer.Decode(peerConf.MyId.PeerId) pid, err := peer.Decode(account.PeerId)
if err != nil { if err != nil {
return return
} }
@ -66,7 +68,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
return fmt.Errorf("peerId and privateKey mismatched") return fmt.Errorf("peerId and privateKey mismatched")
} }
log.Info("secure service init", zap.String("peerId", peerConf.MyId.PeerId)) log.Info("secure service init", zap.String("peerId", account.PeerId))
return nil return nil
} }

View File

@ -8,8 +8,8 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
) )
@ -24,11 +24,12 @@ func New() app.Component {
} }
type RequestHandler interface { type RequestHandler interface {
HandleFullSyncContent(ctx context.Context, senderId string, request *syncpb.SyncContent) (err error) HandleFullSyncContent(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
} }
type MessageSender interface { type MessageSender interface {
SendMessage(peerId string, msg *syncpb.SyncContent) error SendMessage(peerId string, msg *syncproto.Sync) error
SendSpace(spaceId string, msg *syncproto.Sync) error
} }
const CName = "SyncRequestHandler" const CName = "SyncRequestHandler"
@ -52,7 +53,7 @@ func (r *requestHandler) Close(ctx context.Context) (err error) {
return nil return nil
} }
func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId string, content *syncpb.SyncContent) error { func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId string, content *syncproto.Sync) error {
msg := content.GetMessage() msg := content.GetMessage()
switch { switch {
case msg.GetFullSyncRequest() != nil: case msg.GetFullSyncRequest() != nil:
@ -65,9 +66,9 @@ func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId str
return nil return nil
} }
func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) { func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncproto.SyncHeadUpdate) (err error) {
var ( var (
fullRequest *syncpb.SyncFullRequest fullRequest *syncproto.SyncFullRequest
snapshotPath []string snapshotPath []string
result acltree.AddResult result acltree.AddResult
) )
@ -91,33 +92,33 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
// if there are no such tree // if there are no such tree
if err == treestorage.ErrUnknownTreeId { if err == treestorage.ErrUnknownTreeId {
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request // TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
fullRequest = &syncpb.SyncFullRequest{ fullRequest = &syncproto.SyncFullRequest{
TreeId: update.TreeId, TreeId: update.TreeId,
TreeHeader: update.TreeHeader, TreeHeader: update.TreeHeader,
} }
} }
// if we have incompatible heads, or we haven't seen the tree at all // if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil { if fullRequest != nil {
return r.messageService.SendMessage(senderId, syncpb.WrapFullRequest(fullRequest)) return r.messageService.SendMessage(senderId, syncproto.WrapFullRequest(fullRequest))
} }
// if error or nothing has changed // if error or nothing has changed
if err != nil || len(result.Added) == 0 { if err != nil || len(result.Added) == 0 {
return err return err
} }
// otherwise sending heads update message // otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
TreeId: update.TreeId, TreeId: update.TreeId,
TreeHeader: update.TreeHeader, TreeHeader: update.TreeHeader,
} }
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
} }
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) { func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) {
var ( var (
fullResponse *syncpb.SyncFullResponse fullResponse *syncproto.SyncFullResponse
snapshotPath []string snapshotPath []string
result acltree.AddResult result acltree.AddResult
) )
@ -141,24 +142,24 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
if err != nil { if err != nil {
return err return err
} }
err = r.messageService.SendMessage(senderId, syncpb.WrapFullResponse(fullResponse)) err = r.messageService.SendMessage(senderId, syncproto.WrapFullResponse(fullResponse))
// if error or nothing has changed // if error or nothing has changed
if err != nil || len(result.Added) == 0 { if err != nil || len(result.Added) == 0 {
return err return err
} }
// otherwise sending heads update message // otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
TreeId: request.TreeId, TreeId: request.TreeId,
TreeHeader: request.TreeHeader, TreeHeader: request.TreeHeader,
} }
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
} }
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) { func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) {
var ( var (
snapshotPath []string snapshotPath []string
result acltree.AddResult result acltree.AddResult
@ -185,21 +186,21 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
} }
} }
// sending heads update message // sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
TreeId: response.TreeId, TreeId: response.TreeId,
} }
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
} }
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) {
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &syncpb.SyncFullRequest{ return &syncproto.SyncFullRequest{
Heads: tree.Heads(), Heads: tree.Heads(),
Changes: ourChanges, Changes: ourChanges,
TreeId: treeId, TreeId: treeId,
@ -212,7 +213,7 @@ func (r *requestHandler) prepareFullSyncResponse(
treeId string, treeId string,
theirPath []string, theirPath []string,
theirChanges []*aclpb.RawChange, theirChanges []*aclpb.RawChange,
tree acltree.ACLTree) (*syncpb.SyncFullResponse, error) { tree acltree.ACLTree) (*syncproto.SyncFullResponse, error) {
// TODO: we can probably use the common snapshot calculated on the request step from previous peer // TODO: we can probably use the common snapshot calculated on the request step from previous peer
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
@ -231,7 +232,7 @@ func (r *requestHandler) prepareFullSyncResponse(
} }
} }
return &syncpb.SyncFullResponse{ return &syncproto.SyncFullResponse{
Heads: tree.Heads(), Heads: tree.Heads(),
Changes: final, Changes: final,
TreeId: treeId, TreeId: treeId,
@ -240,7 +241,7 @@ func (r *requestHandler) prepareFullSyncResponse(
}, nil }, nil
} }
func (r *requestHandler) createTree(ctx context.Context, response *syncpb.SyncFullResponse) error { func (r *requestHandler) createTree(ctx context.Context, response *syncproto.SyncFullResponse) error {
return r.treeCache.Add( return r.treeCache.Add(
ctx, ctx,
response.TreeId, response.TreeId,

View File

@ -1,19 +0,0 @@
package syncpb
func WrapHeadUpdate(update *SyncHeadUpdate) *SyncContent {
return &SyncContent{Message: &SyncContentValue{
Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
}}
}
func WrapFullRequest(request *SyncFullRequest) *SyncContent {
return &SyncContent{Message: &SyncContentValue{
Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
}}
}
func WrapFullResponse(response *SyncFullResponse) *SyncContent {
return &SyncContent{Message: &SyncContentValue{
Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
}}
}

View File

@ -1,47 +0,0 @@
syntax = "proto3";
package sync;
option go_package = "syncpb";
import "pkg/acl/aclchanges/aclpb/protos/aclchanges.proto";
import "pkg/acl/treestorage/treepb/protos/tree.proto";
message Sync {
message Content {
ContentValue message = 1;
}
message ContentValue {
oneof value {
HeadUpdate headUpdate = 1;
Full.Request fullSyncRequest = 2;
Full.Response fullSyncResponse = 3;
}
}
message HeadUpdate {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
message Full {
// here with send the request with all changes we have (we already know sender's snapshot path)
message Request {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
message Response {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
}
}

File diff suppressed because it is too large Load Diff

19
syncproto/helpers.go Normal file
View File

@ -0,0 +1,19 @@
package syncproto
func WrapHeadUpdate(update *SyncHeadUpdate) *Sync {
return &Sync{Message: &SyncContentValue{
Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
}}
}
func WrapFullRequest(request *SyncFullRequest) *Sync {
return &Sync{Message: &SyncContentValue{
Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
}}
}
func WrapFullResponse(response *SyncFullResponse) *Sync {
return &Sync{Message: &SyncContentValue{
Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
}}
}

View File

@ -2,6 +2,9 @@ syntax = "proto3";
package anytype; package anytype;
option go_package = "/syncproto"; option go_package = "/syncproto";
import "pkg/acl/aclchanges/aclpb/protos/aclchanges.proto";
import "pkg/acl/treestorage/treepb/protos/tree.proto";
message Message { message Message {
Header header = 1; Header header = 1;
bytes data = 2; bytes data = 2;
@ -20,7 +23,6 @@ enum MessageType {
MessageTypeSync = 2; MessageTypeSync = 2;
} }
message System { message System {
Handshake handshake = 1; Handshake handshake = 1;
Ping ping = 2; Ping ping = 2;
@ -60,5 +62,40 @@ message Subscription {
message Sync { message Sync {
string spaceId = 1; string spaceId = 1;
ContentValue message = 2;
message ContentValue {
oneof value {
HeadUpdate headUpdate = 1;
Full.Request fullSyncRequest = 2;
Full.Response fullSyncResponse = 3;
}
}
message HeadUpdate {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
message Full {
// here with send the request with all changes we have (we already know sender's snapshot path)
message Request {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
message Response {
repeated string heads = 1;
repeated acl.RawChange changes = 2;
string treeId = 3;
repeated string snapshotPath = 4;
tree.TreeHeader treeHeader = 5;
}
}
} }

File diff suppressed because it is too large Load Diff