Update status service
This commit is contained in:
parent
322a8d144a
commit
ad87d5e545
@ -6,6 +6,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
|
||||
@ -100,8 +101,9 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status bool) (err error) {
|
||||
log.With(zap.String("treeId", treeId), zap.Bool("synced", status)).Debug("updating sync status")
|
||||
ns.StatusService().SetUpdater(func(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) {
|
||||
log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)).
|
||||
Debug("updating sync status")
|
||||
return
|
||||
})
|
||||
if err = ns.Init(ctx); err != nil {
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
@ -49,12 +50,13 @@ func NewDiffService(
|
||||
storage storage.SpaceStorage,
|
||||
confConnector nodeconf.ConfConnector,
|
||||
cache treegetter.TreeGetter,
|
||||
statusService statusservice.StatusService,
|
||||
log *zap.Logger) DiffService {
|
||||
|
||||
diff := ldiff.New(16, 16)
|
||||
l := log.With(zap.String("spaceId", spaceId))
|
||||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
|
||||
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l)
|
||||
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, statusService, l)
|
||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
|
||||
|
||||
return &diffService{
|
||||
|
||||
@ -31,6 +31,7 @@ func newDiffSyncer(
|
||||
cache treegetter.TreeGetter,
|
||||
storage storage.SpaceStorage,
|
||||
clientFactory spacesyncproto.ClientFactory,
|
||||
statusService statusservice.StatusService,
|
||||
log *zap.Logger) DiffSyncer {
|
||||
return &diffSyncer{
|
||||
diff: diff,
|
||||
@ -40,6 +41,7 @@ func newDiffSyncer(
|
||||
confConnector: confConnector,
|
||||
clientFactory: clientFactory,
|
||||
log: log,
|
||||
statusService: statusService,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -110,7 +110,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
||||
spaceId := "spaceId"
|
||||
aclRootId := "aclRootId"
|
||||
l := logger.NewNamed(spaceId)
|
||||
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, l)
|
||||
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, nil, l)
|
||||
delState.EXPECT().AddObserver(gomock.Any())
|
||||
diffSyncer.Init(delState)
|
||||
|
||||
|
||||
@ -115,10 +115,10 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
// TODO: maybe change this to dependency injection where we would inject the method `ProvideStatusService`
|
||||
// and for nodes there would be NoOpStatusService
|
||||
if !lastConfiguration.IsResponsible(st.Id()) {
|
||||
statusService = statusservice.NewStatusService(st.Id(), lastConfiguration)
|
||||
statusService = statusservice.NewStatusService(st.Id(), lastConfiguration, st)
|
||||
}
|
||||
|
||||
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log)
|
||||
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, statusService, log)
|
||||
syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
|
||||
sp := &space{
|
||||
id: id,
|
||||
|
||||
@ -2,9 +2,14 @@ package statusservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"sync"
|
||||
"time"
|
||||
@ -17,12 +22,12 @@ const (
|
||||
|
||||
var log = logger.NewNamed("commonspace.statusservice")
|
||||
|
||||
type Updater func(ctx context.Context, treeId string, status bool) (err error)
|
||||
type Updater func(ctx context.Context, treeId string, status SyncStatus) (err error)
|
||||
|
||||
type StatusService interface {
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
Watch(treeId string)
|
||||
Watch(treeId string) (err error)
|
||||
Unwatch(treeId string)
|
||||
StateCounter() uint64
|
||||
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
|
||||
@ -32,14 +37,24 @@ type StatusService interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
type statusEntry struct {
|
||||
head string
|
||||
type SyncStatus int
|
||||
|
||||
const (
|
||||
SyncStatusUnknown SyncStatus = iota
|
||||
SyncStatusSynced
|
||||
SyncStatusNotSynced
|
||||
)
|
||||
|
||||
type treeHeadsEntry struct {
|
||||
heads []string
|
||||
stateCounter uint64
|
||||
syncStatus SyncStatus
|
||||
}
|
||||
|
||||
type treeStatus struct {
|
||||
treeId string
|
||||
status bool
|
||||
status SyncStatus
|
||||
heads []string
|
||||
}
|
||||
|
||||
type statusService struct {
|
||||
@ -47,9 +62,10 @@ type statusService struct {
|
||||
configuration nodeconf.Configuration
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
updater Updater
|
||||
storage storage.SpaceStorage
|
||||
|
||||
spaceId string
|
||||
treeHeads map[string]statusEntry
|
||||
treeHeads map[string]treeHeadsEntry
|
||||
watchers map[string]struct{}
|
||||
stateCounter uint64
|
||||
closed bool
|
||||
@ -57,12 +73,13 @@ type statusService struct {
|
||||
treeStatusBuf []treeStatus
|
||||
}
|
||||
|
||||
func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService {
|
||||
func NewStatusService(spaceId string, configuration nodeconf.Configuration, store storage.SpaceStorage) StatusService {
|
||||
return &statusService{
|
||||
spaceId: spaceId,
|
||||
treeHeads: map[string]statusEntry{},
|
||||
treeHeads: map[string]treeHeadsEntry{},
|
||||
watchers: map[string]struct{}{},
|
||||
configuration: configuration,
|
||||
storage: store,
|
||||
stateCounter: 0,
|
||||
}
|
||||
}
|
||||
@ -87,10 +104,13 @@ func (s *statusService) HeadsChange(treeId string, heads []string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// TODO: save to storage
|
||||
s.treeHeads[treeId] = statusEntry{
|
||||
head: heads[0],
|
||||
var headsCopy []string
|
||||
headsCopy = append(headsCopy, heads...)
|
||||
|
||||
s.treeHeads[treeId] = treeHeadsEntry{
|
||||
heads: headsCopy,
|
||||
stateCounter: s.stateCounter,
|
||||
syncStatus: SyncStatusNotSynced,
|
||||
}
|
||||
s.stateCounter++
|
||||
}
|
||||
@ -105,12 +125,18 @@ func (s *statusService) update(ctx context.Context) (err error) {
|
||||
}
|
||||
for treeId := range s.watchers {
|
||||
// that means that we haven't yet got the status update
|
||||
_, exists := s.treeHeads[treeId]
|
||||
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, !exists})
|
||||
treeHeads, exists := s.treeHeads[treeId]
|
||||
if !exists {
|
||||
err = fmt.Errorf("treeHeads should always exist for watchers")
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus, treeHeads.heads})
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
for _, entry := range s.treeStatusBuf {
|
||||
log.With(zap.Bool("status", entry.status == SyncStatusSynced), zap.Strings("heads", entry.heads)).Debug("updating status")
|
||||
err = s.updater(ctx, entry.treeId, entry.status)
|
||||
if err != nil {
|
||||
return
|
||||
@ -123,28 +149,59 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
curHead, ok := s.treeHeads[treeId]
|
||||
if !ok {
|
||||
curTreeHeads, ok := s.treeHeads[treeId]
|
||||
if !ok || curTreeHeads.syncStatus == SyncStatusSynced {
|
||||
return
|
||||
}
|
||||
|
||||
// checking if other node is responsible
|
||||
if len(heads) == 0 || !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
|
||||
return
|
||||
}
|
||||
// checking if we received the head that we are interested in
|
||||
if !slices.Contains(heads, curHead.head) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: save to storage
|
||||
delete(s.treeHeads, treeId)
|
||||
// checking if we received the head that we are interested in
|
||||
for _, head := range heads {
|
||||
if idx, found := slices.BinarySearch(curTreeHeads.heads, head); found {
|
||||
curTreeHeads.heads[idx] = ""
|
||||
}
|
||||
}
|
||||
curTreeHeads.heads = slice.DiscardFromSlice(curTreeHeads.heads, func(h string) bool {
|
||||
return h == ""
|
||||
})
|
||||
if len(curTreeHeads.heads) == 0 {
|
||||
curTreeHeads.syncStatus = SyncStatusSynced
|
||||
}
|
||||
s.treeHeads[treeId] = curTreeHeads
|
||||
}
|
||||
|
||||
func (s *statusService) Watch(treeId string) {
|
||||
func (s *statusService) Watch(treeId string) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
_, ok := s.treeHeads[treeId]
|
||||
if !ok {
|
||||
var (
|
||||
st treestorage.TreeStorage
|
||||
heads []string
|
||||
)
|
||||
st, err = s.storage.TreeStorage(treeId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
heads, err = st.Heads()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
slices.Sort(heads)
|
||||
s.stateCounter++
|
||||
s.treeHeads[treeId] = treeHeadsEntry{
|
||||
heads: heads,
|
||||
stateCounter: s.stateCounter,
|
||||
syncStatus: SyncStatusUnknown,
|
||||
}
|
||||
}
|
||||
|
||||
s.watchers[treeId] = struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *statusService) Unwatch(treeId string) {
|
||||
@ -185,7 +242,8 @@ func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []st
|
||||
}
|
||||
// if we didn't find our treeId in heads ids which are different from us and node
|
||||
if _, found := slices.BinarySearch(differentRemoteIds, treeId); !found {
|
||||
delete(s.treeHeads, treeId)
|
||||
entry.syncStatus = SyncStatusSynced
|
||||
s.treeHeads[treeId] = entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -341,7 +342,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang
|
||||
}
|
||||
}
|
||||
// discarding all previously seen changes
|
||||
ot.newChangesBuf = discardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil })
|
||||
ot.newChangesBuf = slice.DiscardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil })
|
||||
|
||||
if shouldRebuildFromStorage {
|
||||
err = ot.rebuildFromStorage(changesPayload.NewHeads, ot.newChangesBuf)
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -87,7 +88,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*treech
|
||||
return true
|
||||
},
|
||||
func(visited []*Change) {
|
||||
results = discardFromSlice(results, func(change *Change) bool {
|
||||
results = slice.DiscardFromSlice(results, func(change *Change) bool {
|
||||
return change.visited
|
||||
})
|
||||
},
|
||||
@ -209,7 +210,7 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
|
||||
})
|
||||
|
||||
// discarding visited
|
||||
buffer = discardFromSlice(buffer, func(change *treechangeproto.RawTreeChangeWithId) bool {
|
||||
buffer = slice.DiscardFromSlice(buffer, func(change *treechangeproto.RawTreeChangeWithId) bool {
|
||||
return change == nil
|
||||
})
|
||||
|
||||
|
||||
@ -27,21 +27,3 @@ OuterLoop:
|
||||
}
|
||||
return ourPath[i+1], nil
|
||||
}
|
||||
|
||||
func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T {
|
||||
var (
|
||||
finishedIdx = 0
|
||||
currentIdx = 0
|
||||
)
|
||||
for currentIdx < len(elements) {
|
||||
if !isDiscarded(elements[currentIdx]) {
|
||||
if finishedIdx != currentIdx {
|
||||
elements[finishedIdx] = elements[currentIdx]
|
||||
}
|
||||
finishedIdx++
|
||||
}
|
||||
currentIdx++
|
||||
}
|
||||
elements = elements[:finishedIdx]
|
||||
return elements
|
||||
}
|
||||
|
||||
@ -118,3 +118,21 @@ func UnsortedEquals(s1, s2 []string) bool {
|
||||
|
||||
return SortedEquals(s1Sorted, s2Sorted)
|
||||
}
|
||||
|
||||
func DiscardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T {
|
||||
var (
|
||||
finishedIdx = 0
|
||||
currentIdx = 0
|
||||
)
|
||||
for currentIdx < len(elements) {
|
||||
if !isDiscarded(elements[currentIdx]) {
|
||||
if finishedIdx != currentIdx {
|
||||
elements[finishedIdx] = elements[currentIdx]
|
||||
}
|
||||
finishedIdx++
|
||||
}
|
||||
currentIdx++
|
||||
}
|
||||
elements = elements[:finishedIdx]
|
||||
return elements
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user