Change obj tree logic to include their heads on add raw changes

This commit is contained in:
mcrakhman 2022-12-12 17:17:09 +01:00
parent 3807d8e6ef
commit b71360ec01
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
5 changed files with 46 additions and 27 deletions

View File

@ -247,11 +247,11 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
return return
} }
func (s *syncTree) AddRawChanges(ctx context.Context, changes ...*treechangeproto.RawTreeChangeWithId) (res tree.AddResult, err error) { func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload tree.RawChangesPayload) (res tree.AddResult, err error) {
if err = s.checkAlive(); err != nil { if err = s.checkAlive(); err != nil {
return return
} }
res, err = s.ObjectTree.AddRawChanges(ctx, changes...) res, err = s.ObjectTree.AddRawChanges(ctx, changesPayload)
if err != nil { if err != nil {
return return
} }

View File

@ -75,7 +75,10 @@ func (s *syncTreeHandler) handleHeadUpdate(
return nil return nil
} }
_, err = objTree.AddRawChanges(ctx, update.Changes...) _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: update.Heads,
RawChanges: update.Changes,
})
if err != nil { if err != nil {
return err return err
} }
@ -128,7 +131,10 @@ func (s *syncTreeHandler) handleFullSyncRequest(
defer objTree.Unlock() defer objTree.Unlock()
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, request.Changes...) _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
})
if err != nil { if err != nil {
return err return err
} }
@ -168,7 +174,10 @@ func (s *syncTreeHandler) handleFullSyncResponse(
return nil return nil
} }
_, err = objTree.AddRawChanges(ctx, response.Changes...) _, err = objTree.AddRawChanges(ctx, tree.RawChangesPayload{
NewHeads: response.Heads,
RawChanges: response.Changes,
})
return err return err
}() }()
log.With("error", err != nil). log.With("error", err != nil).

View File

@ -33,6 +33,11 @@ type AddResult struct {
Mode Mode Mode Mode
} }
type RawChangesPayload struct {
NewHeads []string
RawChanges []*treechangeproto.RawTreeChangeWithId
}
type ChangeIterateFunc = func(change *Change) bool type ChangeIterateFunc = func(change *Change) bool
type ChangeConvertFunc = func(decrypted []byte) (any, error) type ChangeConvertFunc = func(decrypted []byte) (any, error)
@ -55,7 +60,7 @@ type ObjectTree interface {
Storage() storage.TreeStorage Storage() storage.TreeStorage
AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error) AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error)
AddRawChanges(ctx context.Context, changes ...*treechangeproto.RawTreeChangeWithId) (AddResult, error) AddRawChanges(ctx context.Context, changes RawChangesPayload) (AddResult, error)
Delete() error Delete() error
Close() error Close() error
@ -113,10 +118,10 @@ func defaultObjectTreeDeps(
} }
} }
func (ot *objectTree) rebuildFromStorage(newChanges []*Change) (err error) { func (ot *objectTree) rebuildFromStorage(theirHeads []string, newChanges []*Change) (err error) {
ot.treeBuilder.Reset() ot.treeBuilder.Reset()
ot.tree, err = ot.treeBuilder.Build(newChanges) ot.tree, err = ot.treeBuilder.Build(theirHeads, newChanges)
if err != nil { if err != nil {
return return
} }
@ -213,8 +218,8 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt
return return
} }
func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) { func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) {
addResult, err = ot.addRawChanges(ctx, rawChanges...) addResult, err = ot.addRawChanges(ctx, changesPayload)
if err != nil { if err != nil {
return return
} }
@ -235,7 +240,7 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*treechan
return return
} }
func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) { func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) {
// resetting buffers // resetting buffers
ot.newChangesBuf = ot.newChangesBuf[:0] ot.newChangesBuf = ot.newChangesBuf[:0]
ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0] ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0]
@ -252,7 +257,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*treechan
prevHeadsCopy := headsCopy() prevHeadsCopy := headsCopy()
// filtering changes, verifying and unmarshalling them // filtering changes, verifying and unmarshalling them
for idx, ch := range rawChanges { for idx, ch := range changesPayload.RawChanges {
// not unmarshalling the changes if they were already added either as unattached or attached // not unmarshalling the changes if they were already added either as unattached or attached
if _, exists := ot.tree.attached[ch.Id]; exists { if _, exists := ot.tree.attached[ch.Id]; exists {
continue continue
@ -331,17 +336,17 @@ func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*treechan
ot.newChangesBuf = discardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil }) ot.newChangesBuf = discardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil })
if shouldRebuildFromStorage { if shouldRebuildFromStorage {
err = ot.rebuildFromStorage(ot.newChangesBuf) err = ot.rebuildFromStorage(changesPayload.NewHeads, ot.newChangesBuf)
if err != nil { if err != nil {
// rebuilding without new changes // rebuilding without new changes
ot.rebuildFromStorage(nil) ot.rebuildFromStorage(nil, nil)
return return
} }
addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, nil, rawChanges) addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, nil, changesPayload.RawChanges)
if err != nil { if err != nil {
// that means that some unattached changes were somehow corrupted in memory // that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage // this shouldn't happen but if that happens, then rebuilding from storage
ot.rebuildFromStorage(nil) ot.rebuildFromStorage(nil, nil)
return return
} }
return return
@ -366,11 +371,11 @@ func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*treechan
err = ErrHasInvalidChanges err = ErrHasInvalidChanges
return return
} }
addResult, err = ot.createAddResult(prevHeadsCopy, mode, treeChangesAdded, rawChanges) addResult, err = ot.createAddResult(prevHeadsCopy, mode, treeChangesAdded, changesPayload.RawChanges)
if err != nil { if err != nil {
// that means that some unattached changes were somehow corrupted in memory // that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage // this shouldn't happen but if that happens, then rebuilding from storage
ot.rebuildFromStorage(nil) ot.rebuildFromStorage(nil, nil)
return return
} }
return return

View File

@ -106,7 +106,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
newSnapshotsBuf: make([]*Change, 0, 10), newSnapshotsBuf: make([]*Change, 0, 10),
} }
err := objTree.rebuildFromStorage(nil) err := objTree.rebuildFromStorage(nil, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -40,27 +40,32 @@ func (tb *treeBuilder) Reset() {
tb.tree = &Tree{} tb.tree = &Tree{}
} }
func (tb *treeBuilder) Build(newChanges []*Change) (*Tree, error) { func (tb *treeBuilder) Build(theirHeads []string, newChanges []*Change) (*Tree, error) {
var headsAndNewChanges []string var proposedHeads []string
heads, err := tb.treeStorage.Heads() heads, err := tb.treeStorage.Heads()
if err != nil { if err != nil {
return nil, err return nil, err
} }
headsAndNewChanges = append(headsAndNewChanges, heads...)
tb.cache = make(map[string]*Change) tb.cache = make(map[string]*Change)
proposedHeads = append(proposedHeads, heads...)
if len(theirHeads) > 0 {
proposedHeads = append(proposedHeads, theirHeads...)
}
for _, ch := range newChanges { for _, ch := range newChanges {
headsAndNewChanges = append(headsAndNewChanges, ch.Id) // we don't know what new heads are, so every change can be head
if len(theirHeads) == 0 {
proposedHeads = append(proposedHeads, ch.Id)
}
tb.cache[ch.Id] = ch tb.cache[ch.Id] = ch
} }
log.With(zap.Strings("heads", heads)).Debug("building tree") log.With(zap.Strings("heads", proposedHeads)).Debug("building tree")
breakpoint, err := tb.findBreakpoint(headsAndNewChanges) breakpoint, err := tb.findBreakpoint(proposedHeads)
if err != nil { if err != nil {
return nil, fmt.Errorf("findBreakpoint error: %v", err) return nil, fmt.Errorf("findBreakpoint error: %v", err)
} }
if err = tb.buildTree(headsAndNewChanges, breakpoint); err != nil { if err = tb.buildTree(proposedHeads, breakpoint); err != nil {
return nil, fmt.Errorf("buildTree error: %v", err) return nil, fmt.Errorf("buildTree error: %v", err)
} }