diff --git a/src/vfs/archive.go b/src/vfs/archive.go index c1eacc1..563f15f 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -13,45 +13,49 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/ioutils" "github.com/bodgit/sevenzip" - "github.com/mattetti/filebuffer" "github.com/nwaples/rardecode/v2" "github.com/royalcat/ctxio" ) var ArchiveFactories = map[string]FsFactory{ - ".zip": func(ctx context.Context, _ string, f File) (Filesystem, error) { + ".zip": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { stat, err := f.Info() if err != nil { return nil, err } - return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader) + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), ZipLoader) }, - ".rar": func(ctx context.Context, _ string, f File) (Filesystem, error) { + ".rar": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { stat, err := f.Info() if err != nil { return nil, err } - return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader) + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), RarLoader) }, - ".7z": func(ctx context.Context, _ string, f File) (Filesystem, error) { + ".7z": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { stat, err := f.Info() if err != nil { return nil, err } - return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader) + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), SevenZipLoader) }, } -type archiveLoader func(ctx context.Context, r ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) +type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error) var _ Filesystem = &ArchiveFS{} +type fileEntry struct { + fs.FileInfo + open func(ctx context.Context) (File, error) +} + type ArchiveFS struct { name string size int64 - files map[string]File + files map[string]fileEntry } // Rename implements Filesystem. @@ -84,8 +88,8 @@ func (a *ArchiveFS) FsName() string { return "archivefs" } -func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) { - archiveFiles, err := loader(ctx, r, size) +func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) { + archiveFiles, err := loader(ctx, archivePath, r, size) if err != nil { return nil, err } @@ -99,7 +103,7 @@ func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, } } - files := make(map[string]File, len(archiveFiles)) + files := make(map[string]fileEntry, len(archiveFiles)) for k, v := range archiveFiles { // TODO make optional if strings.Contains(k, "/__MACOSX/") { @@ -113,8 +117,13 @@ func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, files[k] = v } - // FIXME - files["/.forcegallery"] = NewMemoryFile(".forcegallery", []byte{}) + // FIXME configurable + files["/.forcegallery"] = fileEntry{ + FileInfo: NewFileInfo("/.forcegallery", 0), + open: func(ctx context.Context) (File, error) { + return NewMemoryFile(".forcegallery", []byte{}), nil + }, + } return &ArchiveFS{ name: name, @@ -129,18 +138,37 @@ func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error { } func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) { - return GetFile(a.files, filename) + if filename == Separator { + return NewDirFile(filename), nil + } + + f, ok := a.files[filename] + if ok { + return f.open(ctx) + } + + for p := range a.files { + if strings.HasPrefix(p, filename) { + return NewDirFile(filename), nil + } + } + + return nil, ErrNotExist } func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { - return ListDirFromFiles(a.files, path) + infos := make(map[string]fs.FileInfo, len(a.files)) + for k, v := range a.files { + infos[k] = v + } + + return ListDirFromInfo(infos, path) } // Stat implements Filesystem. func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { - - if file, ok := afs.files[filename]; ok { - return file.Info() + if entry, ok := afs.files[filename]; ok { + return entry, nil } for p, _ := range afs.files { @@ -174,28 +202,22 @@ func (a *ArchiveFS) Type() fs.FileMode { var _ File = (*archiveFile)(nil) -func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archiveFile { +func newArchiveFile(name string, size int64, rr *randomReaderFromLinear) *archiveFile { return &archiveFile{ name: name, size: size, - af: af, - - buffer: filebuffer.New(nil), + rr: rr, } } -const readahead = 1024 * 16 - type archiveFile struct { name string size int64 - af archiveFileReaderFactory - - m sync.Mutex + m sync.Mutex offset int64 - readen int64 - buffer *filebuffer.Buffer + + rr *randomReaderFromLinear } // Seek implements File. @@ -234,48 +256,11 @@ func (d *archiveFile) IsDir() bool { return false } -func (d *archiveFile) loadMore(ctx context.Context, to int64) error { - if to < d.readen { - return nil - } - - reader, err := d.af(ctx) - if err != nil { - return fmt.Errorf("failed to get file reader: %w", err) - } - defer reader.Close(ctx) - - _, err = d.buffer.Seek(0, io.SeekStart) - if err != nil { - return err - } - d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead) - if err != nil && err != io.EOF { - return fmt.Errorf("error copying from archive file reader: %w", err) - } - _, err = d.buffer.Seek(d.offset, io.SeekStart) - if err != nil { - return err - } - - return nil -} - func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { ctx, span := tracer.Start(ctx, "archive.File.Read") defer span.End() - d.m.Lock() - defer d.m.Unlock() - - err = d.loadMore(ctx, d.offset+int64(len(p))) - if err != nil { - return 0, fmt.Errorf("failed to load more from archive file: %w", err) - } - n, err = d.buffer.Read(p) - if err != nil && err != io.EOF { - return n, fmt.Errorf("failed to read from buffer: %w", err) - } + n, err = d.rr.ReadAt(ctx, p, d.offset) d.offset += int64(n) return n, err } @@ -284,37 +269,26 @@ func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, e d.m.Lock() defer d.m.Unlock() - err = d.loadMore(ctx, off+int64(len(p))) - if err != nil { - return 0, fmt.Errorf("failed to load more from archive file: %w", err) - } - n, err = d.buffer.ReadAt(p, off) - if err != nil && err != io.EOF { - return n, fmt.Errorf("failed to readAt from buffer: %w", err) - } - return n, err + return d.rr.ReadAt(ctx, p, off) } func (d *archiveFile) Close(ctx context.Context) error { + // FIXME close should do nothing as archive fs currently reuse the same file instances return nil - // d.m.Lock() - // defer d.m.Unlock() - - // return d.buffer.Close() } type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error) var _ archiveLoader = ZipLoader -func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { - reader := ctxio.IoReaderAt(ctx, ctxreader) +func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { + reader := ctxio.IoReaderAt(ctx, f) zr, err := zip.NewReader(reader, size) if err != nil { return nil, err } - out := make(map[string]*archiveFile) + out := make(map[string]fileEntry) for i := range zr.File { zipFile := zr.File[i] if zipFile.FileInfo().IsDir() { @@ -323,7 +297,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s i := i af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ctxio.IoReaderAt(ctx, ctxreader) + reader := ctxio.IoReaderAt(ctx, f) zr, err := zip.NewReader(reader, size) if err != nil { @@ -338,7 +312,16 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s return ctxio.WrapIoReadCloser(rc), nil } - out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af) + info := zipFile.FileInfo() + + rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af) + + out[AbsPath(zipFile.Name)] = fileEntry{ + FileInfo: info, + open: func(ctx context.Context) (File, error) { + return newArchiveFile(info.Name(), info.Size(), rr), nil + }, + } } return out, nil @@ -346,14 +329,14 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s var _ archiveLoader = SevenZipLoader -func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { +func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { reader := ctxio.IoReaderAt(ctx, ctxreader) r, err := sevenzip.NewReader(reader, size) if err != nil { return nil, err } - out := make(map[string]*archiveFile) + out := make(map[string]fileEntry) for i, f := range r.File { f := f if f.FileInfo().IsDir() { @@ -376,7 +359,16 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) ( return ctxio.WrapIoReadCloser(rc), nil } - out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af) + info := f.FileInfo() + + rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af) + + out[AbsPath(f.Name)] = fileEntry{ + FileInfo: f.FileInfo(), + open: func(ctx context.Context) (File, error) { + return newArchiveFile(info.Name(), info.Size(), rr), nil + }, + } } return out, nil @@ -384,7 +376,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) ( var _ archiveLoader = RarLoader -func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { +func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) r, err := rardecode.NewReader(reader) @@ -392,7 +384,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s return nil, err } - out := make(map[string]*archiveFile) + out := make(map[string]fileEntry) for { header, err := r.Next() if err == io.EOF { @@ -421,7 +413,14 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s return nil, fmt.Errorf("file with name '%s' not found", name) } - out[AbsPath(header.Name)] = NewArchiveFile(header.Name, header.UnPackedSize, af) + rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af) + + out[AbsPath(header.Name)] = fileEntry{ + FileInfo: NewFileInfo(header.Name, header.UnPackedSize), + open: func(ctx context.Context) (File, error) { + return newArchiveFile(header.Name, header.UnPackedSize, rr), nil + }, + } } return out, nil diff --git a/src/vfs/archive_cache.go b/src/vfs/archive_cache.go new file mode 100644 index 0000000..62bb621 --- /dev/null +++ b/src/vfs/archive_cache.go @@ -0,0 +1,150 @@ +package vfs + +import ( + "context" + "errors" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/royalcat/ctxio" +) + +// TODO переделать кеш в демон + +const blockSize int64 = 1024 * 16 // 16KB +const defaultBlockCount = 32768 // 512MB of total usage + +type archiveFileIndex struct { + archive string + filename string +} + +type blockIndex struct { + index archiveFileIndex + off int64 +} + +var blockCache *lru.Cache[blockIndex, []byte] + +func ChangeBufferSize(blockCount int) { + blockCache.Resize(blockCount) +} + +func init() { + var err error + blockCache, err = lru.New[blockIndex, []byte](defaultBlockCount) + if err != nil { + panic(err) + } +} + +func newRandomReaderFromLinear(index archiveFileIndex, size int64, readerFactory archiveFileReaderFactory) *randomReaderFromLinear { + return &randomReaderFromLinear{ + index: index, + size: size, + readerFactory: readerFactory, + } +} + +type randomReaderFromLinear struct { + index archiveFileIndex + readerFactory archiveFileReaderFactory + reader ctxio.ReadCloser + readen int64 + size int64 + closed bool +} + +var _ ctxio.ReaderAt = (*randomReaderFromLinear)(nil) +var _ ctxio.Closer = (*randomReaderFromLinear)(nil) + +// ReadAt implements ctxio.ReaderAt. +func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + ctx, span := tracer.Start(ctx, "archive.RandomReader.ReadAt") + defer span.End() + + if a.closed { + return 0, errors.New("reader is closed") + } + + if off >= a.size { + return 0, ctxio.EOF + } + + aligntOff := (off / blockSize) * blockSize + + block, ok := blockCache.Get(blockIndex{index: a.index, off: aligntOff}) + if ok { + n = copy(p, block[off-aligntOff:]) + if len(block) < int(blockSize) { + err = ctxio.EOF + } + + return n, err + } + + span.AddEvent("cache miss, reading from file") + if err := a.readTo(ctx, aligntOff+blockSize); err != nil && err != ctxio.EOF { + return 0, err + } + + block, ok = blockCache.Get(blockIndex{index: a.index, off: aligntOff}) + if !ok { + // WTF this theoretically shouldn't happen under normal scenarios + return 0, errors.New("block not found or block cache under too much pressure, try to increase the cache size") + } + + n = copy(p, block[off-aligntOff:]) + if len(block) < int(blockSize) { + err = ctxio.EOF + } + return n, err +} + +func (a *randomReaderFromLinear) readTo(ctx context.Context, targetOffset int64) (err error) { + if a.reader == nil || a.readen > targetOffset { + a.reader, err = a.readerFactory(context.TODO()) + if err != nil { + return err + } + a.readen = 0 + } + + for off := a.readen; off < targetOffset; off += blockSize { + // TODO sync.Pool ? + buf := make([]byte, blockSize) + n, err := a.reader.Read(ctx, buf) + if err != nil && err != ctxio.EOF { + return err + } + a.readen += int64(n) + if int64(n) < blockSize { + buf = buf[:n] + } + + blockCache.Add(blockIndex{index: a.index, off: off}, buf) + } + + return nil +} + +// Close implements ctxio.Closer. +func (a *randomReaderFromLinear) Close(ctx context.Context) error { + if a.closed { + return nil + } + a.closed = true + + var errs []error + + if a.reader != nil { + errs = append(errs, a.reader.Close(ctx)) + } + + for _, block := range blockCache.Keys() { + if block.index == a.index { + blockCache.Remove(block) + } + } + + return errors.Join(errs...) +} diff --git a/src/vfs/archive_test.go b/src/vfs/archive_test.go index 117ea49..b209264 100644 --- a/src/vfs/archive_test.go +++ b/src/vfs/archive_test.go @@ -47,7 +47,7 @@ func TestZipFilesystem(t *testing.T) { ctx := context.Background() // TODO add single dir collapse test - zfs, err := vfs.NewArchive(ctx, "test", zReader, size, vfs.ZipLoader) + zfs, err := vfs.NewArchive(ctx, "test", "test", zReader, size, vfs.ZipLoader) require.NoError(err) files, err := zfs.ReadDir(ctx, "/path/to/test/file")