From b77ce50a7b21fe0dda90b49c19b75b03c746525a Mon Sep 17 00:00:00 2001 From: royalcat Date: Mon, 9 Dec 2024 23:44:01 +0300 Subject: [PATCH] multithreader read dir --- src/vfs/archive.go | 37 ++++++++++++++++-------- src/vfs/archive_cache.go | 6 ++-- src/vfs/archive_test.go | 61 ++++++++++++++++++++++++++++++++++------ src/vfs/resolver.go | 48 ++++++++++++++++--------------- 4 files changed, 106 insertions(+), 46 deletions(-) diff --git a/src/vfs/archive.go b/src/vfs/archive.go index 8e39cf0..6bac4a9 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -41,7 +41,7 @@ var ArchiveFactories = map[string]FsFactory{ }, } -type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error) +type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error) var _ Filesystem = &ArchiveFS{} @@ -88,8 +88,8 @@ func (a *ArchiveFS) FsName() string { return "archivefs" } -func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) { - archiveFiles, err := loader(ctx, archivePath, r, size) +func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) { + archiveFiles, err := loader(ctx, archivePath, f, size) if err != nil { return nil, err } @@ -281,7 +281,12 @@ type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error var _ archiveLoader = ZipLoader -func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { +func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, f) + if err != nil { + return nil, err + } + reader := ctxio.IoReaderAt(ctx, f) zr, err := zip.NewReader(reader, size) if err != nil { @@ -314,7 +319,7 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i info := zipFile.FileInfo() - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af) out[AbsPath(zipFile.Name)] = fileEntry{ FileInfo: info, @@ -329,7 +334,12 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i var _ archiveLoader = SevenZipLoader -func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { +func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, ctxreader) + if err != nil { + return nil, err + } + reader := ctxio.IoReaderAt(ctx, ctxreader) r, err := sevenzip.NewReader(reader, size) if err != nil { @@ -361,7 +371,7 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea info := f.FileInfo() - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af) out[AbsPath(f.Name)] = fileEntry{ FileInfo: f.FileInfo(), @@ -376,8 +386,13 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea var _ archiveLoader = RarLoader -func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { - reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) +func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, f) + if err != nil { + return nil, err + } + + reader := ioutils.WrapIoReadSeeker(ctx, f, size) r, err := rardecode.NewReader(reader) if err != nil { @@ -396,7 +411,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt name := header.Name af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) + reader := ioutils.WrapIoReadSeeker(ctx, f, size) r, err := rardecode.NewReader(reader) if err != nil { return nil, err @@ -413,7 +428,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt return nil, fmt.Errorf("file with name '%s' not found", name) } - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af) out[AbsPath(header.Name)] = fileEntry{ FileInfo: NewFileInfo(header.Name, header.UnPackedSize), diff --git a/src/vfs/archive_cache.go b/src/vfs/archive_cache.go index 41237e7..47e6fc8 100644 --- a/src/vfs/archive_cache.go +++ b/src/vfs/archive_cache.go @@ -18,8 +18,8 @@ const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage const defaultBlockCount = cacheSize / blockSize type archiveFileIndex struct { - archive string - filename string + archiveHash Hash + filename string } type blockIndex struct { @@ -107,7 +107,7 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) ( a.readerMutex.Lock() defer a.readerMutex.Unlock() - if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block + if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block return b, nil } diff --git a/src/vfs/archive_test.go b/src/vfs/archive_test.go index b209264..fcac485 100644 --- a/src/vfs/archive_test.go +++ b/src/vfs/archive_test.go @@ -4,10 +4,11 @@ import ( "archive/zip" "bytes" "context" + "io" + "io/fs" "testing" "git.kmsign.ru/royalcat/tstor/src/vfs" - "github.com/royalcat/ctxio" "github.com/stretchr/testify/require" ) @@ -62,24 +63,24 @@ func TestZipFilesystem(t *testing.T) { f, err := zfs.Open(ctx, "/path/to/test/file/1.txt") require.NoError(err) n, err := f.Read(ctx, out) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(5, n) require.Equal([]byte("Hello"), out) outSpace := make([]byte, 1) n, err = f.Read(ctx, outSpace) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(1, n) require.Equal([]byte(" "), outSpace) n, err = f.Read(ctx, out) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(5, n) require.Equal([]byte("World"), out) } -func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) { +func createTestZip(require *require.Assertions) (vfs.File, int64) { buf := bytes.NewBuffer([]byte{}) zWriter := zip.NewWriter(buf) @@ -95,17 +96,59 @@ func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) { return newCBR(buf.Bytes()), int64(buf.Len()) } -type closeableByteReader struct { - data *bytes.Reader -} - func newCBR(b []byte) *closeableByteReader { return &closeableByteReader{ data: bytes.NewReader(b), } } +var _ vfs.File = &closeableByteReader{} + +type closeableByteReader struct { + data *bytes.Reader +} + // ReadAt implements ctxio.ReaderAt. func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { return c.data.ReadAt(p, off) } + +// Close implements vfs.File. +func (c *closeableByteReader) Close(ctx context.Context) error { + panic("unimplemented") +} + +// Info implements vfs.File. +func (c *closeableByteReader) Info() (fs.FileInfo, error) { + panic("unimplemented") +} + +// IsDir implements vfs.File. +func (c *closeableByteReader) IsDir() bool { + panic("unimplemented") +} + +// Name implements vfs.File. +func (c *closeableByteReader) Name() string { + panic("unimplemented") +} + +// Read implements vfs.File. +func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) { + return c.data.Read(p) +} + +// Seek implements vfs.File. +func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) { + return c.data.Seek(offset, whence) +} + +// Size implements vfs.File. +func (c *closeableByteReader) Size() int64 { + return c.data.Size() +} + +// Type implements vfs.File. +func (c *closeableByteReader) Type() fs.FileMode { + panic("unimplemented") +} diff --git a/src/vfs/resolver.go b/src/vfs/resolver.go index 9d4850e..7347215 100644 --- a/src/vfs/resolver.go +++ b/src/vfs/resolver.go @@ -14,6 +14,7 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/pkg/rlog" + "github.com/sourcegraph/conc/iter" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" @@ -111,8 +112,8 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e if err != nil { return nil, err } - out := make([]fs.DirEntry, 0, len(entries)) - for _, e := range entries { + out, err := iter.MapErr(entries, func(pe *fs.DirEntry) (fs.DirEntry, error) { + e := *pe if r.resolver.IsNestedFs(e.Name()) { filepath := path.Join("/", name, e.Name()) file, err := r.rootFS.Open(ctx, filepath) @@ -125,16 +126,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e } if err != nil { log.Error(ctx, "error creating nested fs", rlog.Error(err)) - out = append(out, e) - continue + return nil, fmt.Errorf("error creating nested fs: %w", err) } - - out = append(out, nestedfs) + return nestedfs, nil } else { - out = append(out, e) + return e, nil } + }) + + if err != nil { + log.Error(ctx, "error mapping entries", rlog.Error(err)) + err = nil } - return out, nil + + out = slices.DeleteFunc(out, func(e fs.DirEntry) bool { return e == nil }) + + return out, err } // Stat implements Filesystem. @@ -228,14 +235,14 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem, func NewResolver(factories map[string]FsFactory) *Resolver { return &Resolver{ factories: factories, - fsmap: map[Hash]Filesystem{}, + fsmap: map[string]Filesystem{}, } } type Resolver struct { m sync.Mutex factories map[string]FsFactory - fsmap map[Hash]Filesystem // filesystem cache + fsmap map[string]Filesystem // filesystem cache // TODO: add fsmap clean } @@ -255,15 +262,10 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File return nil, file.Close(ctx) } - fileHash, err := FileHash(ctx, file) - if err != nil { - return nil, fmt.Errorf("error calculating file hash: %w", err) - } - r.m.Lock() defer r.m.Unlock() - if nestedFs, ok := r.fsmap[fileHash]; ok { + if nestedFs, ok := r.fsmap[fsPath]; ok { return nestedFs, file.Close(ctx) } @@ -276,7 +278,7 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File if err != nil { return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err) } - r.fsmap[fileHash] = nestedFs + r.fsmap[fsPath] = nestedFs return nestedFs, nil @@ -319,10 +321,10 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) } - fileHash, err := FileHash(ctx, file) - if err != nil { - return "", nil, "", fmt.Errorf("error calculating file hash: %w", err) - } + // fileHash, err := FileHash(ctx, file) + // if err != nil { + // return "", nil, "", fmt.Errorf("error calculating file hash: %w", err) + // } err = file.Close(ctx) if err != nil { return "", nil, "", fmt.Errorf("error closing file: %w", err) @@ -335,7 +337,7 @@ PARTS_LOOP: r.m.Lock() defer r.m.Unlock() - if nestedFs, ok := r.fsmap[fileHash]; ok { + if nestedFs, ok := r.fsmap[fsPath]; ok { span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) return fsPath, nestedFs, nestedFsPath, nil } else { @@ -352,7 +354,7 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) } - r.fsmap[fileHash] = nestedFs + r.fsmap[fsPath] = nestedFs span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))