189 lines
5.0 KiB
Go
189 lines
5.0 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/config"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const CName = "consensus.db"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
|
|
func New() Service {
|
|
return &service{}
|
|
}
|
|
|
|
type ChangeReceiver func(logId []byte, records []consensus.Record)
|
|
|
|
type Service interface {
|
|
// AddLog adds new log db
|
|
AddLog(ctx context.Context, log consensus.Log) (err error)
|
|
// AddRecord adds new record to existing log
|
|
// returns consensuserr.ErrConflict if record didn't match or log not found
|
|
AddRecord(ctx context.Context, logId []byte, record consensus.Record) (err error)
|
|
// FetchLog gets log by id
|
|
FetchLog(ctx context.Context, logId []byte) (log consensus.Log, err error)
|
|
// SetChangeReceiver sets the receiver for updates, it must be called before app.Run stage
|
|
SetChangeReceiver(receiver ChangeReceiver) (err error)
|
|
app.ComponentRunnable
|
|
}
|
|
|
|
type service struct {
|
|
conf config.Mongo
|
|
logColl *mongo.Collection
|
|
running bool
|
|
changeReceiver ChangeReceiver
|
|
|
|
streamCtx context.Context
|
|
streamCancel context.CancelFunc
|
|
listenerDone chan struct{}
|
|
}
|
|
|
|
func (s *service) Init(a *app.App) (err error) {
|
|
s.conf = a.MustComponent(config.CName).(*config.Config).Mongo
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (s *service) Run(ctx context.Context) (err error) {
|
|
client, err := mongo.Connect(ctx, options.Client().ApplyURI(s.conf.Connect))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.logColl = client.Database(s.conf.Database).Collection(s.conf.LogCollection)
|
|
s.running = true
|
|
if s.changeReceiver != nil {
|
|
if err = s.runStreamListener(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *service) AddLog(ctx context.Context, l consensus.Log) (err error) {
|
|
_, err = s.logColl.InsertOne(ctx, l)
|
|
if mongo.IsDuplicateKeyError(err) {
|
|
return consensuserr.ErrLogExists
|
|
}
|
|
return
|
|
}
|
|
|
|
type findLogQuery struct {
|
|
Id []byte `bson:"_id"`
|
|
}
|
|
|
|
type findRecordQuery struct {
|
|
Id []byte `bson:"_id"`
|
|
LastRecordId []byte `bson:"records.0.id"`
|
|
}
|
|
|
|
type updateOp struct {
|
|
Push struct {
|
|
Records struct {
|
|
Each []consensus.Record `bson:"$each""`
|
|
Pos int `bson:"$position"`
|
|
} `bson:"records"`
|
|
} `bson:"$push"`
|
|
}
|
|
|
|
func (s *service) AddRecord(ctx context.Context, logId []byte, record consensus.Record) (err error) {
|
|
var upd updateOp
|
|
upd.Push.Records.Each = []consensus.Record{record}
|
|
result, err := s.logColl.UpdateOne(ctx, findRecordQuery{
|
|
Id: logId,
|
|
LastRecordId: record.PrevId,
|
|
}, upd)
|
|
if err != nil {
|
|
log.Error("addRecord update error", zap.Error(err))
|
|
return consensuserr.ErrUnexpected
|
|
}
|
|
if result.ModifiedCount == 0 {
|
|
return consensuserr.ErrConflict
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *service) FetchLog(ctx context.Context, logId []byte) (l consensus.Log, err error) {
|
|
if err = s.logColl.FindOne(ctx, findLogQuery{Id: logId}).Decode(&l); err != nil {
|
|
if err == mongo.ErrNoDocuments {
|
|
err = consensuserr.ErrLogNotFound
|
|
}
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *service) SetChangeReceiver(receiver ChangeReceiver) (err error) {
|
|
if s.running {
|
|
return fmt.Errorf("set receiver must be called before Run")
|
|
}
|
|
s.changeReceiver = receiver
|
|
return
|
|
}
|
|
|
|
type matchPipeline struct {
|
|
Match struct {
|
|
OT string `bson:"operationType"`
|
|
} `bson:"$match"`
|
|
}
|
|
|
|
func (s *service) runStreamListener(ctx context.Context) (err error) {
|
|
var mp matchPipeline
|
|
mp.Match.OT = "update"
|
|
stream, err := s.logColl.Watch(ctx, []matchPipeline{mp})
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.listenerDone = make(chan struct{})
|
|
s.streamCtx, s.streamCancel = context.WithCancel(context.Background())
|
|
go s.streamListener(stream)
|
|
return
|
|
}
|
|
|
|
type streamResult struct {
|
|
DocumentKey struct {
|
|
Id []byte `bson:"_id"`
|
|
} `bson:"documentKey"`
|
|
UpdateDescription struct {
|
|
UpdateFields struct {
|
|
Records []consensus.Record `bson:"records"`
|
|
} `bson:"updatedFields"`
|
|
} `bson:"updateDescription"`
|
|
}
|
|
|
|
func (s *service) streamListener(stream *mongo.ChangeStream) {
|
|
defer close(s.listenerDone)
|
|
for stream.Next(s.streamCtx) {
|
|
var res streamResult
|
|
if err := stream.Decode(&res); err != nil {
|
|
// mongo driver maintains connections and handles reconnects so that the stream will work as usual in these cases
|
|
// here we have an unexpected error and should stop any operations to avoid an inconsistent state between db and cache
|
|
log.Fatal("stream decode error:", zap.Error(err))
|
|
}
|
|
s.changeReceiver(res.DocumentKey.Id, res.UpdateDescription.UpdateFields.Records)
|
|
}
|
|
}
|
|
|
|
func (s *service) Close(ctx context.Context) (err error) {
|
|
if s.logColl != nil {
|
|
err = s.logColl.Database().Client().Disconnect(ctx)
|
|
s.logColl = nil
|
|
}
|
|
if s.listenerDone != nil {
|
|
s.streamCancel()
|
|
<-s.listenerDone
|
|
}
|
|
return
|
|
}
|