any-sync/app/ocache/ocache_test.go
2023-03-08 22:52:43 +01:00

262 lines
6.4 KiB
Go

package ocache
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testObject is a controllable cache entry used by the tests in this file.
// It records which of its close methods were invoked and can be made to block
// inside them.
type testObject struct {
	// name identifies the object in assertions.
	name string
	// closeErr is the error returned by Close, and by TryClose when closeCh is set.
	closeErr error
	// closeCh, when non-nil, makes Close and TryClose block until a receive
	// on it succeeds (a value is sent or the channel is closed).
	closeCh chan struct{}
	// tryReturn is the result reported by TryClose when closeCh is nil.
	tryReturn bool
	// closeCalled and tryCloseCalled are set as soon as the respective
	// method is entered.
	closeCalled, tryCloseCalled bool
}

// NewTestObject builds a testObject with the given name, TryClose result and
// optional blocking channel; every other field keeps its zero value.
func NewTestObject(name string, tryReturn bool, closeCh chan struct{}) *testObject {
	obj := new(testObject)
	obj.name = name
	obj.tryReturn = tryReturn
	obj.closeCh = closeCh
	return obj
}

// Close marks the object as closed, waits on closeCh when one is configured
// and then reports closeErr.
func (o *testObject) Close() (err error) {
	o.closeCalled = true
	if o.closeCh != nil {
		<-o.closeCh
	}
	err = o.closeErr
	return err
}

// TryClose records the call. Without a closeCh it immediately reports
// tryReturn and a nil error; with one it waits on the channel and then
// reports true together with closeErr.
func (o *testObject) TryClose() (res bool, err error) {
	o.tryCloseCalled = true
	if o.closeCh == nil {
		return o.tryReturn, nil
	}
	<-o.closeCh
	return true, o.closeErr
}
// TestOCache_Get exercises Get: a successful load, a failing load, several
// concurrent Gets for one id, Get after Close, and Get with an already
// cancelled context.
func TestOCache_Get(t *testing.T) {
	t.Run("successful", func(t *testing.T) {
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			return &testObject{name: "test"}, nil
		})
		val, err := c.Get(context.TODO(), "test")
		require.NoError(t, err)
		require.NotNil(t, val)
		assert.Equal(t, "test", val.(*testObject).name)
		assert.Equal(t, 1, c.Len())
		assert.NoError(t, c.Close())
	})
	t.Run("error", func(t *testing.T) {
		tErr := errors.New("err")
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			return nil, tErr
		})
		val, err := c.Get(context.TODO(), "test")
		require.Equal(t, tErr, err)
		require.Nil(t, val)
		// a failed load must not leave an entry behind
		assert.Equal(t, 0, c.Len())
		assert.NoError(t, c.Close())
	})
	t.Run("parallel load", func(t *testing.T) {
		waitCh := make(chan struct{})
		obj := &testObject{name: "test"}
		var calls atomic.Uint32
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			calls.Add(1)
			// hold the load open until the test releases it
			<-waitCh
			return obj, nil
		})
		const l = 10
		res := make(chan struct{}, l)
		for i := 0; i < l; i++ {
			go func() {
				val, err := c.Get(context.TODO(), "id")
				// assert, not require: FailNow may only be called from the
				// goroutine running the test function.
				assert.NoError(t, err)
				assert.Equal(t, obj, val)
				res <- struct{}{}
			}()
		}
		// give the goroutines time to reach Get before releasing the load
		time.Sleep(time.Millisecond * 10)
		close(waitCh)
		timeout := time.After(time.Second)
		for i := 0; i < l; i++ {
			select {
			case <-res:
			case <-timeout:
				require.FailNow(t, "timeout")
			}
		}
		assert.Equal(t, 1, c.Len())
		// all concurrent Gets must have shared a single load
		assert.Equal(t, uint32(1), calls.Load())
		assert.NoError(t, c.Close())
	})
	t.Run("errClosed", func(t *testing.T) {
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			return nil, errors.New("test")
		})
		require.NoError(t, c.Close())
		_, err := c.Get(context.TODO(), "id")
		assert.Equal(t, ErrClosed, err)
	})
	t.Run("context cancel", func(t *testing.T) {
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			time.Sleep(time.Second / 3)
			return &testObject{
				name: "id",
			}, nil
		})
		// cancel before calling Get so the context is already done
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		_, err := c.Get(ctx, "id")
		assert.Equal(t, context.Canceled, err)
		assert.NoError(t, c.Close())
	})
}
// TestOCache_GC exercises manual GC runs: removal of an expired object, a Get
// issued while GC is blocked closing the object, and concurrent Gets racing a
// GC whose TryClose reports false.
func TestOCache_GC(t *testing.T) {
	t.Run("test gc expired object", func(t *testing.T) {
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			return NewTestObject(id, true, nil), nil
		}, WithTTL(time.Millisecond*10))
		val, err := c.Get(context.TODO(), "id")
		require.NoError(t, err)
		require.NotNil(t, val)
		assert.Equal(t, 1, c.Len())
		// ttl has not passed yet, so the object must survive this run
		c.GC()
		assert.Equal(t, 1, c.Len())
		time.Sleep(time.Millisecond * 30)
		c.GC()
		assert.Equal(t, 0, c.Len())
	})
	t.Run("test gc tryClose true, close before get", func(t *testing.T) {
		closeCh := make(chan struct{})
		getCh := make(chan struct{})
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			return NewTestObject(id, true, closeCh), nil
		}, WithTTL(time.Millisecond*10))
		val, err := c.Get(context.TODO(), "id")
		require.NoError(t, err)
		require.NotNil(t, val)
		assert.Equal(t, 1, c.Len())
		// making ttl pass
		time.Sleep(time.Millisecond * 40)
		// first gc will be run after 20 secs, so calling it manually
		go c.GC()
		// waiting until all objects are marked as closing
		time.Sleep(time.Millisecond * 40)
		var events []string
		go func() {
			// deferred so the test cannot hang on getCh if anything below fails
			defer close(getCh)
			got, err := c.Get(context.TODO(), "id")
			// assert, not require: FailNow may only be called from the
			// goroutine running the test function.
			assert.NoError(t, err)
			assert.NotNil(t, got)
			events = append(events, "get")
		}()
		// sleeping to make sure that Get is called
		time.Sleep(time.Millisecond * 40)
		events = append(events, "close")
		close(closeCh)
		<-getCh
		// Get must only return after the object was allowed to close
		require.Equal(t, []string{"close", "get"}, events)
	})
	t.Run("test gc tryClose false, many get", func(t *testing.T) {
		timesCalled := &atomic.Int32{}
		obj := NewTestObject("id", false, nil)
		c := New(func(ctx context.Context, id string) (value Object, err error) {
			timesCalled.Add(1)
			return obj, nil
		}, WithTTL(time.Millisecond*10))
		val, err := c.Get(context.TODO(), "id")
		require.NoError(t, err)
		require.NotNil(t, val)
		assert.Equal(t, 1, c.Len())
		// making ttl pass
		time.Sleep(time.Millisecond * 40)
		// first gc will be run after 20 secs, so calling it manually
		begin := make(chan struct{})
		var wg sync.WaitGroup
		var once sync.Once
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-begin
			c.GC()
		}()
		for i := 0; i < 5; i++ {
			wg.Add(1)
			go func(i int) {
				// deferred so wg.Wait cannot deadlock on a failed assertion
				defer wg.Done()
				// the first getter to run releases the GC goroutine
				once.Do(func() {
					close(begin)
				})
				// stagger the getters so they hit the cache at different moments
				if i > 0 {
					time.Sleep(time.Duration(i) * time.Millisecond)
				}
				_, err := c.Get(context.TODO(), "id")
				assert.NoError(t, err)
			}(i)
		}
		wg.Wait()
		// the object must never have been reloaded, yet GC must have tried to close it
		require.Equal(t, int32(1), timesCalled.Load())
		require.True(t, obj.tryCloseCalled)
	})
}
// Test_OCache_Remove checks that a Get issued while Remove is still blocked
// closing the object returns only after that close has been released.
func Test_OCache_Remove(t *testing.T) {
	closeCh := make(chan struct{})
	getCh := make(chan struct{})
	removeCh := make(chan struct{})
	c := New(func(ctx context.Context, id string) (value Object, err error) {
		return NewTestObject(id, false, closeCh), nil
	}, WithTTL(time.Millisecond*10))
	val, err := c.Get(context.TODO(), "id")
	require.NoError(t, err)
	require.NotNil(t, val)
	assert.Equal(t, 1, c.Len())
	// removing the object, so we will wait on closing
	go func() {
		defer close(removeCh)
		_, err := c.Remove("id")
		// assert, not require: FailNow may only be called from the
		// goroutine running the test function.
		assert.NoError(t, err)
	}()
	time.Sleep(time.Millisecond * 40)
	var events []string
	go func() {
		// deferred so the test cannot hang on getCh if anything below fails
		defer close(getCh)
		got, err := c.Get(context.TODO(), "id")
		assert.NoError(t, err)
		assert.NotNil(t, got)
		events = append(events, "get")
	}()
	// sleeping to make sure that Get is called
	time.Sleep(time.Millisecond * 40)
	events = append(events, "close")
	close(closeCh)
	<-getCh
	// wait for Remove too, so its assertion cannot run after the test has ended
	<-removeCh
	require.Equal(t, []string{"close", "get"}, events)
}