Change space methods (handle requests)

This commit is contained in:
mcrakhman 2023-06-05 15:09:17 +02:00
parent aff2061bd1
commit 85a093dd4a
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 43 additions and 8 deletions

View File

@ -31,7 +31,7 @@ var log = logger.NewNamed(CName)
type ObjectSync interface {
LastUsage() time.Time
HandleMessage(ctx context.Context, hm HandleMessage) (err error)
HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
CloseThread(id string) (err error)
app.ComponentRunnable
}
@ -103,10 +103,12 @@ func (s *objectSync) LastUsage() time.Time {
return time.Time{}
}
func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
hm.ReceiveTime = time.Now()
hm.StartHandlingTime = hm.ReceiveTime
return s.handleRequest(ctx, hm.SenderId, hm.Message)
func (s *objectSync) HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return nil, err
}
return s.handleRequest(ctx, peerId, req)
}
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {

View File

@ -73,6 +73,7 @@ func (r *requestManager) Close(ctx context.Context) (err error) {
}
func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
// TODO: limit concurrent sends?
return r.doRequest(ctx, peerId, req)
}

View File

@ -4,6 +4,8 @@ import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
@ -58,6 +60,8 @@ type Space interface {
Id() string
Init(ctx context.Context) error
Description() (desc SpaceDescription, err error)
TreeBuilder() objecttreebuilder.TreeBuilder
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage
@ -67,7 +71,8 @@ type Space interface {
DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error)
HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error)
HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
HandleSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
TryClose(objectTTL time.Duration) (close bool, err error)
Close() error
@ -86,6 +91,28 @@ type space struct {
syncStatus syncstatus.StatusProvider
settings settings.Settings
storage spacestorage.SpaceStorage
aclList list.AclList
}
func (s *space) Description() (desc SpaceDescription, err error) {
root := s.aclList.Root()
settingsStorage, err := s.storage.TreeStorage(s.storage.SpaceSettingsId())
if err != nil {
return
}
settingsRoot, err := settingsStorage.Root()
if err != nil {
return
}
desc = SpaceDescription{
SpaceHeader: s.header,
AclId: root.Id,
AclPayload: root.Payload,
SpaceSettingsId: settingsRoot.Id,
SpaceSettingsPayload: settingsRoot.RawChange,
}
return
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
@ -104,8 +131,12 @@ func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage)
return s.objectSync.HandleMessage(ctx, msg)
}
func (s *space) HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
return s.objectSync.HandleRequest(ctx, msg)
func (s *space) HandleSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
return s.objectSync.HandleRequest(ctx, req)
}
func (s *space) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
return s.headSync.HandleRangeRequest(ctx, req)
}
func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
@ -127,6 +158,7 @@ func (s *space) Init(ctx context.Context) (err error) {
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.aclList = s.app.MustComponent(syncacl.CName).(list.AclList)
return nil
}