fs refactor
This commit is contained in:
parent
3dcf27d900
commit
0fa3a91447
13 changed files with 80 additions and 71 deletions
src
export/nfs
sources
vfs
|
@ -58,22 +58,6 @@ func (*fsWrapper) MkdirAll(ctx context.Context, filename string, perm fs.FileMod
|
|||
return billy.ErrNotSupported
|
||||
}
|
||||
|
||||
// Open implements billy.Filesystem.
|
||||
func (fs *fsWrapper) Open(ctx context.Context, filename string) (nfs.File, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
|
||||
defer cancel()
|
||||
|
||||
file, err := fs.fs.Open(ctx, filename)
|
||||
if err != nil {
|
||||
return nil, billyErr(ctx, err, fs.log)
|
||||
}
|
||||
return &billyFile{
|
||||
name: filename,
|
||||
file: file,
|
||||
log: fs.log.With("filename", filename),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// OpenFile implements billy.Filesystem.
|
||||
func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
|
||||
|
|
|
@ -130,13 +130,11 @@ func (s *Daemon) Close(ctx context.Context) error {
|
|||
)...)
|
||||
}
|
||||
|
||||
func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
|
||||
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
|
||||
ctx, span := tracer.Start(ctx, "LoadTorrent")
|
||||
defer span.End()
|
||||
log := s.log
|
||||
|
||||
defer f.Close(ctx)
|
||||
|
||||
stat, err := f.Info()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("call stat failed: %w", err)
|
||||
|
@ -274,7 +272,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
|
|||
|
||||
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
|
||||
|
||||
_, err = s.LoadTorrent(ctx, vfile)
|
||||
_, err = s.loadTorrent(ctx, vfile)
|
||||
if err != nil {
|
||||
log.Error(ctx, "failed adding torrent", rlog.Error(err))
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"path"
|
||||
"slices"
|
||||
"strings"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/anacrolix/torrent"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -38,11 +40,15 @@ const shortTimeout = time.Millisecond
|
|||
const lowTimeout = time.Second * 5
|
||||
|
||||
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
|
||||
c, err := s.LoadTorrent(ctx, f)
|
||||
c, err := s.loadTorrent(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := f.Close(ctx); err != nil {
|
||||
s.log.Error(ctx, "failed to close file", slog.String("name", f.Name()), rlog.Error(err))
|
||||
}
|
||||
|
||||
return &TorrentFS{
|
||||
name: f.Name(),
|
||||
Torrent: c,
|
||||
|
|
|
@ -3,6 +3,7 @@ package ytdlp
|
|||
import (
|
||||
"context"
|
||||
"io/fs"
|
||||
"os"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
|
@ -35,7 +36,7 @@ func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
f, err := s.fs.Open(ctx, filename)
|
||||
f, err := s.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||
"github.com/bodgit/sevenzip"
|
||||
"github.com/mattetti/filebuffer"
|
||||
"github.com/nwaples/rardecode/v2"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
@ -174,7 +175,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi
|
|||
size: size,
|
||||
af: af,
|
||||
|
||||
buffer: ioutils.NewFileBuffer(nil),
|
||||
buffer: filebuffer.New(nil),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -189,7 +190,7 @@ type archiveFile struct {
|
|||
|
||||
offset int64
|
||||
readen int64
|
||||
buffer *ioutils.FileBuffer
|
||||
buffer *filebuffer.Buffer
|
||||
}
|
||||
|
||||
// Name implements File.
|
||||
|
@ -214,14 +215,7 @@ func (d *archiveFile) IsDir() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (d *archiveFile) Close(ctx context.Context) error {
|
||||
return d.buffer.Close(ctx)
|
||||
}
|
||||
|
||||
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
if to < d.readen {
|
||||
return nil
|
||||
}
|
||||
|
@ -230,55 +224,68 @@ func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to get file reader: %w", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
defer reader.Close(ctx)
|
||||
|
||||
_, err = d.buffer.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to seek to start of the file: %w", err)
|
||||
return err
|
||||
}
|
||||
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead)
|
||||
d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead)
|
||||
if err != nil && err != io.EOF {
|
||||
return fmt.Errorf("error copying from archive file reader: %w", err)
|
||||
}
|
||||
_, err = d.buffer.Seek(d.offset, io.SeekStart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to seek to start of the file: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
err = d.loadMore(ctx, d.offset+int64(len(p)))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
||||
}
|
||||
n, err = d.buffer.Read(ctx, p)
|
||||
n, err = d.buffer.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
||||
}
|
||||
d.offset += int64(n)
|
||||
return n, nil
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
err = d.loadMore(ctx, off+int64(len(p)))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
||||
}
|
||||
n, err = d.buffer.ReadAt(ctx, p, off)
|
||||
n, err = d.buffer.ReadAt(p, off)
|
||||
if err != nil && err != io.EOF {
|
||||
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
||||
}
|
||||
return n, nil
|
||||
return n, err
|
||||
}
|
||||
|
||||
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error)
|
||||
func (d *archiveFile) Close(ctx context.Context) error {
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
return d.buffer.Close()
|
||||
}
|
||||
|
||||
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
|
||||
|
||||
var _ archiveLoader = ZipLoader
|
||||
|
||||
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -292,7 +299,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
}
|
||||
|
||||
i := i
|
||||
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
|
@ -305,7 +312,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
return nil, fmt.Errorf("failed to open file in zip archive: %w", err)
|
||||
}
|
||||
|
||||
return rc, nil
|
||||
return ctxio.WrapIoReadCloser(rc), nil
|
||||
}
|
||||
|
||||
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
|
||||
|
@ -317,8 +324,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
var _ archiveLoader = SevenZipLoader
|
||||
|
||||
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
||||
reader := ctxio.IoReaderAt(context.Background(), ctxreader)
|
||||
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
r, err := sevenzip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -332,7 +338,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
|||
}
|
||||
|
||||
i := i
|
||||
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
zr, err := sevenzip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
|
@ -344,7 +350,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return rc, nil
|
||||
return ctxio.WrapIoReadCloser(rc), nil
|
||||
}
|
||||
|
||||
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
|
||||
|
@ -374,7 +380,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
}
|
||||
|
||||
name := header.Name
|
||||
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
|
@ -386,7 +392,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
return nil, err
|
||||
}
|
||||
if header.Name == name {
|
||||
return io.NopCloser(r), nil
|
||||
return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("file with name '%s' not found", name)
|
||||
|
|
|
@ -3,6 +3,7 @@ package vfs
|
|||
import (
|
||||
"context"
|
||||
"io/fs"
|
||||
"os"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||
)
|
||||
|
@ -42,7 +43,7 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bf, err := c.fs.Open(ctx, filename)
|
||||
bf, err := c.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"reflect"
|
||||
|
@ -35,7 +36,7 @@ type LogFS struct {
|
|||
}
|
||||
|
||||
func isLoggableError(err error) bool {
|
||||
return err != nil && !errors.Is(err, fs.ErrNotExist)
|
||||
return err != nil && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF)
|
||||
}
|
||||
|
||||
var _ Filesystem = (*LogFS)(nil)
|
||||
|
@ -307,7 +308,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
|
|||
|
||||
n, err = f.f.ReadAt(ctx, p, off)
|
||||
if isLoggableError(err) {
|
||||
f.log.Error(ctx, "Failed to read")
|
||||
f.log.Error(ctx, "Failed to read", rlog.Error(err))
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
|
|
@ -118,7 +118,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close(ctx)
|
||||
// it is factory responsibility to close file then needed
|
||||
|
||||
err = func() error {
|
||||
factoryCtx, cancel := subTimeout(ctx)
|
||||
|
@ -208,6 +208,7 @@ func (r *ResolverFS) Type() fs.FileMode {
|
|||
|
||||
var _ Filesystem = &ResolverFS{}
|
||||
|
||||
// It factory responsobility to close file
|
||||
type FsFactory func(ctx context.Context, f File) (Filesystem, error)
|
||||
|
||||
func NewResolver(factories map[string]FsFactory) *Resolver {
|
||||
|
@ -306,7 +307,8 @@ PARTS_LOOP:
|
|||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
|
||||
}
|
||||
defer fsFile.Close(ctx)
|
||||
// it is factory responsibility to close file then needed
|
||||
|
||||
nestedFs, err := nestFactory(ctx, fsFile)
|
||||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue