From 67d535362fbebfff6592493b1fb36d2fdcf2d8a0 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 6 Jun 2023 17:18:59 +0200 Subject: [PATCH] Different fixes --- .../object/tree/synctree/synctreehandler.go | 2 +- commonspace/payloads.go | 18 ++++++++---------- commonspace/requestmanager/requestmanager.go | 3 ++- commonspace/space.go | 3 ++- commonspace/spacestorage/inmemorystorage.go | 13 +++++++++---- commonspace/spacestorage/spacestorage.go | 4 +--- net/streampool/streampool.go | 2 ++ 7 files changed, 25 insertions(+), 20 deletions(-) diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 5953ca97..43e764cc 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -92,7 +92,7 @@ func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (e case content.GetHeadUpdate() != nil: var syncReq *treechangeproto.TreeSyncMessage syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate()) - if err != nil { + if err != nil || syncReq == nil { return } return s.syncClient.QueueRequest(senderId, treeId, syncReq) diff --git a/commonspace/payloads.go b/commonspace/payloads.go index 5d2284d0..8d1c334c 100644 --- a/commonspace/payloads.go +++ b/commonspace/payloads.go @@ -214,14 +214,15 @@ func validateSpaceStorageCreatePayload(payload spacestorage.SpaceStorageCreatePa } func ValidateSpaceHeader(rawHeaderWithId *spacesyncproto.RawSpaceHeaderWithId, identity crypto.PubKey) (err error) { + if rawHeaderWithId == nil { + return spacestorage.ErrIncorrectSpaceHeader + } sepIdx := strings.Index(rawHeaderWithId.Id, ".") if sepIdx == -1 { - err = spacestorage.ErrIncorrectSpaceHeader - return + return spacestorage.ErrIncorrectSpaceHeader } if !cidutil.VerifyCid(rawHeaderWithId.RawHeader, rawHeaderWithId.Id[:sepIdx]) { - err = objecttree.ErrIncorrectCid - return + return objecttree.ErrIncorrectCid } var rawSpaceHeader spacesyncproto.RawSpaceHeader err = proto.Unmarshal(rawHeaderWithId.RawHeader, &rawSpaceHeader) @@ -239,19 +240,16 @@ func ValidateSpaceHeader(rawHeaderWithId *spacesyncproto.RawSpaceHeaderWithId, i } res, err := payloadIdentity.Verify(rawSpaceHeader.SpaceHeader, rawSpaceHeader.Signature) if err != nil || !res { - err = spacestorage.ErrIncorrectSpaceHeader - return + return spacestorage.ErrIncorrectSpaceHeader } if rawHeaderWithId.Id[sepIdx+1:] != strconv.FormatUint(header.ReplicationKey, 36) { - err = spacestorage.ErrIncorrectSpaceHeader - return + return spacestorage.ErrIncorrectSpaceHeader } if identity == nil { return } if !payloadIdentity.Equals(identity) { - err = ErrIncorrectIdentity - return + return ErrIncorrectIdentity } return } diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index f28ae3ef..ac4351c0 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -82,8 +82,9 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS defer r.Unlock() pl, exists := r.pools[peerId] if !exists { - pl := streampool.NewExecPool(r.workers, r.queueSize) + pl = streampool.NewExecPool(r.workers, r.queueSize) r.pools[peerId] = pl + pl.Run() } // TODO: for later think when many clients are there, // we need to close pools for inactive clients diff --git a/commonspace/space.go b/commonspace/space.go index d2bbc68b..ef5119de 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -169,7 +169,8 @@ func (s *space) Init(ctx context.Context) (err error) { s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync) s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) s.aclList = s.app.MustComponent(syncacl.CName).(list.AclList) - return nil + s.header, err = s.storage.SpaceHeader() + return } func (s *space) SyncStatus() syncstatus.StatusUpdater { diff --git a/commonspace/spacestorage/inmemorystorage.go b/commonspace/spacestorage/inmemorystorage.go index db1d1166..096f5a84 100644 --- a/commonspace/spacestorage/inmemorystorage.go +++ b/commonspace/spacestorage/inmemorystorage.go @@ -1,6 +1,7 @@ package spacestorage import ( + "context" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" @@ -22,6 +23,14 @@ type InMemorySpaceStorage struct { sync.Mutex } +func (i *InMemorySpaceStorage) Run(ctx context.Context) (err error) { + return nil +} + +func (i *InMemorySpaceStorage) Close(ctx context.Context) (err error) { + return nil +} + func (i *InMemorySpaceStorage) Init(a *app.App) (err error) { return nil } @@ -157,10 +166,6 @@ func (i *InMemorySpaceStorage) ReadSpaceHash() (hash string, err error) { return i.spaceHash, nil } -func (i *InMemorySpaceStorage) Close() error { - return nil -} - func (i *InMemorySpaceStorage) AllTrees() map[string]treestorage.TreeStorage { i.Lock() defer i.Unlock() diff --git a/commonspace/spacestorage/spacestorage.go b/commonspace/spacestorage/spacestorage.go index a4ffaecb..e2807f5a 100644 --- a/commonspace/spacestorage/spacestorage.go +++ b/commonspace/spacestorage/spacestorage.go @@ -28,7 +28,7 @@ const ( ) type SpaceStorage interface { - app.Component + app.ComponentRunnable Id() string SetSpaceDeleted() error IsSpaceDeleted() (bool, error) @@ -44,8 +44,6 @@ type SpaceStorage interface { CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error) WriteSpaceHash(hash string) error ReadSpaceHash() (hash string, err error) - - Close() error } type SpaceStorageCreatePayload struct { diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 50a020b3..54c513b9 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -250,6 +250,8 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces close(op.ch) delete(s.opening, p.Id()) }() + // in case there was no peerId in context + ctx := peer.CtxWithPeerId(ctx, p.Id()) // open new stream and add to pool st, tags, err := s.handler.OpenStream(ctx, p) if err != nil {