Some renamings and fixes

This commit is contained in:
mcrakhman 2022-09-18 11:23:01 +02:00 committed by Mikhail Iudin
parent f3a2944d60
commit 4270bd25b5
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
7 changed files with 12 additions and 10 deletions

View File

@ -50,6 +50,7 @@ func (s *space) Init(ctx context.Context) error {
s.rpc = &rpcHandler{s: s} s.rpc = &rpcHandler{s: s}
s.diffService.Init(s.getObjectIds()) s.diffService.Init(s.getObjectIds())
s.syncService.Init() s.syncService.Init()
// basically this provides access for the external cache to use space's tree building functions
s.cache.SetBuildFunc(s.BuildTree) s.cache.SetBuildFunc(s.BuildTree)
return nil return nil
} }
@ -67,7 +68,7 @@ func (s *space) DiffService() diffservice.DiffService {
} }
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error) { func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error) {
return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, nil, s.storage.CreateTreeStorage) return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage)
} }
func (s *space) BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (t tree.ObjectTree, err error) { func (s *space) BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (t tree.ObjectTree, err error) {
@ -109,7 +110,6 @@ func (s *space) BuildTree(ctx context.Context, id string, listener synctree.Upda
if err != nil { if err != nil {
return return
} }
// TODO: maybe it is better to use the tree that we already built and just replace the storage
// now we are sure that we can save it to the storage // now we are sure that we can save it to the storage
store, err = s.storage.CreateTreeStorage(payload) store, err = s.storage.CreateTreeStorage(payload)
if err != nil { if err != nil {

View File

@ -0,0 +1 @@
package spacetree

View File

@ -21,7 +21,7 @@ type StreamPool interface {
SyncClient SyncClient
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
HasStream(peerId string) bool HasActiveStream(peerId string) bool
Close() (err error) Close() (err error)
} }
@ -55,8 +55,8 @@ func newStreamPool(messageHandler MessageHandler) StreamPool {
} }
} }
func (s *streamPool) HasStream(peerId string) (res bool) { func (s *streamPool) HasActiveStream(peerId string) (res bool) {
_, err := s.getStream(peerId) _, err := s.getOrDeleteStream(peerId)
return err == nil return err == nil
} }
@ -83,7 +83,7 @@ func (s *streamPool) SendSync(
} }
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getStream(peerId) stream, err := s.getOrDeleteStream(peerId)
if err != nil { if err != nil {
return return
} }
@ -91,7 +91,7 @@ func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSync
return stream.Send(message) return stream.Send(message)
} }
func (s *streamPool) getStream(id string) (stream spacesyncproto.SpaceStream, err error) { func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
stream, exists := s.peerStreams[id] stream, exists := s.peerStreams[id]

View File

@ -85,7 +85,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
return return
} }
for _, peer := range respPeers { for _, peer := range respPeers {
if s.streamPool.HasStream(peer.Id()) { if s.streamPool.HasActiveStream(peer.Id()) {
continue continue
} }
cl := spacesyncproto.NewDRPCSpaceClient(peer) cl := spacesyncproto.NewDRPCSpaceClient(peer)

View File

@ -15,6 +15,7 @@ type UpdateListener interface {
Rebuild(tree tree.ObjectTree) Rebuild(tree tree.ObjectTree)
} }
// SyncTree sends head updates to sync service and also sends new changes to update listener
type SyncTree struct { type SyncTree struct {
tree.ObjectTree tree.ObjectTree
syncService syncservice.SyncService syncService syncservice.SyncService

View File

@ -43,6 +43,7 @@ type ObjectTree interface {
Heads() []string Heads() []string
Root() *Change Root() *Change
HasChange(string) bool HasChange(string) bool
DebugDump() (string, error)
Iterate(convert ChangeConvertFunc, iterate ChangeIterateFunc) error Iterate(convert ChangeConvertFunc, iterate ChangeIterateFunc) error
IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
@ -51,7 +52,6 @@ type ObjectTree interface {
ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawTreeChangeWithId, error) ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawTreeChangeWithId, error)
Storage() storage.TreeStorage Storage() storage.TreeStorage
DebugDump() (string, error)
AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error) AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error)
AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (AddResult, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (AddResult, error)

View File

@ -12,6 +12,6 @@ func ValidateRawTree(payload storage.TreeStorageCreatePayload, aclList list.ACLL
return return
} }
_, err = BuildObjectTree(treeStorage, nil, aclList) _, err = BuildObjectTree(treeStorage, aclList)
return return
} }