From eb1049b160cfcc6ba329ff590e79297cc68d3e15 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 25 Jan 2023 23:35:20 +0300 Subject: [PATCH] debug/fixes --- commonspace/commongetter.go | 15 +++------ commonspace/headsync/headsync.go | 2 +- .../tree/synctree/synctreehandler_test.go | 7 ++-- commonspace/objectsync/msgpool.go | 3 +- commonspace/space.go | 33 ++++++++++++++++--- net/streampool/stream.go | 7 ++-- net/streampool/streampool.go | 29 ---------------- net/streampool/streampoolservice.go | 3 -- 8 files changed, 45 insertions(+), 54 deletions(-) diff --git a/commonspace/commongetter.go b/commonspace/commongetter.go index ee7e4b03..12abdf4c 100644 --- a/commonspace/commongetter.go +++ b/commonspace/commongetter.go @@ -5,7 +5,6 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" - "golang.org/x/exp/slices" ) type commonGetter struct { @@ -22,9 +21,6 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter } func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) { - if object == nil { - panic("nil object") - } c.reservedObjects = append(c.reservedObjects, object) } @@ -36,13 +32,12 @@ func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (obj } func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject { - pos := slices.IndexFunc(c.reservedObjects, func(object syncobjectgetter.SyncObject) bool { - return object.Id() == id - }) - if pos == -1 { - return nil + for _, obj := range c.reservedObjects { + if obj != nil && obj.Id() == id { + return obj + } } - return c.reservedObjects[pos] + return nil } func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 0dc4fa89..50a18a99 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -58,7 +58,7 @@ func NewHeadSync( l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l) - periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l) + periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) return &headSync{ spaceId: spaceId, diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index 9b2e1120..0b50024c 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -3,6 +3,7 @@ package synctree import ( "context" "fmt" + "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/mock_synctree" @@ -70,7 +71,7 @@ func (fx *syncHandlerFixture) stop() { func TestSyncHandler_HandleHeadUpdate(t *testing.T) { ctx := context.Background() - log = zap.NewNop().Sugar() + log = logger.CtxLogger{Logger: zap.NewNop()} t.Run("head update non empty all heads added", func(t *testing.T) { fx := newSyncHandlerFixture(t) @@ -207,7 +208,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { ctx := context.Background() - log = zap.NewNop().Sugar() + log = logger.CtxLogger{Logger: zap.NewNop()} t.Run("full sync request with change", func(t *testing.T) { fx := newSyncHandlerFixture(t) @@ -338,7 +339,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { ctx := context.Background() - log = zap.NewNop().Sugar() + log = logger.CtxLogger{Logger: zap.NewNop()} t.Run("full sync response with change", func(t *testing.T) { fx := newSyncHandlerFixture(t) diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index bca09e66..e9d4082c 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -2,6 +2,7 @@ package objectsync import ( "context" + "fmt" "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -78,7 +79,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn s.waitersMx.Unlock() log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting") - err = ctx.Err() + err = fmt.Errorf("sendSync context error: %v", ctx.Err()) case reply = <-waiter.ch: // success } diff --git a/commonspace/space.go b/commonspace/space.go index 9dda2ece..dab65d67 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -14,7 +14,6 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" - "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" @@ -86,6 +85,7 @@ type Space interface { PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) DeleteTree(ctx context.Context, id string) (err error) + BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) HeadSync() headsync.HeadSync ObjectSync() objectsync.ObjectSync @@ -104,7 +104,7 @@ type space struct { headSync headsync.HeadSync syncStatus syncstatus.StatusUpdater storage spacestorage.SpaceStorage - cache treegetter.TreeGetter + cache *commonGetter account accountservice.Service aclList *syncacl.SyncAcl configuration nodeconf.Configuration @@ -171,6 +171,7 @@ func (s *space) Init(ctx context.Context) (err error) { return } s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) + s.cache.AddObject(s.aclList) deletionState := deletionstate.NewDeletionState(s.storage) deps := settings.Deps{ @@ -191,14 +192,13 @@ func (s *space) Init(ctx context.Context) (err error) { DeletionState: deletionState, } s.settingsObject = settings.NewSettingsObject(deps, s.id) - - objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject) - s.objectSync.Init(objectGetter) + s.objectSync.Init() s.headSync.Init(initialIds, deletionState) err = s.settingsObject.Init(ctx) if err != nil { return } + s.cache.AddObject(s.settingsObject) s.syncStatus.Run() return nil @@ -289,6 +289,11 @@ type BuildTreeOpts struct { WaitTreeRemoteSync bool } +type HistoryTreeOpts struct { + BeforeId string + Include bool +} + func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { if s.isClosed.Load() { err = ErrSpaceClosed @@ -310,6 +315,24 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } +func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { + if s.isClosed.Load() { + err = ErrSpaceClosed + return + } + + params := objecttree.HistoryTreeParams{ + AclList: s.aclList, + BeforeId: opts.BeforeId, + IncludeBeforeId: opts.Include, + } + params.TreeStorage, err = s.storage.TreeStorage(id) + if err != nil { + return + } + return objecttree.BuildHistoryTree(params) +} + func (s *space) DeleteTree(ctx context.Context, id string) (err error) { return s.settingsObject.DeleteObject(id) } diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 56e221bd..06dbb0c9 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -2,12 +2,15 @@ package streampool import ( "context" + "fmt" "github.com/anytypeio/any-sync/app/logger" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" ) +var msgCounter atomic.Uint32 + type stream struct { peerId string stream drpc.Stream @@ -37,8 +40,8 @@ func (sr *stream) readLoop() error { return err } ctx := streamCtx(context.Background(), sr.streamId, sr.peerId) - ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId)) - if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil { + ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId)) + if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil { sr.l.Info("msg handle error", zap.Error(err)) return err } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index f346fb30..c718c097 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" - "github.com/cheggaaa/mb/v3" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -49,7 +48,6 @@ type streamPool struct { streams map[uint32]*stream opening map[string]*openingProcess exec *sendPool - handleQueue *mb.MB[handleMessage] mu sync.RWMutex lastStreamId uint32 } @@ -64,13 +62,6 @@ type handleMessage struct { peerId string } -func (s *streamPool) init() { - // TODO: to config - for i := 0; i < 10; i++ { - go s.handleMessageLoop() - } -} - func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { st := s.addStream(peerId, drpcStream, tags...) return st.readLoop() @@ -308,26 +299,6 @@ func (s *streamPool) removeStream(streamId uint32) { st.l.Debug("stream removed", zap.Strings("tags", st.tags)) } -func (s *streamPool) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) { - return s.handleQueue.Add(ctx, handleMessage{ - ctx: ctx, - msg: msg, - peerId: peerId, - }) -} - -func (s *streamPool) handleMessageLoop() { - for { - hm, err := s.handleQueue.WaitOne(context.Background()) - if err != nil { - return - } - if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { - log.WarnCtx(hm.ctx, "handle message error", zap.Error(err)) - } - } -} - func (s *streamPool) Close() (err error) { return s.exec.Close() } diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index c7cc0c8d..e4d5965f 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -3,7 +3,6 @@ package streampool import ( "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" - "github.com/cheggaaa/mb/v3" ) const CName = "common.net.streampool" @@ -30,9 +29,7 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool { streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, exec: newStreamSender(10, 100), - handleQueue: mb.New[handleMessage](100), } - sp.init() return sp }