Compare commits

...

2 commits

Author SHA1 Message Date
0fa3a91447 fs refactor
All checks were successful
docker / build-docker (linux/amd64) (push) Successful in 2m46s
docker / build-docker (linux/arm64) (push) Successful in 7m38s
2024-06-26 00:39:30 +03:00
3dcf27d900 dedupe errors 2024-06-26 00:39:10 +03:00
14 changed files with 87 additions and 82 deletions

1
go.mod
View file

@ -30,6 +30,7 @@ require (
github.com/knadh/koanf/v2 v2.1.1 github.com/knadh/koanf/v2 v2.1.1
github.com/labstack/echo-contrib v0.17.1 github.com/labstack/echo-contrib v0.17.1
github.com/labstack/echo/v4 v4.12.0 github.com/labstack/echo/v4 v4.12.0
github.com/mattetti/filebuffer v1.0.1
github.com/nwaples/rardecode/v2 v2.0.0-beta.2 github.com/nwaples/rardecode/v2 v2.0.0-beta.2
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93
github.com/ravilushqa/otelgqlgen v0.15.0 github.com/ravilushqa/otelgqlgen v0.15.0

10
go.sum
View file

@ -398,6 +398,8 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattetti/filebuffer v1.0.1 h1:gG7pyfnSIZCxdoKq+cPa8T0hhYtD9NxCdI4D7PTjRLM=
github.com/mattetti/filebuffer v1.0.1/go.mod h1:YdMURNDOttIiruleeVr6f56OrMc+MydEnTcXwtkxNVs=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@ -543,16 +545,8 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI= github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 h1:rGXhPFpOVLeOO/Da2qxBNZY5yaQdTCGxQV2dUDXXf7U=
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7 h1:ZtxuWjMwbFwaj5zcT0VZqvsyViYYzYJkprFe/9p5PUs=
github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4= github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4=
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 h1:rKG2P4TNLgA4/Jl7LPayifjcw4txVGVSPkpHVhn3wnw=
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 h1:/TWa41uAL8Vk0MkvZc03EjA1/bS2otK5q0/+6bSWKJI=
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI= github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI=
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8=

View file

@ -13,10 +13,6 @@ type Filesystem interface {
// it if it already exists. If successful, methods on the returned File can // it if it already exists. If successful, methods on the returned File can
// be used for I/O; the associated file descriptor has mode O_RDWR. // be used for I/O; the associated file descriptor has mode O_RDWR.
Create(ctx context.Context, filename string) (File, error) Create(ctx context.Context, filename string) (File, error)
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor has
// mode O_RDONLY.
Open(ctx context.Context, filename string) (File, error)
// OpenFile is the generalized open call; most users will use Open or Create // OpenFile is the generalized open call; most users will use Open or Create
// instead. It opens the named file with specified flag (O_RDONLY etc.) and // instead. It opens the named file with specified flag (O_RDONLY etc.) and
// perm, (0666 etc.) if applicable. If successful, methods on the returned // perm, (0666 etc.) if applicable. If successful, methods on the returned

View file

@ -117,10 +117,6 @@ func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode)
return os.MkdirAll(dir, perm) return os.MkdirAll(dir, perm)
} }
func (fs *UringFS) Open(ctx context.Context, filename string) (File, error) {
return fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
}
func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) { func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) {
filename, err := fs.abs(filename) filename, err := fs.abs(filename)
if err != nil { if err != nil {

View file

@ -6,6 +6,7 @@ import (
"errors" "errors"
"io" "io"
"os" "os"
"sync"
"github.com/royalcat/ctxio" "github.com/royalcat/ctxio"
) )
@ -19,6 +20,7 @@ type FileBuffer struct {
// index indicates where in the buffer we are at // index indicates where in the buffer we are at
index int64 index int64
isClosed bool isClosed bool
mu sync.RWMutex
} }
var _ FileReader = (*FileBuffer)(nil) var _ FileReader = (*FileBuffer)(nil)
@ -53,14 +55,20 @@ func NewFileBufferFromIoReader(reader io.Reader) (*FileBuffer, error) {
// Bytes returns the bytes available until the end of the buffer. // Bytes returns the bytes available until the end of the buffer.
func (f *FileBuffer) Bytes() []byte { func (f *FileBuffer) Bytes() []byte {
f.mu.RLock()
defer f.mu.RUnlock()
if f.isClosed || f.index >= int64(f.buff.Len()) { if f.isClosed || f.index >= int64(f.buff.Len()) {
return []byte{} return []byte{}
} }
return f.buff.Bytes()[f.index:] return bytes.Clone(f.buff.Bytes()[f.index:])
} }
// String implements the Stringer interface // String implements the Stringer interface
func (f *FileBuffer) String() string { func (f *FileBuffer) String() string {
f.mu.RLock()
defer f.mu.RUnlock()
return string(f.buff.Bytes()[f.index:]) return string(f.buff.Bytes()[f.index:])
} }
@ -76,6 +84,9 @@ func (f *FileBuffer) String() string {
// that a Reader returning a non-zero number of bytes at the end of the input stream may return // that a Reader returning a non-zero number of bytes at the end of the input stream may return
// either err == EOF or err == nil. The next Read should return 0, EOF. // either err == EOF or err == nil. The next Read should return 0, EOF.
func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) { func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) {
f.mu.RLock()
defer f.mu.RUnlock()
if f.isClosed { if f.isClosed {
return 0, os.ErrClosed return 0, os.ErrClosed
} }
@ -109,6 +120,9 @@ func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) {
// ReadAt should not affect nor be affected by the underlying seek offset. // ReadAt should not affect nor be affected by the underlying seek offset.
// Clients of ReadAt can execute parallel ReadAt calls on the same input source. // Clients of ReadAt can execute parallel ReadAt calls on the same input source.
func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
f.mu.RLock()
defer f.mu.RUnlock()
if f.isClosed { if f.isClosed {
return 0, os.ErrClosed return 0, os.ErrClosed
} }
@ -131,6 +145,9 @@ func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, er
// Write implements io.Writer https://golang.org/pkg/io/#Writer // Write implements io.Writer https://golang.org/pkg/io/#Writer
// by appending the passed bytes to the buffer unless the buffer is closed or index negative. // by appending the passed bytes to the buffer unless the buffer is closed or index negative.
func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) { func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isClosed { if f.isClosed {
return 0, os.ErrClosed return 0, os.ErrClosed
} }
@ -151,6 +168,9 @@ func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) {
// Seek implements io.Seeker https://golang.org/pkg/io/#Seeker // Seek implements io.Seeker https://golang.org/pkg/io/#Seeker
func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) { func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isClosed { if f.isClosed {
return 0, os.ErrClosed return 0, os.ErrClosed
} }
@ -176,6 +196,9 @@ func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) {
// Close implements io.Closer https://golang.org/pkg/io/#Closer // Close implements io.Closer https://golang.org/pkg/io/#Closer
// It closes the buffer, rendering it unusable for I/O. It returns an error, if any. // It closes the buffer, rendering it unusable for I/O. It returns an error, if any.
func (f *FileBuffer) Close(ctx context.Context) error { func (f *FileBuffer) Close(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()
f.isClosed = true f.isClosed = true
f.buff = nil f.buff = nil
return nil return nil

View file

@ -58,22 +58,6 @@ func (*fsWrapper) MkdirAll(ctx context.Context, filename string, perm fs.FileMod
return billy.ErrNotSupported return billy.ErrNotSupported
} }
// Open implements billy.Filesystem.
func (fs *fsWrapper) Open(ctx context.Context, filename string) (nfs.File, error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
file, err := fs.fs.Open(ctx, filename)
if err != nil {
return nil, billyErr(ctx, err, fs.log)
}
return &billyFile{
name: filename,
file: file,
log: fs.log.With("filename", filename),
}, nil
}
// OpenFile implements billy.Filesystem. // OpenFile implements billy.Filesystem.
func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) { func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout) ctx, cancel := context.WithTimeout(ctx, fs.timeout)

View file

@ -130,13 +130,11 @@ func (s *Daemon) Close(ctx context.Context) error {
)...) )...)
} }
func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent") ctx, span := tracer.Start(ctx, "LoadTorrent")
defer span.End() defer span.End()
log := s.log log := s.log
defer f.Close(ctx)
stat, err := f.Info() stat, err := f.Info()
if err != nil { if err != nil {
return nil, fmt.Errorf("call stat failed: %w", err) return nil, fmt.Errorf("call stat failed: %w", err)
@ -274,7 +272,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file)) vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
_, err = s.LoadTorrent(ctx, vfile) _, err = s.loadTorrent(ctx, vfile)
if err != nil { if err != nil {
log.Error(ctx, "failed adding torrent", rlog.Error(err)) log.Error(ctx, "failed adding torrent", rlog.Error(err))
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
"log/slog"
"path" "path"
"slices" "slices"
"strings" "strings"
@ -12,6 +13,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/vfs" "git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -38,11 +40,15 @@ const shortTimeout = time.Millisecond
const lowTimeout = time.Second * 5 const lowTimeout = time.Second * 5
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
c, err := s.LoadTorrent(ctx, f) c, err := s.loadTorrent(ctx, f)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := f.Close(ctx); err != nil {
s.log.Error(ctx, "failed to close file", slog.String("name", f.Name()), rlog.Error(err))
}
return &TorrentFS{ return &TorrentFS{
name: f.Name(), name: f.Name(),
Torrent: c, Torrent: c,

View file

@ -118,12 +118,12 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
srcF, err := os.Open(paths[0]) srcF, err := os.Open(paths[0])
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error opening file %s: %w", paths[0], err)
} }
defer srcF.Close() defer srcF.Close()
srcStat, err := srcF.Stat() srcStat, err := srcF.Stat()
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error stat file %s: %w", paths[0], err)
} }
srcFd := int(srcF.Fd()) srcFd := int(srcF.Fd())
@ -133,12 +133,12 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
err = unix.Fstatfs(srcFd, &fsStat) err = unix.Fstatfs(srcFd, &fsStat)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
return deduped, err return deduped, fmt.Errorf("error statfs file %s: %w", paths[0], err)
} }
srcHash, err := filehash(srcF) srcHash, err := filehash(srcF)
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error hashing file %s: %w", paths[0], err)
} }
if int64(fsStat.Bsize) > srcSize { // for btrfs it means file in residing in not deduplicatable metadata if int64(fsStat.Bsize) > srcSize { // for btrfs it means file in residing in not deduplicatable metadata
@ -162,13 +162,13 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm) destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error opening file %s: %w", dst, err)
} }
defer destF.Close() defer destF.Close()
dstHash, err := filehash(destF) dstHash, err := filehash(destF)
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error hashing file %s: %w", dst, err)
} }
if srcHash != dstHash { if srcHash != dstHash {
@ -199,7 +199,7 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
err = unix.IoctlFileDedupeRange(srcFd, &rng) err = unix.IoctlFileDedupeRange(srcFd, &rng)
if err != nil { if err != nil {
return deduped, err return deduped, fmt.Errorf("error calling FIDEDUPERANGE: %w", err)
} }
for i := range rng.Info { for i := range rng.Info {
@ -223,7 +223,3 @@ func filehash(r io.Reader) ([20]byte, error) {
return sha1.Sum(buf), nil return sha1.Sum(buf), nil
} }
func ptr[D any](v D) *D {
return &v
}

View file

@ -3,6 +3,7 @@ package ytdlp
import ( import (
"context" "context"
"io/fs" "io/fs"
"os"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/src/vfs" "git.kmsign.ru/royalcat/tstor/src/vfs"
@ -35,7 +36,7 @@ func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error)
return nil, err return nil, err
} }
f, err := s.fs.Open(ctx, filename) f, err := s.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,6 +13,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/ioutils" "git.kmsign.ru/royalcat/tstor/pkg/ioutils"
"github.com/bodgit/sevenzip" "github.com/bodgit/sevenzip"
"github.com/mattetti/filebuffer"
"github.com/nwaples/rardecode/v2" "github.com/nwaples/rardecode/v2"
"github.com/royalcat/ctxio" "github.com/royalcat/ctxio"
) )
@ -174,7 +175,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi
size: size, size: size,
af: af, af: af,
buffer: ioutils.NewFileBuffer(nil), buffer: filebuffer.New(nil),
} }
} }
@ -189,7 +190,7 @@ type archiveFile struct {
offset int64 offset int64
readen int64 readen int64
buffer *ioutils.FileBuffer buffer *filebuffer.Buffer
} }
// Name implements File. // Name implements File.
@ -214,14 +215,7 @@ func (d *archiveFile) IsDir() bool {
return false return false
} }
func (d *archiveFile) Close(ctx context.Context) error {
return d.buffer.Close(ctx)
}
func (d *archiveFile) loadMore(ctx context.Context, to int64) error { func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
d.m.Lock()
defer d.m.Unlock()
if to < d.readen { if to < d.readen {
return nil return nil
} }
@ -230,55 +224,68 @@ func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to get file reader: %w", err) return fmt.Errorf("failed to get file reader: %w", err)
} }
defer reader.Close() defer reader.Close(ctx)
_, err = d.buffer.Seek(0, io.SeekStart) _, err = d.buffer.Seek(0, io.SeekStart)
if err != nil { if err != nil {
return fmt.Errorf("failed to seek to start of the file: %w", err) return err
} }
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead) d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return fmt.Errorf("error copying from archive file reader: %w", err) return fmt.Errorf("error copying from archive file reader: %w", err)
} }
_, err = d.buffer.Seek(d.offset, io.SeekStart) _, err = d.buffer.Seek(d.offset, io.SeekStart)
if err != nil { if err != nil {
return fmt.Errorf("failed to seek to start of the file: %w", err) return err
} }
return nil return nil
} }
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
d.m.Lock()
defer d.m.Unlock()
err = d.loadMore(ctx, d.offset+int64(len(p))) err = d.loadMore(ctx, d.offset+int64(len(p)))
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err) return 0, fmt.Errorf("failed to load more from archive file: %w", err)
} }
n, err = d.buffer.Read(ctx, p) n, err = d.buffer.Read(p)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err) return n, fmt.Errorf("failed to read from buffer: %w", err)
} }
d.offset += int64(n) d.offset += int64(n)
return n, nil return n, err
} }
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
d.m.Lock()
defer d.m.Unlock()
err = d.loadMore(ctx, off+int64(len(p))) err = d.loadMore(ctx, off+int64(len(p)))
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err) return 0, fmt.Errorf("failed to load more from archive file: %w", err)
} }
n, err = d.buffer.ReadAt(ctx, p, off) n, err = d.buffer.ReadAt(p, off)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err) return n, fmt.Errorf("failed to read from buffer: %w", err)
} }
return n, nil return n, err
} }
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error) func (d *archiveFile) Close(ctx context.Context) error {
d.m.Lock()
defer d.m.Unlock()
return d.buffer.Close()
}
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
var _ archiveLoader = ZipLoader var _ archiveLoader = ZipLoader
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader) reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size) zr, err := zip.NewReader(reader, size)
if err != nil { if err != nil {
return nil, err return nil, err
@ -292,7 +299,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
} }
i := i i := i
af := func(ctx context.Context) (io.ReadCloser, error) { af := func(ctx context.Context) (ctxio.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader) reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size) zr, err := zip.NewReader(reader, size)
@ -305,7 +312,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
return nil, fmt.Errorf("failed to open file in zip archive: %w", err) return nil, fmt.Errorf("failed to open file in zip archive: %w", err)
} }
return rc, nil return ctxio.WrapIoReadCloser(rc), nil
} }
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af) out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
@ -317,8 +324,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
var _ archiveLoader = SevenZipLoader var _ archiveLoader = SevenZipLoader
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReaderAt(context.Background(), ctxreader) reader := ctxio.IoReaderAt(ctx, ctxreader)
r, err := sevenzip.NewReader(reader, size) r, err := sevenzip.NewReader(reader, size)
if err != nil { if err != nil {
return nil, err return nil, err
@ -332,7 +338,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
} }
i := i i := i
af := func(ctx context.Context) (io.ReadCloser, error) { af := func(ctx context.Context) (ctxio.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader) reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := sevenzip.NewReader(reader, size) zr, err := sevenzip.NewReader(reader, size)
if err != nil { if err != nil {
@ -344,7 +350,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
return nil, err return nil, err
} }
return rc, nil return ctxio.WrapIoReadCloser(rc), nil
} }
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af) out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
@ -374,7 +380,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
} }
name := header.Name name := header.Name
af := func(ctx context.Context) (io.ReadCloser, error) { af := func(ctx context.Context) (ctxio.ReadCloser, error) {
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
r, err := rardecode.NewReader(reader) r, err := rardecode.NewReader(reader)
if err != nil { if err != nil {
@ -386,7 +392,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
return nil, err return nil, err
} }
if header.Name == name { if header.Name == name {
return io.NopCloser(r), nil return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil
} }
} }
return nil, fmt.Errorf("file with name '%s' not found", name) return nil, fmt.Errorf("file with name '%s' not found", name)

View file

@ -3,6 +3,7 @@ package vfs
import ( import (
"context" "context"
"io/fs" "io/fs"
"os"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
) )
@ -42,7 +43,7 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
bf, err := c.fs.Open(ctx, filename) bf, err := c.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"io/fs" "io/fs"
"log/slog" "log/slog"
"reflect" "reflect"
@ -35,7 +36,7 @@ type LogFS struct {
} }
func isLoggableError(err error) bool { func isLoggableError(err error) bool {
return err != nil && !errors.Is(err, fs.ErrNotExist) return err != nil && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF)
} }
var _ Filesystem = (*LogFS)(nil) var _ Filesystem = (*LogFS)(nil)
@ -307,7 +308,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
n, err = f.f.ReadAt(ctx, p, off) n, err = f.f.ReadAt(ctx, p, off)
if isLoggableError(err) { if isLoggableError(err) {
f.log.Error(ctx, "Failed to read") f.log.Error(ctx, "Failed to read", rlog.Error(err))
} }
return n, err return n, err
} }

View file

@ -118,7 +118,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer file.Close(ctx) // it is factory responsibility to close file then needed
err = func() error { err = func() error {
factoryCtx, cancel := subTimeout(ctx) factoryCtx, cancel := subTimeout(ctx)
@ -208,6 +208,7 @@ func (r *ResolverFS) Type() fs.FileMode {
var _ Filesystem = &ResolverFS{} var _ Filesystem = &ResolverFS{}
// It factory responsobility to close file
type FsFactory func(ctx context.Context, f File) (Filesystem, error) type FsFactory func(ctx context.Context, f File) (Filesystem, error)
func NewResolver(factories map[string]FsFactory) *Resolver { func NewResolver(factories map[string]FsFactory) *Resolver {
@ -306,7 +307,8 @@ PARTS_LOOP:
if err != nil { if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
} }
defer fsFile.Close(ctx) // it is factory responsibility to close file then needed
nestedFs, err := nestFactory(ctx, fsFile) nestedFs, err := nestFactory(ctx, fsFile)
if err != nil { if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)