This commit is contained in:
parent
aa0affb019
commit
3e948f55a2
3 changed files with 242 additions and 93 deletions
|
@ -13,45 +13,49 @@ import (
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||||
"github.com/bodgit/sevenzip"
|
"github.com/bodgit/sevenzip"
|
||||||
"github.com/mattetti/filebuffer"
|
|
||||||
"github.com/nwaples/rardecode/v2"
|
"github.com/nwaples/rardecode/v2"
|
||||||
"github.com/royalcat/ctxio"
|
"github.com/royalcat/ctxio"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ArchiveFactories = map[string]FsFactory{
|
var ArchiveFactories = map[string]FsFactory{
|
||||||
".zip": func(ctx context.Context, _ string, f File) (Filesystem, error) {
|
".zip": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) {
|
||||||
stat, err := f.Info()
|
stat, err := f.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader)
|
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), ZipLoader)
|
||||||
},
|
},
|
||||||
".rar": func(ctx context.Context, _ string, f File) (Filesystem, error) {
|
".rar": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) {
|
||||||
stat, err := f.Info()
|
stat, err := f.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader)
|
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), RarLoader)
|
||||||
},
|
},
|
||||||
".7z": func(ctx context.Context, _ string, f File) (Filesystem, error) {
|
".7z": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) {
|
||||||
stat, err := f.Info()
|
stat, err := f.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader)
|
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), SevenZipLoader)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
type archiveLoader func(ctx context.Context, r ctxio.ReaderAt, size int64) (map[string]*archiveFile, error)
|
type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error)
|
||||||
|
|
||||||
var _ Filesystem = &ArchiveFS{}
|
var _ Filesystem = &ArchiveFS{}
|
||||||
|
|
||||||
|
type fileEntry struct {
|
||||||
|
fs.FileInfo
|
||||||
|
open func(ctx context.Context) (File, error)
|
||||||
|
}
|
||||||
|
|
||||||
type ArchiveFS struct {
|
type ArchiveFS struct {
|
||||||
name string
|
name string
|
||||||
|
|
||||||
size int64
|
size int64
|
||||||
|
|
||||||
files map[string]File
|
files map[string]fileEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rename implements Filesystem.
|
// Rename implements Filesystem.
|
||||||
|
@ -84,8 +88,8 @@ func (a *ArchiveFS) FsName() string {
|
||||||
return "archivefs"
|
return "archivefs"
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
|
func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
|
||||||
archiveFiles, err := loader(ctx, r, size)
|
archiveFiles, err := loader(ctx, archivePath, r, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -99,7 +103,7 @@ func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
files := make(map[string]File, len(archiveFiles))
|
files := make(map[string]fileEntry, len(archiveFiles))
|
||||||
for k, v := range archiveFiles {
|
for k, v := range archiveFiles {
|
||||||
// TODO make optional
|
// TODO make optional
|
||||||
if strings.Contains(k, "/__MACOSX/") {
|
if strings.Contains(k, "/__MACOSX/") {
|
||||||
|
@ -113,8 +117,13 @@ func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64,
|
||||||
files[k] = v
|
files[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME
|
// FIXME configurable
|
||||||
files["/.forcegallery"] = NewMemoryFile(".forcegallery", []byte{})
|
files["/.forcegallery"] = fileEntry{
|
||||||
|
FileInfo: NewFileInfo("/.forcegallery", 0),
|
||||||
|
open: func(ctx context.Context) (File, error) {
|
||||||
|
return NewMemoryFile(".forcegallery", []byte{}), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
return &ArchiveFS{
|
return &ArchiveFS{
|
||||||
name: name,
|
name: name,
|
||||||
|
@ -129,18 +138,37 @@ func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) {
|
func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) {
|
||||||
return GetFile(a.files, filename)
|
if filename == Separator {
|
||||||
|
return NewDirFile(filename), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
f, ok := a.files[filename]
|
||||||
|
if ok {
|
||||||
|
return f.open(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
for p := range a.files {
|
||||||
|
if strings.HasPrefix(p, filename) {
|
||||||
|
return NewDirFile(filename), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
|
func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
|
||||||
return ListDirFromFiles(a.files, path)
|
infos := make(map[string]fs.FileInfo, len(a.files))
|
||||||
|
for k, v := range a.files {
|
||||||
|
infos[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return ListDirFromInfo(infos, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat implements Filesystem.
|
// Stat implements Filesystem.
|
||||||
func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
||||||
|
if entry, ok := afs.files[filename]; ok {
|
||||||
if file, ok := afs.files[filename]; ok {
|
return entry, nil
|
||||||
return file.Info()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for p, _ := range afs.files {
|
for p, _ := range afs.files {
|
||||||
|
@ -174,28 +202,22 @@ func (a *ArchiveFS) Type() fs.FileMode {
|
||||||
|
|
||||||
var _ File = (*archiveFile)(nil)
|
var _ File = (*archiveFile)(nil)
|
||||||
|
|
||||||
func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archiveFile {
|
func newArchiveFile(name string, size int64, rr *randomReaderFromLinear) *archiveFile {
|
||||||
return &archiveFile{
|
return &archiveFile{
|
||||||
name: name,
|
name: name,
|
||||||
size: size,
|
size: size,
|
||||||
af: af,
|
rr: rr,
|
||||||
|
|
||||||
buffer: filebuffer.New(nil),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const readahead = 1024 * 16
|
|
||||||
|
|
||||||
type archiveFile struct {
|
type archiveFile struct {
|
||||||
name string
|
name string
|
||||||
size int64
|
size int64
|
||||||
af archiveFileReaderFactory
|
|
||||||
|
|
||||||
m sync.Mutex
|
|
||||||
|
|
||||||
|
m sync.Mutex
|
||||||
offset int64
|
offset int64
|
||||||
readen int64
|
|
||||||
buffer *filebuffer.Buffer
|
rr *randomReaderFromLinear
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements File.
|
// Seek implements File.
|
||||||
|
@ -234,48 +256,11 @@ func (d *archiveFile) IsDir() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
|
||||||
if to < d.readen {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, err := d.af(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get file reader: %w", err)
|
|
||||||
}
|
|
||||||
defer reader.Close(ctx)
|
|
||||||
|
|
||||||
_, err = d.buffer.Seek(0, io.SeekStart)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead)
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return fmt.Errorf("error copying from archive file reader: %w", err)
|
|
||||||
}
|
|
||||||
_, err = d.buffer.Seek(d.offset, io.SeekStart)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||||
ctx, span := tracer.Start(ctx, "archive.File.Read")
|
ctx, span := tracer.Start(ctx, "archive.File.Read")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
d.m.Lock()
|
n, err = d.rr.ReadAt(ctx, p, d.offset)
|
||||||
defer d.m.Unlock()
|
|
||||||
|
|
||||||
err = d.loadMore(ctx, d.offset+int64(len(p)))
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
|
||||||
}
|
|
||||||
n, err = d.buffer.Read(p)
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
|
||||||
}
|
|
||||||
d.offset += int64(n)
|
d.offset += int64(n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
@ -284,37 +269,26 @@ func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, e
|
||||||
d.m.Lock()
|
d.m.Lock()
|
||||||
defer d.m.Unlock()
|
defer d.m.Unlock()
|
||||||
|
|
||||||
err = d.loadMore(ctx, off+int64(len(p)))
|
return d.rr.ReadAt(ctx, p, off)
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
|
||||||
}
|
|
||||||
n, err = d.buffer.ReadAt(p, off)
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return n, fmt.Errorf("failed to readAt from buffer: %w", err)
|
|
||||||
}
|
|
||||||
return n, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *archiveFile) Close(ctx context.Context) error {
|
func (d *archiveFile) Close(ctx context.Context) error {
|
||||||
|
// FIXME close should do nothing as archive fs currently reuse the same file instances
|
||||||
return nil
|
return nil
|
||||||
// d.m.Lock()
|
|
||||||
// defer d.m.Unlock()
|
|
||||||
|
|
||||||
// return d.buffer.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
|
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
|
||||||
|
|
||||||
var _ archiveLoader = ZipLoader
|
var _ archiveLoader = ZipLoader
|
||||||
|
|
||||||
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, f)
|
||||||
zr, err := zip.NewReader(reader, size)
|
zr, err := zip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(map[string]*archiveFile)
|
out := make(map[string]fileEntry)
|
||||||
for i := range zr.File {
|
for i := range zr.File {
|
||||||
zipFile := zr.File[i]
|
zipFile := zr.File[i]
|
||||||
if zipFile.FileInfo().IsDir() {
|
if zipFile.FileInfo().IsDir() {
|
||||||
|
@ -323,7 +297,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
|
|
||||||
i := i
|
i := i
|
||||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, f)
|
||||||
|
|
||||||
zr, err := zip.NewReader(reader, size)
|
zr, err := zip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -338,7 +312,16 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
return ctxio.WrapIoReadCloser(rc), nil
|
return ctxio.WrapIoReadCloser(rc), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
|
info := zipFile.FileInfo()
|
||||||
|
|
||||||
|
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af)
|
||||||
|
|
||||||
|
out[AbsPath(zipFile.Name)] = fileEntry{
|
||||||
|
FileInfo: info,
|
||||||
|
open: func(ctx context.Context) (File, error) {
|
||||||
|
return newArchiveFile(info.Name(), info.Size(), rr), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -346,14 +329,14 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
|
|
||||||
var _ archiveLoader = SevenZipLoader
|
var _ archiveLoader = SevenZipLoader
|
||||||
|
|
||||||
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||||
r, err := sevenzip.NewReader(reader, size)
|
r, err := sevenzip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(map[string]*archiveFile)
|
out := make(map[string]fileEntry)
|
||||||
for i, f := range r.File {
|
for i, f := range r.File {
|
||||||
f := f
|
f := f
|
||||||
if f.FileInfo().IsDir() {
|
if f.FileInfo().IsDir() {
|
||||||
|
@ -376,7 +359,16 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
||||||
return ctxio.WrapIoReadCloser(rc), nil
|
return ctxio.WrapIoReadCloser(rc), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
|
info := f.FileInfo()
|
||||||
|
|
||||||
|
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af)
|
||||||
|
|
||||||
|
out[AbsPath(f.Name)] = fileEntry{
|
||||||
|
FileInfo: f.FileInfo(),
|
||||||
|
open: func(ctx context.Context) (File, error) {
|
||||||
|
return newArchiveFile(info.Name(), info.Size(), rr), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -384,7 +376,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
||||||
|
|
||||||
var _ archiveLoader = RarLoader
|
var _ archiveLoader = RarLoader
|
||||||
|
|
||||||
func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||||
|
|
||||||
r, err := rardecode.NewReader(reader)
|
r, err := rardecode.NewReader(reader)
|
||||||
|
@ -392,7 +384,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(map[string]*archiveFile)
|
out := make(map[string]fileEntry)
|
||||||
for {
|
for {
|
||||||
header, err := r.Next()
|
header, err := r.Next()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
@ -421,7 +413,14 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
return nil, fmt.Errorf("file with name '%s' not found", name)
|
return nil, fmt.Errorf("file with name '%s' not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
out[AbsPath(header.Name)] = NewArchiveFile(header.Name, header.UnPackedSize, af)
|
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af)
|
||||||
|
|
||||||
|
out[AbsPath(header.Name)] = fileEntry{
|
||||||
|
FileInfo: NewFileInfo(header.Name, header.UnPackedSize),
|
||||||
|
open: func(ctx context.Context) (File, error) {
|
||||||
|
return newArchiveFile(header.Name, header.UnPackedSize, rr), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
|
|
150
src/vfs/archive_cache.go
Normal file
150
src/vfs/archive_cache.go
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
package vfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/royalcat/ctxio"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO переделать кеш в демон
|
||||||
|
|
||||||
|
const blockSize int64 = 1024 * 16 // 16KB
|
||||||
|
const defaultBlockCount = 32768 // 512MB of total usage
|
||||||
|
|
||||||
|
type archiveFileIndex struct {
|
||||||
|
archive string
|
||||||
|
filename string
|
||||||
|
}
|
||||||
|
|
||||||
|
type blockIndex struct {
|
||||||
|
index archiveFileIndex
|
||||||
|
off int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var blockCache *lru.Cache[blockIndex, []byte]
|
||||||
|
|
||||||
|
func ChangeBufferSize(blockCount int) {
|
||||||
|
blockCache.Resize(blockCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
var err error
|
||||||
|
blockCache, err = lru.New[blockIndex, []byte](defaultBlockCount)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRandomReaderFromLinear(index archiveFileIndex, size int64, readerFactory archiveFileReaderFactory) *randomReaderFromLinear {
|
||||||
|
return &randomReaderFromLinear{
|
||||||
|
index: index,
|
||||||
|
size: size,
|
||||||
|
readerFactory: readerFactory,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type randomReaderFromLinear struct {
|
||||||
|
index archiveFileIndex
|
||||||
|
readerFactory archiveFileReaderFactory
|
||||||
|
reader ctxio.ReadCloser
|
||||||
|
readen int64
|
||||||
|
size int64
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ ctxio.ReaderAt = (*randomReaderFromLinear)(nil)
|
||||||
|
var _ ctxio.Closer = (*randomReaderFromLinear)(nil)
|
||||||
|
|
||||||
|
// ReadAt implements ctxio.ReaderAt.
|
||||||
|
func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "archive.RandomReader.ReadAt")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if a.closed {
|
||||||
|
return 0, errors.New("reader is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if off >= a.size {
|
||||||
|
return 0, ctxio.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
aligntOff := (off / blockSize) * blockSize
|
||||||
|
|
||||||
|
block, ok := blockCache.Get(blockIndex{index: a.index, off: aligntOff})
|
||||||
|
if ok {
|
||||||
|
n = copy(p, block[off-aligntOff:])
|
||||||
|
if len(block) < int(blockSize) {
|
||||||
|
err = ctxio.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("cache miss, reading from file")
|
||||||
|
if err := a.readTo(ctx, aligntOff+blockSize); err != nil && err != ctxio.EOF {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
block, ok = blockCache.Get(blockIndex{index: a.index, off: aligntOff})
|
||||||
|
if !ok {
|
||||||
|
// WTF this theoretically shouldn't happen under normal scenarios
|
||||||
|
return 0, errors.New("block not found or block cache under too much pressure, try to increase the cache size")
|
||||||
|
}
|
||||||
|
|
||||||
|
n = copy(p, block[off-aligntOff:])
|
||||||
|
if len(block) < int(blockSize) {
|
||||||
|
err = ctxio.EOF
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *randomReaderFromLinear) readTo(ctx context.Context, targetOffset int64) (err error) {
|
||||||
|
if a.reader == nil || a.readen > targetOffset {
|
||||||
|
a.reader, err = a.readerFactory(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.readen = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
for off := a.readen; off < targetOffset; off += blockSize {
|
||||||
|
// TODO sync.Pool ?
|
||||||
|
buf := make([]byte, blockSize)
|
||||||
|
n, err := a.reader.Read(ctx, buf)
|
||||||
|
if err != nil && err != ctxio.EOF {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.readen += int64(n)
|
||||||
|
if int64(n) < blockSize {
|
||||||
|
buf = buf[:n]
|
||||||
|
}
|
||||||
|
|
||||||
|
blockCache.Add(blockIndex{index: a.index, off: off}, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements ctxio.Closer.
|
||||||
|
func (a *randomReaderFromLinear) Close(ctx context.Context) error {
|
||||||
|
if a.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
a.closed = true
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
|
||||||
|
if a.reader != nil {
|
||||||
|
errs = append(errs, a.reader.Close(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, block := range blockCache.Keys() {
|
||||||
|
if block.index == a.index {
|
||||||
|
blockCache.Remove(block)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
|
@ -47,7 +47,7 @@ func TestZipFilesystem(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// TODO add single dir collapse test
|
// TODO add single dir collapse test
|
||||||
zfs, err := vfs.NewArchive(ctx, "test", zReader, size, vfs.ZipLoader)
|
zfs, err := vfs.NewArchive(ctx, "test", "test", zReader, size, vfs.ZipLoader)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
files, err := zfs.ReadDir(ctx, "/path/to/test/file")
|
files, err := zfs.ReadDir(ctx, "/path/to/test/file")
|
||||||
|
|
Loading…
Reference in a new issue