package vfs

import (
	"context"
	"errors"
	"io"
	"sync"

	"github.com/dgraph-io/ristretto"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/royalcat/ctxio"
)

// TODO: rework the cache into a daemon.

// Sizing of the package-wide block cache.
const (
	blockSize         = 1024 * 16              // 16KB
	cacheSize         = 1024 * 1024 * 1024 * 4 // 4GB of total usage
	defaultBlockCount = cacheSize / blockSize
)

// archiveFileIndex identifies a single file inside an archive.
type archiveFileIndex struct {
	archive  string
	filename string
}

// blockIndex is the cache key of one block: the file it belongs to and the
// blockSize-aligned offset at which the block starts.
type blockIndex struct {
	index archiveFileIndex
	off   int64
}

// block is one cached chunk of file data. Only the first len bytes of data
// are valid.
type block struct {
	data [blockSize]byte
	len  int
}

// blockCache is the LRU cache of blocks shared by every reader in the package.
var blockCache *lru.Cache[blockIndex, block]

// ChangeBufferSize resizes the shared block cache so that it holds
// blockCount blocks.
func ChangeBufferSize(blockCount int) {
	blockCache.Resize(blockCount)
}

func init() {
	// NOTE(review): both results of this call are discarded; it looks like a
	// leftover that only keeps the ristretto import in use — confirm before
	// removing.
	ristretto.NewCache(&ristretto.Config{})

	cache, err := lru.New[blockIndex, block](defaultBlockCount)
	if err != nil {
		panic(err)
	}
	blockCache = cache
}

// newRandomReaderFromLinear builds a random-access reader for the file
// identified by index. size is the length of the file in bytes and
// readerFactory opens a fresh linear reader over its content.
func newRandomReaderFromLinear(index archiveFileIndex, size int64, readerFactory archiveFileReaderFactory) *randomReaderFromLinear {
	r := new(randomReaderFromLinear)
	r.index = index
	r.size = size
	r.readerFactory = readerFactory
	return r
}

// randomReaderFromLinear provides ReadAt on top of a forward-only reader by
// caching the blocks it reads in blockCache.
type randomReaderFromLinear struct {
	index         archiveFileIndex
	readerFactory archiveFileReaderFactory
	reader        ctxio.ReadCloser // current linear reader, nil until first use
	readen        int64            // bytes consumed from reader so far
	readerMutex   sync.Mutex       // held while reader and readen are in use
	size          int64            // offsets at or beyond size read as EOF
	closed        bool             // set by Close
}

var (
	_ ctxio.ReaderAt = (*randomReaderFromLinear)(nil)
	_ ctxio.Closer   = (*randomReaderFromLinear)(nil)
)

// ReadAt implements ctxio.ReaderAt.
func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { ctx, span := tracer.Start(ctx, "archive.RandomReader.ReadAt") defer span.End() if a.closed { return 0, errors.New("reader is closed") } if off >= a.size { return 0, ctxio.EOF } aligntOff := (off / blockSize) * blockSize bI := blockIndex{index: a.index, off: aligntOff} block, ok := blockCache.Get(bI) if ok { n = copy(p, block.data[off-aligntOff:block.len]) if block.len < int(blockSize) { err = ctxio.EOF } return n, err } span.AddEvent("cache miss, reading from file") block, err = a.readBlock(ctx, bI) if err != nil && err != ctxio.EOF { return 0, err } return copy(p, block.data[off-aligntOff:block.len]), err } func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) { a.readerMutex.Lock() defer a.readerMutex.Unlock() if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block return b, nil } if a.reader == nil || a.readen > bI.off { var err error a.reader, err = a.readerFactory(context.TODO()) if err != nil { return block{}, err } a.readen = 0 } for off := a.readen; off <= bI.off; off += blockSize { // TODO sync.Pool ? buf := [blockSize]byte{} n, err := a.reader.Read(ctx, buf[:]) if err != nil && err != ctxio.EOF { return block{}, err } a.readen += int64(n) if n == 0 { return block{}, io.EOF } blockCache.Add(blockIndex{bI.index, off}, block{len: n, data: buf}) if off == bI.off { return block{len: n, data: buf}, err } if n < int(blockSize) && errors.Is(err, ctxio.EOF) { return block{}, err } } return block{}, io.EOF } // Close implements ctxio.Closer. func (a *randomReaderFromLinear) Close(ctx context.Context) error { if a.closed { return nil } a.closed = true var errs []error if a.reader != nil { errs = append(errs, a.reader.Close(ctx)) } for _, block := range blockCache.Keys() { if block.index == a.index { blockCache.Remove(block) } } return errors.Join(errs...) }