Merge pull request #2 from anytypeio/initial-structure

Initial structure
This commit is contained in:
Sergey Cherepanov 2022-07-13 21:11:10 +03:00 committed by GitHub
commit 84c2ee1e1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 1113 additions and 58 deletions

View File

@ -25,3 +25,7 @@ protos-go:
GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1 protoc --gogofaster_out=$(PKGMAP):./aclchanges/pb aclchanges/pb/protos/*.*; mv aclchanges/pb/aclchanges/pb/protos/*.go aclchanges/pb; rm -rf aclchanges/pb/aclchanges
$(eval PKGMAP := $$(P_TIMESTAMP),$$(P_STRUCT),$$(P_THREAD))
GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1 protoc --gogofaster_out=$(PKGMAP):./thread/pb thread/pb/protos/*.*; mv thread/pb/thread/pb/protos/*.go thread/pb; rm -rf thread/pb/thread
build:
@$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app))
go build -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go

View File

@ -1,2 +1,12 @@
# go-anytype-infrastructure-experiments
This repository will have the code for new infrastructure client and node prototypes
## Project structure
- **app** - DI, loggers, common engine
- **bin** - contains compiled binaries (under gitignore)
- **cmd** - main files by directories
- **config** - config component
- **etc** - default/example config files, keys, etc
- **service** - services, runtime components (these packages can use code from everywhere)
- **pkg** - some static packages that can be able to move to a separate repo, dependencies of these packages limited to this folder (maybe util)
- **util** - helpers

239
app/app.go Normal file
View File

@ -0,0 +1,239 @@
package app
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"go.uber.org/zap"
"os"
"runtime"
"strings"
"sync"
"time"
)
var (
// values of this vars will be defined while compilation
GitCommit, GitBranch, GitState, GitSummary, BuildDate string
name string
)
var (
log = logger.NewNamed("app")
)
// Component is a minimal interface for a common app.Component
type Component interface {
// Init will be called first
// When returned error is not nil - app start will be aborted
Init(ctx context.Context, a *App) (err error)
// Name must return unique service name
Name() (name string)
}
// ComponentRunnable is an interface for realizing ability to start background processes or deep configure service
type ComponentRunnable interface {
Component
// Run will be called after init stage
// Non-nil error also will be aborted app start
Run(ctx context.Context) (err error)
// Close will be called when app shutting down
// Also will be called when service return error on Init or Run stage
// Non-nil error will be printed to log
Close(ctx context.Context) (err error)
}
type ComponentStatable interface {
StateChange(state int)
}
// App is the central part of the application
// It contains and manages all components
type App struct {
components []Component
mu sync.RWMutex
startStat StartStat
deviceState int
}
// Name returns app name
func (app *App) Name() string {
return name
}
// Version return app version
func (app *App) Version() string {
return GitSummary
}
type StartStat struct {
SpentMsPerComp map[string]int64
SpentMsTotal int64
}
// StartStat returns total time spent per comp
func (app *App) StartStat() StartStat {
app.mu.Lock()
defer app.mu.Unlock()
return app.startStat
}
// VersionDescription return the full info about the build
func (app *App) VersionDescription() string {
return VersionDescription()
}
func Version() string {
return GitSummary
}
func VersionDescription() string {
return fmt.Sprintf("build on %s from %s at #%s(%s)", BuildDate, GitBranch, GitCommit, GitState)
}
// Register adds service to registry
// All components will be started in the order they were registered
func (app *App) Register(s Component) *App {
app.mu.Lock()
defer app.mu.Unlock()
for _, es := range app.components {
if s.Name() == es.Name() {
panic(fmt.Errorf("component '%s' already registered", s.Name()))
}
}
app.components = append(app.components, s)
return app
}
// Component returns service by name
// If service with given name wasn't registered, nil will be returned
func (app *App) Component(name string) Component {
app.mu.RLock()
defer app.mu.RUnlock()
for _, s := range app.components {
if s.Name() == name {
return s
}
}
return nil
}
// MustComponent is like Component, but it will panic if service wasn't found
func (app *App) MustComponent(name string) Component {
s := app.Component(name)
if s == nil {
panic(fmt.Errorf("component '%s' not registered", name))
}
return s
}
// ComponentNames returns all registered names
func (app *App) ComponentNames() (names []string) {
app.mu.RLock()
defer app.mu.RUnlock()
names = make([]string, len(app.components))
for i, c := range app.components {
names[i] = c.Name()
}
return
}
// Start starts the application
// All registered services will be initialized and started
func (app *App) Start(ctx context.Context) (err error) {
app.mu.RLock()
defer app.mu.RUnlock()
app.startStat.SpentMsPerComp = make(map[string]int64)
closeServices := func(idx int) {
for i := idx; i >= 0; i-- {
if serviceClose, ok := app.components[i].(ComponentRunnable); ok {
if e := serviceClose.Close(ctx); e != nil {
log.Info("close error", zap.String("component", serviceClose.Name()), zap.Error(e))
}
}
}
}
for i, s := range app.components {
if err = s.Init(ctx, app); err != nil {
closeServices(i)
return fmt.Errorf("can't init service '%s': %v", s.Name(), err)
}
}
for i, s := range app.components {
if serviceRun, ok := s.(ComponentRunnable); ok {
start := time.Now()
if err = serviceRun.Run(ctx); err != nil {
closeServices(i)
return fmt.Errorf("can't run service '%s': %v", serviceRun.Name(), err)
}
spent := time.Since(start).Milliseconds()
app.startStat.SpentMsTotal += spent
app.startStat.SpentMsPerComp[s.Name()] = spent
}
}
log.Debug("all components started")
return
}
func stackAllGoroutines() []byte {
buf := make([]byte, 1024)
for {
n := runtime.Stack(buf, true)
if n < len(buf) {
return buf[:n]
}
buf = make([]byte, 2*len(buf))
}
}
// Close stops the application
// All components with ComponentRunnable implementation will be closed in the reversed order
func (app *App) Close(ctx context.Context) error {
log.Debug("close components...")
app.mu.RLock()
defer app.mu.RUnlock()
done := make(chan struct{})
go func() {
select {
case <-done:
return
case <-time.After(time.Minute):
_, _ = os.Stderr.Write([]byte("app.Close timeout\n"))
_, _ = os.Stderr.Write(stackAllGoroutines())
panic("app.Close timeout")
}
}()
var errs []string
for i := len(app.components) - 1; i >= 0; i-- {
if serviceClose, ok := app.components[i].(ComponentRunnable); ok {
if e := serviceClose.Close(ctx); e != nil {
errs = append(errs, fmt.Sprintf("Component '%s' close error: %v", serviceClose.Name(), e))
}
}
}
close(done)
if len(errs) > 0 {
return errors.New(strings.Join(errs, "\n"))
}
log.Debug("all components have been closed")
return nil
}
func (app *App) SetDeviceState(state int) {
if app == nil {
return
}
app.mu.RLock()
defer app.mu.RUnlock()
app.deviceState = state
for _, component := range app.components {
if statable, ok := component.(ComponentStatable); ok {
statable.StateChange(state)
}
}
}

165
app/app_test.go Normal file
View File

@ -0,0 +1,165 @@
package app
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAppServiceRegistry(t *testing.T) {
app := new(App)
t.Run("Register", func(t *testing.T) {
app.Register(newTestService(testTypeRunnable, "c1", nil, nil))
app.Register(newTestService(testTypeRunnable, "r1", nil, nil))
app.Register(newTestService(testTypeComponent, "s1", nil, nil))
})
t.Run("Component", func(t *testing.T) {
assert.Nil(t, app.Component("not-registered"))
for _, name := range []string{"c1", "r1", "s1"} {
s := app.Component(name)
assert.NotNil(t, s, name)
assert.Equal(t, name, s.Name())
}
})
t.Run("MustComponent", func(t *testing.T) {
for _, name := range []string{"c1", "r1", "s1"} {
assert.NotPanics(t, func() { app.MustComponent(name) }, name)
}
assert.Panics(t, func() { app.MustComponent("not-registered") })
})
t.Run("ComponentNames", func(t *testing.T) {
names := app.ComponentNames()
assert.Equal(t, names, []string{"c1", "r1", "s1"})
})
}
func TestAppStart(t *testing.T) {
t.Run("SuccessStartStop", func(t *testing.T) {
app := new(App)
seq := new(testSeq)
services := [...]iTestService{
newTestService(testTypeRunnable, "c1", nil, seq),
newTestService(testTypeRunnable, "r1", nil, seq),
newTestService(testTypeComponent, "s1", nil, seq),
newTestService(testTypeRunnable, "c2", nil, seq),
}
for _, s := range services {
app.Register(s)
}
ctx := context.Background()
assert.Nil(t, app.Start(ctx))
assert.Nil(t, app.Close(ctx))
var actual []testIds
for _, s := range services {
actual = append(actual, s.Ids())
}
expected := []testIds{
{1, 5, 10},
{2, 6, 9},
{3, 0, 0},
{4, 7, 8},
}
assert.Equal(t, expected, actual)
})
t.Run("InitError", func(t *testing.T) {
app := new(App)
seq := new(testSeq)
expectedErr := fmt.Errorf("testError")
services := [...]iTestService{
newTestService(testTypeRunnable, "c1", nil, seq),
newTestService(testTypeRunnable, "c2", expectedErr, seq),
}
for _, s := range services {
app.Register(s)
}
err := app.Start(context.Background())
assert.NotNil(t, err)
assert.Contains(t, err.Error(), expectedErr.Error())
var actual []testIds
for _, s := range services {
actual = append(actual, s.Ids())
}
expected := []testIds{
{1, 0, 4},
{2, 0, 3},
}
assert.Equal(t, expected, actual)
})
}
const (
testTypeComponent int = iota
testTypeRunnable
)
func newTestService(componentType int, name string, err error, seq *testSeq) (s iTestService) {
ts := testComponent{name: name, err: err, seq: seq}
switch componentType {
case testTypeComponent:
return &ts
case testTypeRunnable:
return &testRunnable{testComponent: ts}
}
return nil
}
type iTestService interface {
Component
Ids() (ids testIds)
}
type testIds struct {
initId int64
runId int64
closeId int64
}
type testComponent struct {
name string
err error
seq *testSeq
ids testIds
}
func (t *testComponent) Init(ctx context.Context, a *App) error {
t.ids.initId = t.seq.New()
return t.err
}
func (t *testComponent) Name() string { return t.name }
func (t *testComponent) Ids() testIds {
return t.ids
}
type testRunnable struct {
testComponent
}
func (t *testRunnable) Run(ctx context.Context) error {
t.ids.runId = t.seq.New()
return t.err
}
func (t *testRunnable) Close(ctx context.Context) error {
t.ids.closeId = t.seq.New()
return t.err
}
type testSeq struct {
seq int64
}
func (ts *testSeq) New() int64 {
return atomic.AddInt64(&ts.seq, 1)
}

21
app/logger/log.go Normal file
View File

@ -0,0 +1,21 @@
package logger
import "go.uber.org/zap"
var DefaultLogger *zap.Logger
func init() {
DefaultLogger, _ = zap.NewDevelopment()
}
func Default() *zap.Logger {
return DefaultLogger
}
func NewNamed(name string, fields ...zap.Field) *zap.Logger {
l := DefaultLogger.Named(name)
if len(fields) > 0 {
l = l.With(fields...)
}
return l
}

2
bin/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*
!.gitignore

75
cmd/node/node.go Normal file
View File

@ -0,0 +1,75 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"go.uber.org/zap"
"os"
"os/signal"
"syscall"
"time"
)
var log = logger.NewNamed("main")
var (
flagConfigFile = flag.String("c", "etc/config.yml", "path to config file")
flagVersion = flag.Bool("v", false, "show version and exit")
flagHelp = flag.Bool("h", false, "show help and exit")
)
func main() {
flag.Parse()
if *flagVersion {
fmt.Println(app.VersionDescription())
return
}
if *flagHelp {
flag.PrintDefaults()
return
}
// create app
ctx := context.Background()
a := new(app.App)
// open config file
conf, err := config.NewFromFile(*flagConfigFile)
if err != nil {
log.Fatal("can't open config file", zap.Error(err))
}
// bootstrap components
a.Register(conf)
Bootstrap(a)
// start app
if err := a.Start(ctx); err != nil {
log.Error("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
// wait exit signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-exit
log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig)))
// close app
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := a.Close(ctx); err != nil {
log.Fatal("close error", zap.Error(err))
} else {
log.Info("goodbye!")
}
}
func Bootstrap(a *app.App) {
//a.Register(mycomponent.New())
}

5
config/anytype.go Normal file
View File

@ -0,0 +1,5 @@
package config
type Anytype struct {
SwarmKey string `yaml:"swarmKey"`
}

38
config/config.go Normal file
View File

@ -0,0 +1,38 @@
package config
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"gopkg.in/yaml.v3"
"io/ioutil"
)
const CName = "config"
func NewFromFile(path string) (c *Config, err error) {
c = &Config{}
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
if err = yaml.Unmarshal(data, c); err != nil {
return nil, err
}
return
}
type Config struct {
Anytype Anytype `yaml:"anytype"`
GrpcServer GrpcServer `yaml:"grpcServer"`
}
func (c *Config) Init(ctx context.Context, a *app.App) (err error) {
logger.NewNamed("config").Info(fmt.Sprint(*c))
return
}
func (c Config) Name() (name string) {
return CName
}

5
config/grpc.go Normal file
View File

@ -0,0 +1,5 @@
package config
type GrpcServer struct {
ListenAddrs []string `yaml:"listenAddrs"`
}

6
etc/config.yml Normal file
View File

@ -0,0 +1,6 @@
anytype:
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- ":443"

17
go.mod
View File

@ -8,15 +8,15 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/libp2p/go-libp2p-core v0.8.5
github.com/mr-tron/base58 v1.2.0
github.com/prometheus/common v0.18.0
github.com/stretchr/testify v1.7.0
github.com/textileio/go-threads v1.0.2-0.20210304072541-d0f91da84404
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/prometheus/common v0.18.0
github.com/stretchr/testify v1.7.0
github.com/textileio/go-threads v1.0.2-0.20210304072541-d0f91da84404
gopkg.in/yaml.v3 v3.0.1
)
require (
@ -38,19 +38,18 @@ require (
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.3.3 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multihash v0.0.15 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83 // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.33.2 // indirect
google.golang.org/protobuf v1.25.0 // indirect

19
go.sum
View File

@ -49,6 +49,7 @@ github.com/aws/aws-sdk-go v1.29.15/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTg
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@ -1026,6 +1027,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.4.0/go.mod h1:llVBH2pkj9HywK0Dtdt6lDikOjFLbceHVu/Rc0iMKLs=
@ -1043,17 +1045,24 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -1106,6 +1115,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -1142,6 +1152,7 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 h1:0PC75Fz/kyMGhL0e1QnypqK2kQMqKt9csD1GnMJR+Zk=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -1214,9 +1225,12 @@ golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83 h1:kHSDPqCtsHZOg0nVylfTo20DDhE9gG4Y0jn7hKQ0QAM=
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1257,6 +1271,7 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -1334,9 +1349,13 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -1,6 +1,8 @@
package aclchanges
import "github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
)
type Change interface {
ProtoChange() *pb.ACLChange

View File

@ -4,11 +4,10 @@ import (
"bytes"
"errors"
"fmt"
"hash/fnv"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/textileio/go-threads/crypto/symmetric"
"hash/fnv"
)
var ErrNoSuchUser = errors.New("no such user")

View File

@ -2,9 +2,9 @@ package acltree
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
)

View File

@ -1,10 +1,10 @@
package acltree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"sync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
)

View File

@ -1,13 +1,12 @@
package acltree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/threadbuilder"
"testing"
"github.com/stretchr/testify/assert"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/threadbuilder"
)
type mockListener struct{}

View File

@ -2,8 +2,8 @@ package acltree
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)

View File

@ -2,10 +2,10 @@ package acltree
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/gogo/protobuf/proto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/textileio/go-threads/crypto/symmetric"
)

View File

@ -1,8 +1,8 @@
package acltree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/gogo/protobuf/proto"

View File

@ -3,10 +3,10 @@ package acltree
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"time"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/gogo/protobuf/proto"
)

View File

@ -2,8 +2,8 @@ package acltree
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
)

View File

@ -1,8 +1,8 @@
package acltree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
)

View File

@ -2,10 +2,10 @@ package acltree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/threadbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/threadbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/stretchr/testify/assert"
"testing"
)

View File

@ -3,8 +3,8 @@ package acltree
import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
//"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/logging"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"

View File

@ -2,12 +2,12 @@ package plaintextdocument
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
aclpb "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
aclpb "github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/testchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/gogo/protobuf/proto"
)

View File

@ -1,9 +1,9 @@
package plaintextdocument
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/threadbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/threadbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
"github.com/stretchr/testify/assert"
"testing"
)

View File

@ -2,8 +2,8 @@ package plaintextdocument
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/testchanges/pb"
"github.com/gogo/protobuf/proto"
)

View File

@ -3,8 +3,12 @@ package threadbuilder
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/yamltests"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/pb"
testpb "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/yamltests"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
threadpb "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"io/ioutil"
"path"
@ -12,10 +16,6 @@ import (
"github.com/gogo/protobuf/proto"
"gopkg.in/yaml.v3"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges/pb"
testpb "github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/testchanges/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread"
threadpb "github.com/anytypeio/go-anytype-infrastructure-experiments/thread/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
)

View File

@ -9,14 +9,13 @@ package threadbuilder
import (
"fmt"
testpb "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/pb"
"github.com/gogo/protobuf/proto"
"strings"
"unicode"
"github.com/awalterschulze/gographviz"
testpb "github.com/anytypeio/go-anytype-infrastructure-experiments/testutils/testchanges/pb"
)
// To quickly look at visualized string you can use https://dreampuf.github.io/GraphvizOnline

View File

@ -3,8 +3,8 @@ package thread
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"github.com/gogo/protobuf/proto"

View File

@ -2,9 +2,8 @@ package thread
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/thread/pb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread/pb"
)
// TODO: change methods to have errors as a return parameter, because we will be dealing with a real database

355
pkg/ocache/ocache.go Normal file
View File

@ -0,0 +1,355 @@
package ocache
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"go.uber.org/zap"
"sync"
"time"
)
var (
ErrClosed = errors.New("object cache closed")
ErrExists = errors.New("object exists")
ErrTimeout = errors.New("loading object timed out")
)
var (
defaultTTL = time.Minute
defaultGC = 20 * time.Second
)
type key int
const CacheTimeout key = 0
var log = logger.NewNamed("ocache")
type LoadFunc func(ctx context.Context, id string) (value Object, err error)
type Option func(*oCache)
var WithLogServiceName = func(name string) Option {
return func(cache *oCache) {
cache.log = cache.log.With("service_name", name)
}
}
var WithTTL = func(ttl time.Duration) Option {
return func(cache *oCache) {
cache.ttl = ttl
}
}
var WithGCPeriod = func(gcPeriod time.Duration) Option {
return func(cache *oCache) {
cache.gc = gcPeriod
}
}
func New(loadFunc LoadFunc, opts ...Option) OCache {
c := &oCache{
data: make(map[string]*entry),
loadFunc: loadFunc,
timeNow: time.Now,
ttl: defaultTTL,
gc: defaultGC,
closeCh: make(chan struct{}),
log: log.Sugar(),
}
for _, o := range opts {
o(c)
}
go c.ticker()
return c
}
type Object interface {
Close() (err error)
}
type ObjectLocker interface {
Object
Locked() bool
}
type entry struct {
id string
lastUsage time.Time
refCount uint32
load chan struct{}
loadErr error
value Object
}
func (e *entry) locked() bool {
if locker, ok := e.value.(ObjectLocker); ok {
return locker.Locked()
}
return false
}
type OCache interface {
// DoLockedIfNotExists does an action if the object with id is not in cache
// under a global lock, this will prevent a race which otherwise occurs
// when object is created in parallel with action
DoLockedIfNotExists(id string, action func() error) error
// Get gets an object from cache or create a new one via 'loadFunc'
// Increases the object refs counter on successful
// When 'loadFunc' returns a non-nil error, an object will not be stored to cache
Get(ctx context.Context, id string) (value Object, err error)
// Add adds new object to cache
// Returns error when object exists
Add(id string, value Object) (err error)
// Release decreases the refs counter
Release(id string) bool
// Reset sets refs counter to 0
Reset(id string) bool
// Remove closes and removes object
Remove(id string) (ok bool, err error)
// ForEach iterates over all loaded objects, breaks when callback returns false
ForEach(f func(v Object) (isContinue bool))
// GC frees not used and expired objects
// Will automatically called every 'gcPeriod'
GC()
// Len returns current cache size
Len() int
// Close closes all objects and cache
Close() (err error)
}
type oCache struct {
mu sync.Mutex
data map[string]*entry
loadFunc LoadFunc
timeNow func() time.Time
ttl time.Duration
gc time.Duration
closed bool
closeCh chan struct{}
log *zap.SugaredLogger
}
func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
var (
e *entry
ok bool
load bool
)
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, ErrClosed
}
if e, ok = c.data[id]; !ok {
load = true
e = &entry{
id: id,
load: make(chan struct{}),
}
c.data[id] = e
}
e.lastUsage = c.timeNow()
e.refCount++
c.mu.Unlock()
timeout := ctx.Value(CacheTimeout)
if load {
if timeout != nil {
go c.load(ctx, id, e)
} else {
c.load(ctx, id, e)
}
}
if timeout != nil {
duration := timeout.(time.Duration)
select {
case <-e.load:
return e.value, e.loadErr
case <-time.After(duration):
return nil, ErrTimeout
}
}
<-e.load
return e.value, e.loadErr
}
func (c *oCache) load(ctx context.Context, id string, e *entry) {
defer close(e.load)
value, err := c.loadFunc(ctx, id)
c.mu.Lock()
defer c.mu.Unlock()
if err != nil {
e.loadErr = err
delete(c.data, id)
} else {
e.value = value
}
}
func (c *oCache) Release(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return false
}
if e, ok := c.data[id]; ok {
if e.refCount > 0 {
e.refCount--
return true
}
}
return false
}
func (c *oCache) Reset(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return false
}
if e, ok := c.data[id]; ok {
e.refCount = 0
return true
}
return false
}
func (c *oCache) Remove(id string) (ok bool, err error) {
c.mu.Lock()
e, ok := c.data[id]
if ok {
delete(c.data, id)
}
c.mu.Unlock()
if ok {
<-e.load
if e.value != nil {
err = e.value.Close()
}
}
return
}
func (c *oCache) DoLockedIfNotExists(id string, action func() error) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return ErrClosed
}
if _, ok := c.data[id]; ok {
return ErrExists
}
return action()
}
func (c *oCache) Add(id string, value Object) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.data[id]; ok {
return ErrExists
}
e := &entry{
id: id,
lastUsage: time.Now(),
refCount: 0,
load: make(chan struct{}),
value: value,
}
close(e.load)
c.data[id] = e
return
}
func (c *oCache) ForEach(f func(obj Object) (isContinue bool)) {
var objects []Object
c.mu.Lock()
for _, v := range c.data {
select {
case <-v.load:
if v.value != nil {
objects = append(objects, v.value)
}
default:
}
}
c.mu.Unlock()
for _, obj := range objects {
if !f(obj) {
return
}
}
}
func (c *oCache) ticker() {
ticker := time.NewTicker(c.gc)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.GC()
case <-c.closeCh:
return
}
}
}
func (c *oCache) GC() {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return
}
deadline := c.timeNow().Add(-c.ttl)
var toClose []*entry
for k, e := range c.data {
if !e.locked() && e.refCount <= 0 && e.lastUsage.Before(deadline) {
delete(c.data, k)
toClose = append(toClose, e)
}
}
size := len(c.data)
c.mu.Unlock()
c.log.Infof("GC: removed %d; cache size: %d", len(toClose), size)
for _, e := range toClose {
<-e.load
if e.value != nil {
if err := e.value.Close(); err != nil {
c.log.With("object_id", e.id).Warnf("GC: object close error: %v", err)
}
}
}
}
func (c *oCache) Len() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.data)
}
func (c *oCache) Close() (err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return ErrClosed
}
c.closed = true
close(c.closeCh)
var toClose []*entry
for _, e := range c.data {
toClose = append(toClose, e)
}
c.mu.Unlock()
for _, e := range toClose {
<-e.load
if e.value != nil {
if clErr := e.value.Close(); clErr != nil {
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", clErr)
}
}
}
return nil
}

114
pkg/ocache/ocache_test.go Normal file
View File

@ -0,0 +1,114 @@
package ocache
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testObject struct {
name string
closeErr error
}
func (t *testObject) Close() (err error) {
return t.closeErr
}
func (t *testObject) ShouldClose() bool {
return true
}
func TestOCache_Get(t *testing.T) {
t.Run("successful", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
return &testObject{name: "test"}, nil
})
val, err := c.Get(context.TODO(), "test")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, "test", val.(*testObject).name)
assert.Equal(t, 1, c.Len())
assert.NoError(t, c.Close())
})
t.Run("error", func(t *testing.T) {
tErr := errors.New("err")
c := New(func(ctx context.Context, id string) (value Object, err error) {
return nil, tErr
})
val, err := c.Get(context.TODO(), "test")
require.Equal(t, tErr, err)
require.Nil(t, val)
assert.Equal(t, 0, c.Len())
assert.NoError(t, c.Close())
})
t.Run("parallel load", func(t *testing.T) {
var waitCh = make(chan struct{})
var obj = &testObject{
name: "test",
}
var calls uint32
c := New(func(ctx context.Context, id string) (value Object, err error) {
atomic.AddUint32(&calls, 1)
<-waitCh
return obj, nil
})
var l = 10
var res = make(chan struct{}, l)
for i := 0; i < l; i++ {
go func() {
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
assert.Equal(t, obj, val)
res <- struct{}{}
}()
}
time.Sleep(time.Millisecond * 10)
close(waitCh)
var timeout = time.After(time.Second)
for i := 0; i < l; i++ {
select {
case <-res:
case <-timeout:
require.True(t, false, "timeout")
}
}
assert.Equal(t, 1, c.Len())
assert.Equal(t, uint32(1), calls)
assert.NoError(t, c.Close())
})
t.Run("errClosed", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
return nil, errors.New("test")
})
require.NoError(t, c.Close())
_, err := c.Get(context.TODO(), "id")
assert.Equal(t, ErrClosed, err)
})
}
func TestOCache_GC(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
return &testObject{name: id}, nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
c.GC()
assert.Equal(t, 1, c.Len())
time.Sleep(time.Millisecond * 30)
c.GC()
assert.Equal(t, 1, c.Len())
assert.True(t, c.Release("id"))
c.GC()
assert.Equal(t, 0, c.Len())
assert.False(t, c.Release("id"))
}