Storage close and diff fixes and logs
This commit is contained in:
parent
40ce73db68
commit
3886436841
@ -26,7 +26,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
RecWithId: req.AclRoot,
|
||||
SpaceHeaderWithId: req.SpaceHeader,
|
||||
}
|
||||
_, err = r.s.spaceStorageProvider.CreateSpaceStorage(payload)
|
||||
st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload)
|
||||
if err != nil {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
if err == storage.ErrSpaceStorageExists {
|
||||
@ -34,6 +34,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
}
|
||||
return
|
||||
}
|
||||
st.Close()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -77,6 +77,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
|
||||
d.pingTreesInCache(ctx, newIds)
|
||||
d.pingTreesInCache(ctx, changedIds)
|
||||
d.pingTreesInCache(ctx, removedIds)
|
||||
|
||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||
zap.Int("changedIds", len(changedIds)),
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -190,8 +191,10 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
|
||||
}
|
||||
|
||||
func (s *space) Close() error {
|
||||
log.With(zap.String("id", s.id)).Debug("space is closing")
|
||||
defer func() {
|
||||
s.isClosed.Store(true)
|
||||
log.With(zap.String("id", s.id)).Debug("space closed")
|
||||
}()
|
||||
s.diffService.Close()
|
||||
s.syncService.Close()
|
||||
|
||||
@ -63,17 +63,18 @@ func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeI
|
||||
}
|
||||
}
|
||||
|
||||
func MessageDescription(msg *ObjectSyncMessage) string {
|
||||
func MessageDescription(msg *ObjectSyncMessage) (res string) {
|
||||
content := msg.GetContent()
|
||||
switch {
|
||||
case content.GetHeadUpdate() != nil:
|
||||
return fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads)
|
||||
res = fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads)
|
||||
case content.GetFullSyncRequest() != nil:
|
||||
return fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads)
|
||||
res = fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads)
|
||||
case content.GetFullSyncResponse() != nil:
|
||||
return fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads)
|
||||
res = fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads)
|
||||
case content.GetErrorResponse() != nil:
|
||||
return fmt.Sprintf("error response/%v", content.GetErrorResponse().Error)
|
||||
res = fmt.Sprintf("error response/%v", content.GetErrorResponse().Error)
|
||||
}
|
||||
return ""
|
||||
res = fmt.Sprintf("%s/tracking=[%s]", res, msg.TrackingId)
|
||||
return res
|
||||
}
|
||||
|
||||
@ -20,18 +20,16 @@ const maxSimultaneousOperationsPerStream = 10
|
||||
|
||||
// StreamPool can be made generic to work with different streams
|
||||
type StreamPool interface {
|
||||
Sender
|
||||
ocache.ObjectLastUsage
|
||||
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
|
||||
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
|
||||
HasActiveStream(peerId string) bool
|
||||
Close() (err error)
|
||||
}
|
||||
|
||||
type Sender interface {
|
||||
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
|
||||
HasActiveStream(peerId string) bool
|
||||
Close() (err error)
|
||||
}
|
||||
|
||||
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
@ -90,8 +88,10 @@ func (s *streamPool) SendSync(
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.With("trackingId", msg.TrackingId).Debug("waiting for id")
|
||||
// TODO: limit wait time here and remove the waiter
|
||||
reply = <-waiter.ch
|
||||
log.With("trackingId", msg.TrackingId).Debug("finished waiting for id")
|
||||
return
|
||||
}
|
||||
|
||||
@ -215,15 +215,17 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
|
||||
s.messageHandler(stream.Context(), peerId, msg)
|
||||
return
|
||||
}
|
||||
|
||||
log.With("trackingId", msg.TrackingId).Debug("getting message with tracking id")
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.TrackingId]
|
||||
|
||||
if !exists {
|
||||
log.With("trackingId", msg.TrackingId).Debug("tracking id not exists")
|
||||
s.waitersMx.Unlock()
|
||||
s.messageHandler(stream.Context(), peerId, msg)
|
||||
return
|
||||
}
|
||||
log.With("trackingId", msg.TrackingId).Debug("tracking id exists")
|
||||
|
||||
delete(s.waiters, msg.TrackingId)
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
@ -25,7 +25,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
RecWithId: req.AclRoot,
|
||||
SpaceHeaderWithId: req.SpaceHeader,
|
||||
}
|
||||
_, err = r.s.spaceStorageProvider.CreateSpaceStorage(payload)
|
||||
st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload)
|
||||
if err != nil {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
if err == storage.ErrSpaceStorageExists {
|
||||
@ -33,6 +33,7 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
}
|
||||
return
|
||||
}
|
||||
st.Close()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -2,9 +2,11 @@ package storage
|
||||
|
||||
import (
|
||||
"github.com/akrylysov/pogreb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
@ -14,6 +16,8 @@ var defPogrebOptions = &pogreb.Options{
|
||||
BackgroundCompactionInterval: time.Minute * 5,
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("storage.spacestorage")
|
||||
|
||||
type spaceStorage struct {
|
||||
spaceId string
|
||||
objDb *pogreb.DB
|
||||
@ -24,6 +28,7 @@ type spaceStorage struct {
|
||||
}
|
||||
|
||||
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
|
||||
log.With(zap.String("id", spaceId)).Debug("space storage opened with new")
|
||||
dbPath := path.Join(rootPath, spaceId)
|
||||
objDb, err := pogreb.Open(dbPath, defPogrebOptions)
|
||||
if err != nil {
|
||||
@ -74,6 +79,7 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
|
||||
}
|
||||
|
||||
func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) {
|
||||
log.With(zap.String("id", payload.SpaceHeaderWithId.Id)).Debug("space storage opened with create")
|
||||
dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id)
|
||||
db, err := pogreb.Open(dbPath, defPogrebOptions)
|
||||
if err != nil {
|
||||
@ -165,5 +171,6 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) {
|
||||
}
|
||||
|
||||
func (s *spaceStorage) Close() (err error) {
|
||||
log.With(zap.String("id", s.spaceId)).Debug("space storage closed")
|
||||
return s.objDb.Close()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user