From 0fa3a914470b0aeb3d6000e466243126682d0733 Mon Sep 17 00:00:00 2001 From: royalcat Date: Wed, 26 Jun 2024 00:39:30 +0300 Subject: [PATCH] fs refactor --- go.mod | 1 + go.sum | 10 ++---- pkg/ctxbilly/fs.go | 4 --- pkg/ctxbilly/uring.go | 4 --- pkg/ioutils/filebuffer.go | 25 ++++++++++++++- src/export/nfs/wrapper.go | 16 ---------- src/sources/torrent/daemon.go | 6 ++-- src/sources/torrent/fs.go | 8 ++++- src/sources/ytdlp/fs.go | 3 +- src/vfs/archive.go | 60 +++++++++++++++++++---------------- src/vfs/ctxbillyfs.go | 3 +- src/vfs/log.go | 5 +-- src/vfs/resolver.go | 6 ++-- 13 files changed, 80 insertions(+), 71 deletions(-) diff --git a/go.mod b/go.mod index da53300..d70e962 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/knadh/koanf/v2 v2.1.1 github.com/labstack/echo-contrib v0.17.1 github.com/labstack/echo/v4 v4.12.0 + github.com/mattetti/filebuffer v1.0.1 github.com/nwaples/rardecode/v2 v2.0.0-beta.2 github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 github.com/ravilushqa/otelgqlgen v0.15.0 diff --git a/go.sum b/go.sum index ed38701..32af796 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,8 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0 github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattetti/filebuffer v1.0.1 h1:gG7pyfnSIZCxdoKq+cPa8T0hhYtD9NxCdI4D7PTjRLM= +github.com/mattetti/filebuffer v1.0.1/go.mod h1:YdMURNDOttIiruleeVr6f56OrMc+MydEnTcXwtkxNVs= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -543,16 +545,8 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= -github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 h1:rGXhPFpOVLeOO/Da2qxBNZY5yaQdTCGxQV2dUDXXf7U= -github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= -github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7 h1:ZtxuWjMwbFwaj5zcT0VZqvsyViYYzYJkprFe/9p5PUs= -github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4= github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= -github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 h1:rKG2P4TNLgA4/Jl7LPayifjcw4txVGVSPkpHVhn3wnw= -github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= -github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 h1:/TWa41uAL8Vk0MkvZc03EjA1/bS2otK5q0/+6bSWKJI= -github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI= github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= diff --git a/pkg/ctxbilly/fs.go b/pkg/ctxbilly/fs.go index 7fa3b73..bd715af 100644 --- a/pkg/ctxbilly/fs.go +++ b/pkg/ctxbilly/fs.go @@ -13,10 +13,6 @@ type Filesystem interface { // it if it already exists. If successful, methods on the returned File can // be used for I/O; the associated file descriptor has mode O_RDWR. Create(ctx context.Context, filename string) (File, error) - // Open opens the named file for reading. If successful, methods on the - // returned file can be used for reading; the associated file descriptor has - // mode O_RDONLY. - Open(ctx context.Context, filename string) (File, error) // OpenFile is the generalized open call; most users will use Open or Create // instead. It opens the named file with specified flag (O_RDONLY etc.) and // perm, (0666 etc.) if applicable. If successful, methods on the returned diff --git a/pkg/ctxbilly/uring.go b/pkg/ctxbilly/uring.go index 01b9d50..3d54ae6 100644 --- a/pkg/ctxbilly/uring.go +++ b/pkg/ctxbilly/uring.go @@ -117,10 +117,6 @@ func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode) return os.MkdirAll(dir, perm) } -func (fs *UringFS) Open(ctx context.Context, filename string) (File, error) { - return fs.OpenFile(ctx, filename, os.O_RDONLY, 0) -} - func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) { filename, err := fs.abs(filename) if err != nil { diff --git a/pkg/ioutils/filebuffer.go b/pkg/ioutils/filebuffer.go index 0866d88..db455d2 100644 --- a/pkg/ioutils/filebuffer.go +++ b/pkg/ioutils/filebuffer.go @@ -6,6 +6,7 @@ import ( "errors" "io" "os" + "sync" "github.com/royalcat/ctxio" ) @@ -19,6 +20,7 @@ type FileBuffer struct { // index indicates where in the buffer we are at index int64 isClosed bool + mu sync.RWMutex } var _ FileReader = (*FileBuffer)(nil) @@ -53,14 +55,20 @@ func NewFileBufferFromIoReader(reader io.Reader) (*FileBuffer, error) { // Bytes returns the bytes available until the end of the buffer. func (f *FileBuffer) Bytes() []byte { + f.mu.RLock() + defer f.mu.RUnlock() + if f.isClosed || f.index >= int64(f.buff.Len()) { return []byte{} } - return f.buff.Bytes()[f.index:] + return bytes.Clone(f.buff.Bytes()[f.index:]) } // String implements the Stringer interface func (f *FileBuffer) String() string { + f.mu.RLock() + defer f.mu.RUnlock() + return string(f.buff.Bytes()[f.index:]) } @@ -76,6 +84,9 @@ func (f *FileBuffer) String() string { // that a Reader returning a non-zero number of bytes at the end of the input stream may return // either err == EOF or err == nil. The next Read should return 0, EOF. func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) { + f.mu.RLock() + defer f.mu.RUnlock() + if f.isClosed { return 0, os.ErrClosed } @@ -109,6 +120,9 @@ func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) { // ReadAt should not affect nor be affected by the underlying seek offset. // Clients of ReadAt can execute parallel ReadAt calls on the same input source. func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + f.mu.RLock() + defer f.mu.RUnlock() + if f.isClosed { return 0, os.ErrClosed } @@ -131,6 +145,9 @@ func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, er // Write implements io.Writer https://golang.org/pkg/io/#Writer // by appending the passed bytes to the buffer unless the buffer is closed or index negative. func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.isClosed { return 0, os.ErrClosed } @@ -151,6 +168,9 @@ func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) { // Seek implements io.Seeker https://golang.org/pkg/io/#Seeker func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.isClosed { return 0, os.ErrClosed } @@ -176,6 +196,9 @@ func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) { // Close implements io.Closer https://golang.org/pkg/io/#Closer // It closes the buffer, rendering it unusable for I/O. It returns an error, if any. func (f *FileBuffer) Close(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + f.isClosed = true f.buff = nil return nil diff --git a/src/export/nfs/wrapper.go b/src/export/nfs/wrapper.go index eaf2784..cf19f5d 100644 --- a/src/export/nfs/wrapper.go +++ b/src/export/nfs/wrapper.go @@ -58,22 +58,6 @@ func (*fsWrapper) MkdirAll(ctx context.Context, filename string, perm fs.FileMod return billy.ErrNotSupported } -// Open implements billy.Filesystem. -func (fs *fsWrapper) Open(ctx context.Context, filename string) (nfs.File, error) { - ctx, cancel := context.WithTimeout(ctx, fs.timeout) - defer cancel() - - file, err := fs.fs.Open(ctx, filename) - if err != nil { - return nil, billyErr(ctx, err, fs.log) - } - return &billyFile{ - name: filename, - file: file, - log: fs.log.With("filename", filename), - }, nil -} - // OpenFile implements billy.Filesystem. func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) { ctx, cancel := context.WithTimeout(ctx, fs.timeout) diff --git a/src/sources/torrent/daemon.go b/src/sources/torrent/daemon.go index 756fb98..d7eeb89 100644 --- a/src/sources/torrent/daemon.go +++ b/src/sources/torrent/daemon.go @@ -130,13 +130,11 @@ func (s *Daemon) Close(ctx context.Context) error { )...) } -func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { +func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { ctx, span := tracer.Start(ctx, "LoadTorrent") defer span.End() log := s.log - defer f.Close(ctx) - stat, err := f.Info() if err != nil { return nil, fmt.Errorf("call stat failed: %w", err) @@ -274,7 +272,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error { vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file)) - _, err = s.LoadTorrent(ctx, vfile) + _, err = s.loadTorrent(ctx, vfile) if err != nil { log.Error(ctx, "failed adding torrent", rlog.Error(err)) } diff --git a/src/sources/torrent/fs.go b/src/sources/torrent/fs.go index fe77d1a..e77cf8d 100644 --- a/src/sources/torrent/fs.go +++ b/src/sources/torrent/fs.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/fs" + "log/slog" "path" "slices" "strings" @@ -12,6 +13,7 @@ import ( "sync/atomic" "time" + "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/anacrolix/torrent" "go.opentelemetry.io/otel/attribute" @@ -38,11 +40,15 @@ const shortTimeout = time.Millisecond const lowTimeout = time.Second * 5 func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { - c, err := s.LoadTorrent(ctx, f) + c, err := s.loadTorrent(ctx, f) if err != nil { return nil, err } + if err := f.Close(ctx); err != nil { + s.log.Error(ctx, "failed to close file", slog.String("name", f.Name()), rlog.Error(err)) + } + return &TorrentFS{ name: f.Name(), Torrent: c, diff --git a/src/sources/ytdlp/fs.go b/src/sources/ytdlp/fs.go index 89efeb2..49bd929 100644 --- a/src/sources/ytdlp/fs.go +++ b/src/sources/ytdlp/fs.go @@ -3,6 +3,7 @@ package ytdlp import ( "context" "io/fs" + "os" "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/src/vfs" @@ -35,7 +36,7 @@ func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error) return nil, err } - f, err := s.fs.Open(ctx, filename) + f, err := s.fs.OpenFile(ctx, filename, os.O_RDONLY, 0) if err != nil { return nil, err } diff --git a/src/vfs/archive.go b/src/vfs/archive.go index 4dfd8f5..0375ef6 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -13,6 +13,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/ioutils" "github.com/bodgit/sevenzip" + "github.com/mattetti/filebuffer" "github.com/nwaples/rardecode/v2" "github.com/royalcat/ctxio" ) @@ -174,7 +175,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi size: size, af: af, - buffer: ioutils.NewFileBuffer(nil), + buffer: filebuffer.New(nil), } } @@ -189,7 +190,7 @@ type archiveFile struct { offset int64 readen int64 - buffer *ioutils.FileBuffer + buffer *filebuffer.Buffer } // Name implements File. @@ -214,14 +215,7 @@ func (d *archiveFile) IsDir() bool { return false } -func (d *archiveFile) Close(ctx context.Context) error { - return d.buffer.Close(ctx) -} - func (d *archiveFile) loadMore(ctx context.Context, to int64) error { - d.m.Lock() - defer d.m.Unlock() - if to < d.readen { return nil } @@ -230,55 +224,68 @@ func (d *archiveFile) loadMore(ctx context.Context, to int64) error { if err != nil { return fmt.Errorf("failed to get file reader: %w", err) } - defer reader.Close() + defer reader.Close(ctx) + _, err = d.buffer.Seek(0, io.SeekStart) if err != nil { - return fmt.Errorf("failed to seek to start of the file: %w", err) + return err } - d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead) + d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead) if err != nil && err != io.EOF { return fmt.Errorf("error copying from archive file reader: %w", err) } _, err = d.buffer.Seek(d.offset, io.SeekStart) if err != nil { - return fmt.Errorf("failed to seek to start of the file: %w", err) + return err } return nil } func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { + d.m.Lock() + defer d.m.Unlock() + err = d.loadMore(ctx, d.offset+int64(len(p))) if err != nil { return 0, fmt.Errorf("failed to load more from archive file: %w", err) } - n, err = d.buffer.Read(ctx, p) + n, err = d.buffer.Read(p) if err != nil && err != io.EOF { return n, fmt.Errorf("failed to read from buffer: %w", err) } d.offset += int64(n) - return n, nil + return n, err } func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + d.m.Lock() + defer d.m.Unlock() + err = d.loadMore(ctx, off+int64(len(p))) if err != nil { return 0, fmt.Errorf("failed to load more from archive file: %w", err) } - n, err = d.buffer.ReadAt(ctx, p, off) + n, err = d.buffer.ReadAt(p, off) if err != nil && err != io.EOF { return n, fmt.Errorf("failed to read from buffer: %w", err) } - return n, nil + return n, err } -type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error) +func (d *archiveFile) Close(ctx context.Context) error { + d.m.Lock() + defer d.m.Unlock() + + return d.buffer.Close() +} + +type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error) var _ archiveLoader = ZipLoader func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { reader := ctxio.IoReaderAt(ctx, ctxreader) - zr, err := zip.NewReader(reader, size) if err != nil { return nil, err @@ -292,7 +299,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s } i := i - af := func(ctx context.Context) (io.ReadCloser, error) { + af := func(ctx context.Context) (ctxio.ReadCloser, error) { reader := ctxio.IoReaderAt(ctx, ctxreader) zr, err := zip.NewReader(reader, size) @@ -305,7 +312,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s return nil, fmt.Errorf("failed to open file in zip archive: %w", err) } - return rc, nil + return ctxio.WrapIoReadCloser(rc), nil } out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af) @@ -317,8 +324,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s var _ archiveLoader = SevenZipLoader func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { - reader := ctxio.IoReaderAt(context.Background(), ctxreader) - + reader := ctxio.IoReaderAt(ctx, ctxreader) r, err := sevenzip.NewReader(reader, size) if err != nil { return nil, err @@ -332,7 +338,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) ( } i := i - af := func(ctx context.Context) (io.ReadCloser, error) { + af := func(ctx context.Context) (ctxio.ReadCloser, error) { reader := ctxio.IoReaderAt(ctx, ctxreader) zr, err := sevenzip.NewReader(reader, size) if err != nil { @@ -344,7 +350,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) ( return nil, err } - return rc, nil + return ctxio.WrapIoReadCloser(rc), nil } out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af) @@ -374,7 +380,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s } name := header.Name - af := func(ctx context.Context) (io.ReadCloser, error) { + af := func(ctx context.Context) (ctxio.ReadCloser, error) { reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) r, err := rardecode.NewReader(reader) if err != nil { @@ -386,7 +392,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s return nil, err } if header.Name == name { - return io.NopCloser(r), nil + return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil } } return nil, fmt.Errorf("file with name '%s' not found", name) diff --git a/src/vfs/ctxbillyfs.go b/src/vfs/ctxbillyfs.go index 10df733..6613eac 100644 --- a/src/vfs/ctxbillyfs.go +++ b/src/vfs/ctxbillyfs.go @@ -3,6 +3,7 @@ package vfs import ( "context" "io/fs" + "os" "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" ) @@ -42,7 +43,7 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) { if err != nil { return nil, err } - bf, err := c.fs.Open(ctx, filename) + bf, err := c.fs.OpenFile(ctx, filename, os.O_RDONLY, 0) if err != nil { return nil, err } diff --git a/src/vfs/log.go b/src/vfs/log.go index ece02aa..011c49b 100644 --- a/src/vfs/log.go +++ b/src/vfs/log.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "io/fs" "log/slog" "reflect" @@ -35,7 +36,7 @@ type LogFS struct { } func isLoggableError(err error) bool { - return err != nil && !errors.Is(err, fs.ErrNotExist) + return err != nil && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF) } var _ Filesystem = (*LogFS)(nil) @@ -307,7 +308,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e n, err = f.f.ReadAt(ctx, p, off) if isLoggableError(err) { - f.log.Error(ctx, "Failed to read") + f.log.Error(ctx, "Failed to read", rlog.Error(err)) } return n, err } diff --git a/src/vfs/resolver.go b/src/vfs/resolver.go index 3b3c63b..bb5d4da 100644 --- a/src/vfs/resolver.go +++ b/src/vfs/resolver.go @@ -118,7 +118,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er if err != nil { return nil, err } - defer file.Close(ctx) + // it is factory responsibility to close file then needed err = func() error { factoryCtx, cancel := subTimeout(ctx) @@ -208,6 +208,7 @@ func (r *ResolverFS) Type() fs.FileMode { var _ Filesystem = &ResolverFS{} +// It factory responsobility to close file type FsFactory func(ctx context.Context, f File) (Filesystem, error) func NewResolver(factories map[string]FsFactory) *Resolver { @@ -306,7 +307,8 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) } - defer fsFile.Close(ctx) + // it is factory responsibility to close file then needed + nestedFs, err := nestFactory(ctx, fsFile) if err != nil { return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)