debug/fixes
This commit is contained in:
parent
81bdd3fefb
commit
eb1049b160
@ -5,7 +5,6 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type commonGetter struct {
|
||||
@ -22,9 +21,6 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter
|
||||
}
|
||||
|
||||
func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
|
||||
if object == nil {
|
||||
panic("nil object")
|
||||
}
|
||||
c.reservedObjects = append(c.reservedObjects, object)
|
||||
}
|
||||
|
||||
@ -36,13 +32,12 @@ func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (obj
|
||||
}
|
||||
|
||||
func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject {
|
||||
pos := slices.IndexFunc(c.reservedObjects, func(object syncobjectgetter.SyncObject) bool {
|
||||
return object.Id() == id
|
||||
})
|
||||
if pos == -1 {
|
||||
return nil
|
||||
for _, obj := range c.reservedObjects {
|
||||
if obj != nil && obj.Id() == id {
|
||||
return obj
|
||||
}
|
||||
}
|
||||
return c.reservedObjects[pos]
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {
|
||||
|
||||
@ -58,7 +58,7 @@ func NewHeadSync(
|
||||
l := log.With(zap.String("spaceId", spaceId))
|
||||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l)
|
||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
|
||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
|
||||
|
||||
return &headSync{
|
||||
spaceId: spaceId,
|
||||
|
||||
@ -3,6 +3,7 @@ package synctree
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/mock_synctree"
|
||||
@ -70,7 +71,7 @@ func (fx *syncHandlerFixture) stop() {
|
||||
|
||||
func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
log = zap.NewNop().Sugar()
|
||||
log = logger.CtxLogger{Logger: zap.NewNop()}
|
||||
|
||||
t.Run("head update non empty all heads added", func(t *testing.T) {
|
||||
fx := newSyncHandlerFixture(t)
|
||||
@ -207,7 +208,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
||||
|
||||
func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
log = zap.NewNop().Sugar()
|
||||
log = logger.CtxLogger{Logger: zap.NewNop()}
|
||||
|
||||
t.Run("full sync request with change", func(t *testing.T) {
|
||||
fx := newSyncHandlerFixture(t)
|
||||
@ -338,7 +339,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
||||
|
||||
func TestSyncHandler_HandleFullSyncResponse(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
log = zap.NewNop().Sugar()
|
||||
log = logger.CtxLogger{Logger: zap.NewNop()}
|
||||
|
||||
t.Run("full sync response with change", func(t *testing.T) {
|
||||
fx := newSyncHandlerFixture(t)
|
||||
|
||||
@ -2,6 +2,7 @@ package objectsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/ocache"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
@ -78,7 +79,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting")
|
||||
err = ctx.Err()
|
||||
err = fmt.Errorf("sendSync context error: %v", ctx.Err())
|
||||
case reply = <-waiter.ch:
|
||||
// success
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
"github.com/anytypeio/any-sync/commonspace/settings"
|
||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
||||
@ -86,6 +85,7 @@ type Space interface {
|
||||
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
|
||||
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
|
||||
DeleteTree(ctx context.Context, id string) (err error)
|
||||
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
|
||||
|
||||
HeadSync() headsync.HeadSync
|
||||
ObjectSync() objectsync.ObjectSync
|
||||
@ -104,7 +104,7 @@ type space struct {
|
||||
headSync headsync.HeadSync
|
||||
syncStatus syncstatus.StatusUpdater
|
||||
storage spacestorage.SpaceStorage
|
||||
cache treegetter.TreeGetter
|
||||
cache *commonGetter
|
||||
account accountservice.Service
|
||||
aclList *syncacl.SyncAcl
|
||||
configuration nodeconf.Configuration
|
||||
@ -171,6 +171,7 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
return
|
||||
}
|
||||
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool())
|
||||
s.cache.AddObject(s.aclList)
|
||||
|
||||
deletionState := deletionstate.NewDeletionState(s.storage)
|
||||
deps := settings.Deps{
|
||||
@ -191,14 +192,13 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
DeletionState: deletionState,
|
||||
}
|
||||
s.settingsObject = settings.NewSettingsObject(deps, s.id)
|
||||
|
||||
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject)
|
||||
s.objectSync.Init(objectGetter)
|
||||
s.objectSync.Init()
|
||||
s.headSync.Init(initialIds, deletionState)
|
||||
err = s.settingsObject.Init(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.cache.AddObject(s.settingsObject)
|
||||
s.syncStatus.Run()
|
||||
|
||||
return nil
|
||||
@ -289,6 +289,11 @@ type BuildTreeOpts struct {
|
||||
WaitTreeRemoteSync bool
|
||||
}
|
||||
|
||||
type HistoryTreeOpts struct {
|
||||
BeforeId string
|
||||
Include bool
|
||||
}
|
||||
|
||||
func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) {
|
||||
if s.isClosed.Load() {
|
||||
err = ErrSpaceClosed
|
||||
@ -310,6 +315,24 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
|
||||
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
|
||||
}
|
||||
|
||||
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
|
||||
if s.isClosed.Load() {
|
||||
err = ErrSpaceClosed
|
||||
return
|
||||
}
|
||||
|
||||
params := objecttree.HistoryTreeParams{
|
||||
AclList: s.aclList,
|
||||
BeforeId: opts.BeforeId,
|
||||
IncludeBeforeId: opts.Include,
|
||||
}
|
||||
params.TreeStorage, err = s.storage.TreeStorage(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return objecttree.BuildHistoryTree(params)
|
||||
}
|
||||
|
||||
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
|
||||
return s.settingsObject.DeleteObject(id)
|
||||
}
|
||||
|
||||
@ -2,12 +2,15 @@ package streampool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var msgCounter atomic.Uint32
|
||||
|
||||
type stream struct {
|
||||
peerId string
|
||||
stream drpc.Stream
|
||||
@ -37,8 +40,8 @@ func (sr *stream) readLoop() error {
|
||||
return err
|
||||
}
|
||||
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
|
||||
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId))
|
||||
if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
||||
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId))
|
||||
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
||||
sr.l.Info("msg handle error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/net/context"
|
||||
@ -49,7 +48,6 @@ type streamPool struct {
|
||||
streams map[uint32]*stream
|
||||
opening map[string]*openingProcess
|
||||
exec *sendPool
|
||||
handleQueue *mb.MB[handleMessage]
|
||||
mu sync.RWMutex
|
||||
lastStreamId uint32
|
||||
}
|
||||
@ -64,13 +62,6 @@ type handleMessage struct {
|
||||
peerId string
|
||||
}
|
||||
|
||||
func (s *streamPool) init() {
|
||||
// TODO: to config
|
||||
for i := 0; i < 10; i++ {
|
||||
go s.handleMessageLoop()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
|
||||
st := s.addStream(peerId, drpcStream, tags...)
|
||||
return st.readLoop()
|
||||
@ -308,26 +299,6 @@ func (s *streamPool) removeStream(streamId uint32) {
|
||||
st.l.Debug("stream removed", zap.Strings("tags", st.tags))
|
||||
}
|
||||
|
||||
func (s *streamPool) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) {
|
||||
return s.handleQueue.Add(ctx, handleMessage{
|
||||
ctx: ctx,
|
||||
msg: msg,
|
||||
peerId: peerId,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *streamPool) handleMessageLoop() {
|
||||
for {
|
||||
hm, err := s.handleQueue.WaitOne(context.Background())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil {
|
||||
log.WarnCtx(hm.ctx, "handle message error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamPool) Close() (err error) {
|
||||
return s.exec.Close()
|
||||
}
|
||||
|
||||
@ -3,7 +3,6 @@ package streampool
|
||||
import (
|
||||
"github.com/anytypeio/any-sync/app"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
)
|
||||
|
||||
const CName = "common.net.streampool"
|
||||
@ -30,9 +29,7 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool {
|
||||
streams: map[uint32]*stream{},
|
||||
opening: map[string]*openingProcess{},
|
||||
exec: newStreamSender(10, 100),
|
||||
handleQueue: mb.New[handleMessage](100),
|
||||
}
|
||||
sp.init()
|
||||
return sp
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user