diff --git a/client/clientspace/rpchandler.go b/client/clientspace/rpchandler.go index 8f291eaa..2e65adfd 100644 --- a/client/clientspace/rpchandler.go +++ b/client/clientspace/rpchandler.go @@ -26,7 +26,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac RecWithId: req.AclRoot, SpaceHeaderWithId: req.SpaceHeader, } - _, err = r.s.spaceStorageProvider.CreateSpaceStorage(payload) + st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload) if err != nil { err = spacesyncproto.ErrUnexpected if err == storage.ErrSpaceStorageExists { @@ -34,6 +34,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac } return } + st.Close() return } diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 28385791..2638dfb4 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -77,6 +77,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.pingTreesInCache(ctx, newIds) d.pingTreesInCache(ctx, changedIds) + d.pingTreesInCache(ctx, removedIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 3e83ee91..c84dc1c1 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -17,6 +17,7 @@ import ( tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" + "go.uber.org/zap" "sync" "sync/atomic" "time" @@ -190,8 +191,10 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene } func (s *space) Close() error { + log.With(zap.String("id", s.id)).Debug("space is closing") defer func() { s.isClosed.Store(true) + log.With(zap.String("id", s.id)).Debug("space closed") }() s.diffService.Close() s.syncService.Close() diff --git a/common/commonspace/spacesyncproto/spacesync.go b/common/commonspace/spacesyncproto/spacesync.go index c9351714..952faf4b 100644 --- a/common/commonspace/spacesyncproto/spacesync.go +++ b/common/commonspace/spacesyncproto/spacesync.go @@ -63,17 +63,18 @@ func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeI } } -func MessageDescription(msg *ObjectSyncMessage) string { +func MessageDescription(msg *ObjectSyncMessage) (res string) { content := msg.GetContent() switch { case content.GetHeadUpdate() != nil: - return fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads) + res = fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads) case content.GetFullSyncRequest() != nil: - return fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads) + res = fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads) case content.GetFullSyncResponse() != nil: - return fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads) + res = fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads) case content.GetErrorResponse() != nil: - return fmt.Sprintf("error response/%v", content.GetErrorResponse().Error) + res = fmt.Sprintf("error response/%v", content.GetErrorResponse().Error) } - return "" + res = fmt.Sprintf("%s/tracking=[%s]", res, msg.TrackingId) + return res } diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 1421839f..c139845f 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -20,18 +20,16 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { - Sender ocache.ObjectLastUsage AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) - HasActiveStream(peerId string) bool - Close() (err error) -} -type Sender interface { SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) + + HasActiveStream(peerId string) bool + Close() (err error) } type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) @@ -90,8 +88,10 @@ func (s *streamPool) SendSync( if err != nil { return } + log.With("trackingId", msg.TrackingId).Debug("waiting for id") // TODO: limit wait time here and remove the waiter reply = <-waiter.ch + log.With("trackingId", msg.TrackingId).Debug("finished waiting for id") return } @@ -215,15 +215,17 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre s.messageHandler(stream.Context(), peerId, msg) return } - + log.With("trackingId", msg.TrackingId).Debug("getting message with tracking id") s.waitersMx.Lock() waiter, exists := s.waiters[msg.TrackingId] if !exists { + log.With("trackingId", msg.TrackingId).Debug("tracking id not exists") s.waitersMx.Unlock() s.messageHandler(stream.Context(), peerId, msg) return } + log.With("trackingId", msg.TrackingId).Debug("tracking id exists") delete(s.waiters, msg.TrackingId) s.waitersMx.Unlock() diff --git a/node/nodespace/rpchandler.go b/node/nodespace/rpchandler.go index baefafda..1145e35c 100644 --- a/node/nodespace/rpchandler.go +++ b/node/nodespace/rpchandler.go @@ -25,7 +25,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac RecWithId: req.AclRoot, SpaceHeaderWithId: req.SpaceHeader, } - _, err = r.s.spaceStorageProvider.CreateSpaceStorage(payload) + st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload) if err != nil { err = spacesyncproto.ErrUnexpected if err == storage.ErrSpaceStorageExists { @@ -33,6 +33,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac } return } + st.Close() return } diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index 9ac3bfbb..b9406fb3 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -2,9 +2,11 @@ package storage import ( "github.com/akrylysov/pogreb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "go.uber.org/zap" "path" "sync" "time" @@ -14,6 +16,8 @@ var defPogrebOptions = &pogreb.Options{ BackgroundCompactionInterval: time.Minute * 5, } +var log = logger.NewNamed("storage.spacestorage") + type spaceStorage struct { spaceId string objDb *pogreb.DB @@ -24,6 +28,7 @@ type spaceStorage struct { } func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) { + log.With(zap.String("id", spaceId)).Debug("space storage opened with new") dbPath := path.Join(rootPath, spaceId) objDb, err := pogreb.Open(dbPath, defPogrebOptions) if err != nil { @@ -74,6 +79,7 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS } func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) { + log.With(zap.String("id", payload.SpaceHeaderWithId.Id)).Debug("space storage opened with create") dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id) db, err := pogreb.Open(dbPath, defPogrebOptions) if err != nil { @@ -165,5 +171,6 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) { } func (s *spaceStorage) Close() (err error) { + log.With(zap.String("id", s.spaceId)).Debug("space storage closed") return s.objDb.Close() }