multithreaded read dir

This commit is contained in:
royalcat 2024-12-09 23:44:01 +03:00
parent 95016d54c1
commit b77ce50a7b
4 changed files with 106 additions and 46 deletions

View file

@ -41,7 +41,7 @@ var ArchiveFactories = map[string]FsFactory{
},
}
type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error)
type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error)
var _ Filesystem = &ArchiveFS{}
@ -88,8 +88,8 @@ func (a *ArchiveFS) FsName() string {
return "archivefs"
}
func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
archiveFiles, err := loader(ctx, archivePath, r, size)
func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) {
archiveFiles, err := loader(ctx, archivePath, f, size)
if err != nil {
return nil, err
}
@ -281,7 +281,12 @@ type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error
var _ archiveLoader = ZipLoader
func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, f)
if err != nil {
return nil, err
}
reader := ctxio.IoReaderAt(ctx, f)
zr, err := zip.NewReader(reader, size)
if err != nil {
@ -314,7 +319,7 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
info := zipFile.FileInfo()
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af)
out[AbsPath(zipFile.Name)] = fileEntry{
FileInfo: info,
@ -329,7 +334,12 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
var _ archiveLoader = SevenZipLoader
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, ctxreader)
if err != nil {
return nil, err
}
reader := ctxio.IoReaderAt(ctx, ctxreader)
r, err := sevenzip.NewReader(reader, size)
if err != nil {
@ -361,7 +371,7 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
info := f.FileInfo()
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af)
out[AbsPath(f.Name)] = fileEntry{
FileInfo: f.FileInfo(),
@ -376,8 +386,13 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
var _ archiveLoader = RarLoader
func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, f)
if err != nil {
return nil, err
}
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
r, err := rardecode.NewReader(reader)
if err != nil {
@ -396,7 +411,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
name := header.Name
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
r, err := rardecode.NewReader(reader)
if err != nil {
return nil, err
@ -413,7 +428,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
return nil, fmt.Errorf("file with name '%s' not found", name)
}
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af)
out[AbsPath(header.Name)] = fileEntry{
FileInfo: NewFileInfo(header.Name, header.UnPackedSize),

View file

@ -18,7 +18,7 @@ const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage
const defaultBlockCount = cacheSize / blockSize
// archiveFileIndex is the cache key for a single file entry inside an
// archive. The archive is identified by the content hash of the archive
// file (see FileHash) rather than its path, so cached blocks stay valid
// if the archive is reachable under a different path.
type archiveFileIndex struct {
	archiveHash Hash   // content hash of the containing archive
	filename    string // name of the entry inside the archive
}
@ -107,7 +107,7 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (
a.readerMutex.Lock()
defer a.readerMutex.Unlock()
if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block
if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block
return b, nil
}

View file

@ -4,10 +4,11 @@ import (
"archive/zip"
"bytes"
"context"
"io"
"io/fs"
"testing"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/royalcat/ctxio"
"github.com/stretchr/testify/require"
)
@ -62,24 +63,24 @@ func TestZipFilesystem(t *testing.T) {
f, err := zfs.Open(ctx, "/path/to/test/file/1.txt")
require.NoError(err)
n, err := f.Read(ctx, out)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(5, n)
require.Equal([]byte("Hello"), out)
outSpace := make([]byte, 1)
n, err = f.Read(ctx, outSpace)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(1, n)
require.Equal([]byte(" "), outSpace)
n, err = f.Read(ctx, out)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(5, n)
require.Equal([]byte("World"), out)
}
func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
func createTestZip(require *require.Assertions) (vfs.File, int64) {
buf := bytes.NewBuffer([]byte{})
zWriter := zip.NewWriter(buf)
@ -95,17 +96,59 @@ func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
return newCBR(buf.Bytes()), int64(buf.Len())
}
type closeableByteReader struct {
data *bytes.Reader
}
func newCBR(b []byte) *closeableByteReader {
return &closeableByteReader{
data: bytes.NewReader(b),
}
}
var _ vfs.File = &closeableByteReader{}
type closeableByteReader struct {
data *bytes.Reader
}
// ReadAt implements ctxio.ReaderAt by delegating to the underlying
// in-memory bytes.Reader. ctx is accepted for interface compatibility
// but is not consulted.
func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
	return c.data.ReadAt(p, off)
}
// Close implements vfs.File. Intentionally unimplemented: the tests
// using this helper are not expected to close it.
func (c *closeableByteReader) Close(ctx context.Context) error {
	panic("unimplemented")
}
// Info implements vfs.File. Intentionally unimplemented in this test helper.
func (c *closeableByteReader) Info() (fs.FileInfo, error) {
	panic("unimplemented")
}
// IsDir implements vfs.File. Intentionally unimplemented in this test helper.
func (c *closeableByteReader) IsDir() bool {
	panic("unimplemented")
}
// Name implements vfs.File. Intentionally unimplemented in this test helper.
func (c *closeableByteReader) Name() string {
	panic("unimplemented")
}
// Read implements vfs.File by delegating to the underlying bytes.Reader,
// advancing its internal offset. ctx is accepted for interface
// compatibility but is not consulted.
func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) {
	return c.data.Read(p)
}
// Seek implements vfs.File by delegating to the underlying bytes.Reader
// (standard io.Seeker whence semantics).
func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) {
	return c.data.Seek(offset, whence)
}
// Size implements vfs.File, returning the total length of the underlying
// byte slice (independent of the current read offset).
func (c *closeableByteReader) Size() int64 {
	return c.data.Size()
}
// Type implements vfs.File. Intentionally unimplemented in this test helper.
func (c *closeableByteReader) Type() fs.FileMode {
	panic("unimplemented")
}

View file

@ -14,6 +14,7 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/sourcegraph/conc/iter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
@ -111,8 +112,8 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
if err != nil {
return nil, err
}
out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries {
out, err := iter.MapErr(entries, func(pe *fs.DirEntry) (fs.DirEntry, error) {
e := *pe
if r.resolver.IsNestedFs(e.Name()) {
filepath := path.Join("/", name, e.Name())
file, err := r.rootFS.Open(ctx, filepath)
@ -125,16 +126,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
}
if err != nil {
log.Error(ctx, "error creating nested fs", rlog.Error(err))
out = append(out, e)
continue
return nil, fmt.Errorf("error creating nested fs: %w", err)
}
return nestedfs, nil
} else {
return e, nil
}
})
if err != nil {
log.Error(ctx, "error mapping entries", rlog.Error(err))
err = nil
}
out = append(out, nestedfs)
} else {
out = append(out, e)
}
}
return out, nil
out = slices.DeleteFunc(out, func(e fs.DirEntry) bool { return e == nil })
return out, err
}
// Stat implements Filesystem.
@ -228,14 +235,14 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem,
func NewResolver(factories map[string]FsFactory) *Resolver {
return &Resolver{
factories: factories,
fsmap: map[Hash]Filesystem{},
fsmap: map[string]Filesystem{},
}
}
type Resolver struct {
m sync.Mutex
factories map[string]FsFactory
fsmap map[Hash]Filesystem // filesystem cache
fsmap map[string]Filesystem // filesystem cache
// TODO: add fsmap clean
}
@ -255,15 +262,10 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
return nil, file.Close(ctx)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return nil, fmt.Errorf("error calculating file hash: %w", err)
}
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fileHash]; ok {
if nestedFs, ok := r.fsmap[fsPath]; ok {
return nestedFs, file.Close(ctx)
}
@ -276,7 +278,7 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
if err != nil {
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
}
r.fsmap[fileHash] = nestedFs
r.fsmap[fsPath] = nestedFs
return nestedFs, nil
@ -319,10 +321,10 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
}
// fileHash, err := FileHash(ctx, file)
// if err != nil {
// return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
// }
err = file.Close(ctx)
if err != nil {
return "", nil, "", fmt.Errorf("error closing file: %w", err)
@ -335,7 +337,7 @@ PARTS_LOOP:
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fileHash]; ok {
if nestedFs, ok := r.fsmap[fsPath]; ok {
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
} else {
@ -352,7 +354,7 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
r.fsmap[fileHash] = nestedFs
r.fsmap[fsPath] = nestedFs
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))