tstor/daemons/archive/archive_cache.go

184 lines
4 KiB
Go
Raw Permalink Normal View History

2025-01-20 02:18:15 +00:00
package archive
2024-10-26 21:23:46 +00:00
import (
"context"
"errors"
2025-01-02 18:22:44 +00:00
"fmt"
2024-11-15 13:39:56 +00:00
"io"
"sync"
2024-10-26 21:23:46 +00:00
2025-01-20 02:18:15 +00:00
"git.kmsign.ru/royalcat/tstor/src/vfs"
2025-01-07 21:51:11 +00:00
"github.com/hashicorp/golang-lru/arc/v2"
2024-10-26 21:23:46 +00:00
"github.com/royalcat/ctxio"
2025-01-02 18:22:44 +00:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
2024-10-26 21:23:46 +00:00
)
// TODO: rework the cache into a daemon.
2024-11-15 13:39:56 +00:00
// Sizing of the shared block cache.
const (
	// blockSize is the number of bytes held by one cached block (16 KiB).
	blockSize = 16 << 10
	// cacheSize is the total amount of block data the cache may hold (4 GiB).
	cacheSize = 4 << 30
	// defaultBlockCount is how many blocks of blockSize bytes fit into cacheSize.
	defaultBlockCount = cacheSize / blockSize
)
2024-10-26 21:23:46 +00:00
2025-01-20 02:18:15 +00:00
// archiveFileReaderFactory opens a new sequential reader.
//
// readBlock calls it when it has no reader yet or when a block located before
// the current read position is requested, and then treats the returned reader
// as positioned at offset 0.
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
2024-10-26 21:23:46 +00:00
type archiveFileIndex struct {
2025-01-20 02:18:15 +00:00
archiveHash vfs.Hash
2024-12-09 20:44:01 +00:00
filename string
2024-10-26 21:23:46 +00:00
}
// blockIndex is the key type of blockCache: one block of one file.
type blockIndex struct {
	// index identifies the file the block belongs to.
	index archiveFileIndex
	// off is the offset of the block's first byte within the file. ReadAt
	// builds keys whose off is rounded down to a multiple of blockSize.
	off int64
}
2024-11-15 13:39:56 +00:00
// block is the value type of blockCache: a fixed-size buffer plus the number
// of bytes in it that are valid.
type block struct {
	// data is the block's storage; only data[:len] is meaningful.
	data [blockSize]byte
	// len is the number of valid bytes in data. The cache lookups in readBlock
	// treat an entry with len == 0 as a miss.
	len int
}
2025-01-07 21:51:11 +00:00
// blockCache is the package-level cache of file blocks keyed by blockIndex.
// It is created in init with room for defaultBlockCount entries and is used by
// every randomReaderFromLinear.
var blockCache *arc.ARCCache[blockIndex, block]
2024-10-26 21:23:46 +00:00
func init() {
var err error
2025-01-07 21:51:11 +00:00
blockCache, err = arc.NewARC[blockIndex, block](defaultBlockCount)
2024-10-26 21:23:46 +00:00
if err != nil {
panic(err)
}
}
// newRandomReaderFromLinear builds a random-access reader for the file
// identified by index.
//
// size is the file length used by ReadAt to detect reads past the end;
// readerFactory is invoked lazily, on the first block read, to open the
// underlying sequential reader.
func newRandomReaderFromLinear(index archiveFileIndex, size int64, readerFactory archiveFileReaderFactory) *randomReaderFromLinear {
	r := new(randomReaderFromLinear)
	r.index = index
	r.size = size
	r.readerFactory = readerFactory
	return r
}
type randomReaderFromLinear struct {
index archiveFileIndex
readerFactory archiveFileReaderFactory
reader ctxio.ReadCloser
readen int64
2024-11-15 13:39:56 +00:00
readerMutex sync.Mutex
2024-10-26 21:23:46 +00:00
size int64
closed bool
}
// Compile-time checks that *randomReaderFromLinear implements the ctxio
// interfaces it is meant to satisfy.
var (
	_ ctxio.ReaderAt = (*randomReaderFromLinear)(nil)
	_ ctxio.Closer   = (*randomReaderFromLinear)(nil)
)
// ReadAt implements ctxio.ReaderAt.
func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
ctx, span := tracer.Start(ctx, "archive.RandomReader.ReadAt")
defer span.End()
if a.closed {
return 0, errors.New("reader is closed")
}
if off >= a.size {
return 0, ctxio.EOF
}
aligntOff := (off / blockSize) * blockSize
2024-11-15 13:39:56 +00:00
bI := blockIndex{index: a.index, off: aligntOff}
2024-10-26 21:23:46 +00:00
2025-01-02 18:22:44 +00:00
block, err := a.readBlock(ctx, bI)
2024-11-15 13:39:56 +00:00
if err != nil && err != ctxio.EOF {
2024-10-26 21:23:46 +00:00
return 0, err
}
2025-01-02 18:22:44 +00:00
if off-aligntOff >= int64(block.len) {
return 0, ctxio.EOF
}
2024-11-15 13:39:56 +00:00
return copy(p, block.data[off-aligntOff:block.len]), err
}
2024-10-26 21:23:46 +00:00
2024-11-15 13:39:56 +00:00
func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) {
2025-01-02 18:22:44 +00:00
ctx, span := tracer.Start(ctx, "archive.RandomReader.readBlock")
defer span.End()
// check block in cache before locking
if b, ok := blockCache.Get(bI); ok && b.len != 0 {
return b, nil
}
2024-11-15 13:39:56 +00:00
a.readerMutex.Lock()
defer a.readerMutex.Unlock()
2024-10-26 21:23:46 +00:00
2024-12-09 20:44:01 +00:00
if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block
2024-11-15 13:39:56 +00:00
return b, nil
2024-10-26 21:23:46 +00:00
}
2024-11-15 13:39:56 +00:00
if a.reader == nil || a.readen > bI.off {
2025-01-02 18:22:44 +00:00
span.AddEvent("reader not valid, creating new reader", trace.WithAttributes(
attribute.Bool("reader_initialized", a.reader != nil),
attribute.Int64("readen", a.readen),
attribute.Int64("target_offset", bI.off),
))
if a.reader != nil {
if err := a.reader.Close(ctx); err != nil {
return block{}, fmt.Errorf("failed to close previous reader: %w", err)
}
}
2024-11-15 13:39:56 +00:00
var err error
2024-10-26 21:23:46 +00:00
a.reader, err = a.readerFactory(context.TODO())
if err != nil {
2024-11-15 13:39:56 +00:00
return block{}, err
2024-10-26 21:23:46 +00:00
}
a.readen = 0
}
2024-11-15 13:39:56 +00:00
for off := a.readen; off <= bI.off; off += blockSize {
2024-10-26 21:23:46 +00:00
// TODO sync.Pool ?
2024-11-15 13:39:56 +00:00
buf := [blockSize]byte{}
n, err := a.reader.Read(ctx, buf[:])
2024-10-26 21:23:46 +00:00
if err != nil && err != ctxio.EOF {
2024-11-15 13:39:56 +00:00
return block{}, err
2024-10-26 21:23:46 +00:00
}
a.readen += int64(n)
2024-11-15 13:39:56 +00:00
if n == 0 {
return block{}, io.EOF
2024-10-26 21:23:46 +00:00
}
2024-11-15 13:39:56 +00:00
blockCache.Add(blockIndex{bI.index, off}, block{len: n, data: buf})
if off == bI.off {
return block{len: n, data: buf}, err
}
if n < int(blockSize) && errors.Is(err, ctxio.EOF) {
return block{}, err
}
2024-10-26 21:23:46 +00:00
}
2024-11-15 13:39:56 +00:00
return block{}, io.EOF
2024-10-26 21:23:46 +00:00
}
// Close implements ctxio.Closer.
//
// The first call marks the reader as closed, closes the underlying sequential
// reader if one was opened, and removes every block of this file from the
// shared block cache. Later calls do nothing and return nil. The returned
// error is the one reported by the underlying reader's Close, if any.
//
// The reader state is changed while holding readerMutex, the same lock
// readBlock holds while it reads from or replaces the reader, so Close waits
// for an in-flight block read instead of closing the reader underneath it.
// Note that ReadAt still checks the closed flag without taking the lock.
func (a *randomReaderFromLinear) Close(ctx context.Context) error {
	a.readerMutex.Lock()
	if a.closed {
		a.readerMutex.Unlock()
		return nil
	}
	a.closed = true

	var err error
	if a.reader != nil {
		err = a.reader.Close(ctx)
	}
	a.readerMutex.Unlock()

	// The cache has its own synchronization needs; sweep it outside the lock.
	for _, key := range blockCache.Keys() {
		if key.index == a.index {
			blockCache.Remove(key)
		}
	}
	return err
}