commit
3b837b4b6b
@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
"github.com/anytypeio/any-sync/nodeconf"
|
"github.com/anytypeio/any-sync/nodeconf"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"sync/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -46,7 +45,7 @@ type syncTree struct {
|
|||||||
syncStatus syncstatus.StatusUpdater
|
syncStatus syncstatus.StatusUpdater
|
||||||
notifiable HeadNotifiable
|
notifiable HeadNotifiable
|
||||||
listener updatelistener.UpdateListener
|
listener updatelistener.UpdateListener
|
||||||
treeUsage *atomic.Int32
|
onClose func(id string)
|
||||||
isClosed bool
|
isClosed bool
|
||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
@ -69,7 +68,7 @@ type BuildDeps struct {
|
|||||||
AclList list.AclList
|
AclList list.AclList
|
||||||
SpaceStorage spacestorage.SpaceStorage
|
SpaceStorage spacestorage.SpaceStorage
|
||||||
TreeStorage treestorage.TreeStorage
|
TreeStorage treestorage.TreeStorage
|
||||||
TreeUsage *atomic.Int32
|
OnClose func(id string)
|
||||||
SyncStatus syncstatus.StatusUpdater
|
SyncStatus syncstatus.StatusUpdater
|
||||||
PeerGetter ResponsiblePeersGetter
|
PeerGetter ResponsiblePeersGetter
|
||||||
WaitTreeRemoteSync bool
|
WaitTreeRemoteSync bool
|
||||||
@ -106,7 +105,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
ObjectTree: objTree,
|
ObjectTree: objTree,
|
||||||
syncClient: syncClient,
|
syncClient: syncClient,
|
||||||
notifiable: deps.HeadNotifiable,
|
notifiable: deps.HeadNotifiable,
|
||||||
treeUsage: deps.TreeUsage,
|
onClose: deps.OnClose,
|
||||||
listener: deps.Listener,
|
listener: deps.Listener,
|
||||||
syncStatus: deps.SyncStatus,
|
syncStatus: deps.SyncStatus,
|
||||||
}
|
}
|
||||||
@ -213,7 +212,7 @@ func (s *syncTree) Close() (err error) {
|
|||||||
if s.isClosed {
|
if s.isClosed {
|
||||||
return ErrSyncTreeClosed
|
return ErrSyncTreeClosed
|
||||||
}
|
}
|
||||||
s.treeUsage.Add(-1)
|
s.onClose(s.Id())
|
||||||
s.isClosed = true
|
s.isClosed = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -239,7 +238,6 @@ func (s *syncTree) afterBuild() {
|
|||||||
if s.listener != nil {
|
if s.listener != nil {
|
||||||
s.listener.Rebuild(s)
|
s.listener.Rebuild(s)
|
||||||
}
|
}
|
||||||
s.treeUsage.Add(1)
|
|
||||||
if s.notifiable != nil {
|
if s.notifiable != nil {
|
||||||
s.notifiable.UpdateHeads(s.Id(), s.Heads())
|
s.notifiable.UpdateHeads(s.Id(), s.Heads())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -128,7 +128,7 @@ type space struct {
|
|||||||
handleQueue multiqueue.MultiQueue[HandleMessage]
|
handleQueue multiqueue.MultiQueue[HandleMessage]
|
||||||
|
|
||||||
isClosed *atomic.Bool
|
isClosed *atomic.Bool
|
||||||
treesUsed atomic.Int32
|
treesUsed *atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *space) LastUsage() time.Time {
|
func (s *space) LastUsage() time.Time {
|
||||||
@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
|
|||||||
Listener: listener,
|
Listener: listener,
|
||||||
AclList: s.aclList,
|
AclList: s.aclList,
|
||||||
SpaceStorage: s.storage,
|
SpaceStorage: s.storage,
|
||||||
TreeUsage: &s.treesUsed,
|
OnClose: func(id string) {},
|
||||||
SyncStatus: s.syncStatus,
|
SyncStatus: s.syncStatus,
|
||||||
PeerGetter: s.peerManager,
|
PeerGetter: s.peerManager,
|
||||||
}
|
}
|
||||||
@ -326,12 +326,16 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
|
|||||||
Listener: opts.Listener,
|
Listener: opts.Listener,
|
||||||
AclList: s.aclList,
|
AclList: s.aclList,
|
||||||
SpaceStorage: s.storage,
|
SpaceStorage: s.storage,
|
||||||
TreeUsage: &s.treesUsed,
|
OnClose: s.onObjectClose,
|
||||||
SyncStatus: s.syncStatus,
|
SyncStatus: s.syncStatus,
|
||||||
WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
|
WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
|
||||||
PeerGetter: s.peerManager,
|
PeerGetter: s.peerManager,
|
||||||
}
|
}
|
||||||
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
|
if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.treesUsed.Add(1)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
|
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
|
||||||
@ -388,10 +392,19 @@ func (s *space) handleMessage(msg HandleMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
|
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
|
||||||
|
if msg.Message.ObjectId != "" {
|
||||||
|
// cleanup thread on error
|
||||||
|
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
|
||||||
|
}
|
||||||
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
|
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *space) onObjectClose(id string) {
|
||||||
|
s.treesUsed.Add(-1)
|
||||||
|
_ = s.handleQueue.CloseThread(id)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *space) Close() error {
|
func (s *space) Close() error {
|
||||||
if s.isClosed.Swap(true) {
|
if s.isClosed.Swap(true) {
|
||||||
log.Warn("call space.Close on closed space", zap.String("id", s.id))
|
log.Warn("call space.Close on closed space", zap.String("id", s.id))
|
||||||
|
|||||||
@ -143,6 +143,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
configuration: lastConfiguration,
|
configuration: lastConfiguration,
|
||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
storage: st,
|
storage: st,
|
||||||
|
treesUsed: &atomic.Int32{},
|
||||||
isClosed: spaceIsClosed,
|
isClosed: spaceIsClosed,
|
||||||
}
|
}
|
||||||
return sp, nil
|
return sp, nil
|
||||||
|
|||||||
@ -100,7 +100,7 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) {
|
func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) {
|
||||||
tcpConn, err := net.Dial("tcp", addr)
|
tcpConn, err := net.DialTimeout("tcp", addr, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user