Compare commits
No commits in common. "0fa3a914470b0aeb3d6000e466243126682d0733" and "d11fa31023423115c18f93b14193840042f203cc" have entirely different histories.
0fa3a91447
...
d11fa31023
14 changed files with 82 additions and 87 deletions
1
go.mod
1
go.mod
|
@ -30,7 +30,6 @@ require (
|
||||||
github.com/knadh/koanf/v2 v2.1.1
|
github.com/knadh/koanf/v2 v2.1.1
|
||||||
github.com/labstack/echo-contrib v0.17.1
|
github.com/labstack/echo-contrib v0.17.1
|
||||||
github.com/labstack/echo/v4 v4.12.0
|
github.com/labstack/echo/v4 v4.12.0
|
||||||
github.com/mattetti/filebuffer v1.0.1
|
|
||||||
github.com/nwaples/rardecode/v2 v2.0.0-beta.2
|
github.com/nwaples/rardecode/v2 v2.0.0-beta.2
|
||||||
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93
|
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93
|
||||||
github.com/ravilushqa/otelgqlgen v0.15.0
|
github.com/ravilushqa/otelgqlgen v0.15.0
|
||||||
|
|
10
go.sum
10
go.sum
|
@ -398,8 +398,6 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0
|
||||||
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
|
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
|
||||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||||
github.com/mattetti/filebuffer v1.0.1 h1:gG7pyfnSIZCxdoKq+cPa8T0hhYtD9NxCdI4D7PTjRLM=
|
|
||||||
github.com/mattetti/filebuffer v1.0.1/go.mod h1:YdMURNDOttIiruleeVr6f56OrMc+MydEnTcXwtkxNVs=
|
|
||||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||||
|
@ -545,8 +543,16 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL
|
||||||
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
|
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
|
||||||
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
|
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
|
||||||
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
|
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
|
||||||
|
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 h1:rGXhPFpOVLeOO/Da2qxBNZY5yaQdTCGxQV2dUDXXf7U=
|
||||||
|
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
|
||||||
|
github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7 h1:ZtxuWjMwbFwaj5zcT0VZqvsyViYYzYJkprFe/9p5PUs=
|
||||||
|
github.com/royalcat/kv v0.0.0-20240617074755-245a773511b7/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
|
||||||
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4=
|
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4=
|
||||||
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
|
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
|
||||||
|
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 h1:rKG2P4TNLgA4/Jl7LPayifjcw4txVGVSPkpHVhn3wnw=
|
||||||
|
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
|
||||||
|
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 h1:/TWa41uAL8Vk0MkvZc03EjA1/bS2otK5q0/+6bSWKJI=
|
||||||
|
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
|
||||||
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI=
|
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI=
|
||||||
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
|
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
|
||||||
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8=
|
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8=
|
||||||
|
|
|
@ -13,6 +13,10 @@ type Filesystem interface {
|
||||||
// it if it already exists. If successful, methods on the returned File can
|
// it if it already exists. If successful, methods on the returned File can
|
||||||
// be used for I/O; the associated file descriptor has mode O_RDWR.
|
// be used for I/O; the associated file descriptor has mode O_RDWR.
|
||||||
Create(ctx context.Context, filename string) (File, error)
|
Create(ctx context.Context, filename string) (File, error)
|
||||||
|
// Open opens the named file for reading. If successful, methods on the
|
||||||
|
// returned file can be used for reading; the associated file descriptor has
|
||||||
|
// mode O_RDONLY.
|
||||||
|
Open(ctx context.Context, filename string) (File, error)
|
||||||
// OpenFile is the generalized open call; most users will use Open or Create
|
// OpenFile is the generalized open call; most users will use Open or Create
|
||||||
// instead. It opens the named file with specified flag (O_RDONLY etc.) and
|
// instead. It opens the named file with specified flag (O_RDONLY etc.) and
|
||||||
// perm, (0666 etc.) if applicable. If successful, methods on the returned
|
// perm, (0666 etc.) if applicable. If successful, methods on the returned
|
||||||
|
|
|
@ -117,6 +117,10 @@ func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode)
|
||||||
return os.MkdirAll(dir, perm)
|
return os.MkdirAll(dir, perm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *UringFS) Open(ctx context.Context, filename string) (File, error) {
|
||||||
|
return fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) {
|
func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) {
|
||||||
filename, err := fs.abs(filename)
|
filename, err := fs.abs(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/royalcat/ctxio"
|
"github.com/royalcat/ctxio"
|
||||||
)
|
)
|
||||||
|
@ -20,7 +19,6 @@ type FileBuffer struct {
|
||||||
// index indicates where in the buffer we are at
|
// index indicates where in the buffer we are at
|
||||||
index int64
|
index int64
|
||||||
isClosed bool
|
isClosed bool
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ FileReader = (*FileBuffer)(nil)
|
var _ FileReader = (*FileBuffer)(nil)
|
||||||
|
@ -55,20 +53,14 @@ func NewFileBufferFromIoReader(reader io.Reader) (*FileBuffer, error) {
|
||||||
|
|
||||||
// Bytes returns the bytes available until the end of the buffer.
|
// Bytes returns the bytes available until the end of the buffer.
|
||||||
func (f *FileBuffer) Bytes() []byte {
|
func (f *FileBuffer) Bytes() []byte {
|
||||||
f.mu.RLock()
|
|
||||||
defer f.mu.RUnlock()
|
|
||||||
|
|
||||||
if f.isClosed || f.index >= int64(f.buff.Len()) {
|
if f.isClosed || f.index >= int64(f.buff.Len()) {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
return bytes.Clone(f.buff.Bytes()[f.index:])
|
return f.buff.Bytes()[f.index:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// String implements the Stringer interface
|
// String implements the Stringer interface
|
||||||
func (f *FileBuffer) String() string {
|
func (f *FileBuffer) String() string {
|
||||||
f.mu.RLock()
|
|
||||||
defer f.mu.RUnlock()
|
|
||||||
|
|
||||||
return string(f.buff.Bytes()[f.index:])
|
return string(f.buff.Bytes()[f.index:])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,9 +76,6 @@ func (f *FileBuffer) String() string {
|
||||||
// that a Reader returning a non-zero number of bytes at the end of the input stream may return
|
// that a Reader returning a non-zero number of bytes at the end of the input stream may return
|
||||||
// either err == EOF or err == nil. The next Read should return 0, EOF.
|
// either err == EOF or err == nil. The next Read should return 0, EOF.
|
||||||
func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) {
|
func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) {
|
||||||
f.mu.RLock()
|
|
||||||
defer f.mu.RUnlock()
|
|
||||||
|
|
||||||
if f.isClosed {
|
if f.isClosed {
|
||||||
return 0, os.ErrClosed
|
return 0, os.ErrClosed
|
||||||
}
|
}
|
||||||
|
@ -120,9 +109,6 @@ func (f *FileBuffer) Read(ctx context.Context, b []byte) (n int, err error) {
|
||||||
// ReadAt should not affect nor be affected by the underlying seek offset.
|
// ReadAt should not affect nor be affected by the underlying seek offset.
|
||||||
// Clients of ReadAt can execute parallel ReadAt calls on the same input source.
|
// Clients of ReadAt can execute parallel ReadAt calls on the same input source.
|
||||||
func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||||
f.mu.RLock()
|
|
||||||
defer f.mu.RUnlock()
|
|
||||||
|
|
||||||
if f.isClosed {
|
if f.isClosed {
|
||||||
return 0, os.ErrClosed
|
return 0, os.ErrClosed
|
||||||
}
|
}
|
||||||
|
@ -145,9 +131,6 @@ func (f *FileBuffer) ReadAt(ctx context.Context, p []byte, off int64) (n int, er
|
||||||
// Write implements io.Writer https://golang.org/pkg/io/#Writer
|
// Write implements io.Writer https://golang.org/pkg/io/#Writer
|
||||||
// by appending the passed bytes to the buffer unless the buffer is closed or index negative.
|
// by appending the passed bytes to the buffer unless the buffer is closed or index negative.
|
||||||
func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) {
|
func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
|
|
||||||
if f.isClosed {
|
if f.isClosed {
|
||||||
return 0, os.ErrClosed
|
return 0, os.ErrClosed
|
||||||
}
|
}
|
||||||
|
@ -168,9 +151,6 @@ func (f *FileBuffer) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||||
|
|
||||||
// Seek implements io.Seeker https://golang.org/pkg/io/#Seeker
|
// Seek implements io.Seeker https://golang.org/pkg/io/#Seeker
|
||||||
func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) {
|
func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) {
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
|
|
||||||
if f.isClosed {
|
if f.isClosed {
|
||||||
return 0, os.ErrClosed
|
return 0, os.ErrClosed
|
||||||
}
|
}
|
||||||
|
@ -196,9 +176,6 @@ func (f *FileBuffer) Seek(offset int64, whence int) (idx int64, err error) {
|
||||||
// Close implements io.Closer https://golang.org/pkg/io/#Closer
|
// Close implements io.Closer https://golang.org/pkg/io/#Closer
|
||||||
// It closes the buffer, rendering it unusable for I/O. It returns an error, if any.
|
// It closes the buffer, rendering it unusable for I/O. It returns an error, if any.
|
||||||
func (f *FileBuffer) Close(ctx context.Context) error {
|
func (f *FileBuffer) Close(ctx context.Context) error {
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
|
|
||||||
f.isClosed = true
|
f.isClosed = true
|
||||||
f.buff = nil
|
f.buff = nil
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -58,6 +58,22 @@ func (*fsWrapper) MkdirAll(ctx context.Context, filename string, perm fs.FileMod
|
||||||
return billy.ErrNotSupported
|
return billy.ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open implements billy.Filesystem.
|
||||||
|
func (fs *fsWrapper) Open(ctx context.Context, filename string) (nfs.File, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
file, err := fs.fs.Open(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, billyErr(ctx, err, fs.log)
|
||||||
|
}
|
||||||
|
return &billyFile{
|
||||||
|
name: filename,
|
||||||
|
file: file,
|
||||||
|
log: fs.log.With("filename", filename),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// OpenFile implements billy.Filesystem.
|
// OpenFile implements billy.Filesystem.
|
||||||
func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) {
|
func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
|
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
|
||||||
|
|
|
@ -130,11 +130,13 @@ func (s *Daemon) Close(ctx context.Context) error {
|
||||||
)...)
|
)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
|
func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
|
||||||
ctx, span := tracer.Start(ctx, "LoadTorrent")
|
ctx, span := tracer.Start(ctx, "LoadTorrent")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
log := s.log
|
log := s.log
|
||||||
|
|
||||||
|
defer f.Close(ctx)
|
||||||
|
|
||||||
stat, err := f.Info()
|
stat, err := f.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("call stat failed: %w", err)
|
return nil, fmt.Errorf("call stat failed: %w", err)
|
||||||
|
@ -272,7 +274,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
|
||||||
|
|
||||||
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
|
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
|
||||||
|
|
||||||
_, err = s.loadTorrent(ctx, vfile)
|
_, err = s.LoadTorrent(ctx, vfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "failed adding torrent", rlog.Error(err))
|
log.Error(ctx, "failed adding torrent", rlog.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"log/slog"
|
|
||||||
"path"
|
"path"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -13,7 +12,6 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
|
||||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||||
"github.com/anacrolix/torrent"
|
"github.com/anacrolix/torrent"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -40,15 +38,11 @@ const shortTimeout = time.Millisecond
|
||||||
const lowTimeout = time.Second * 5
|
const lowTimeout = time.Second * 5
|
||||||
|
|
||||||
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
|
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
|
||||||
c, err := s.loadTorrent(ctx, f)
|
c, err := s.LoadTorrent(ctx, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.Close(ctx); err != nil {
|
|
||||||
s.log.Error(ctx, "failed to close file", slog.String("name", f.Name()), rlog.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return &TorrentFS{
|
return &TorrentFS{
|
||||||
name: f.Name(),
|
name: f.Name(),
|
||||||
Torrent: c,
|
Torrent: c,
|
||||||
|
|
|
@ -118,12 +118,12 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
|
||||||
|
|
||||||
srcF, err := os.Open(paths[0])
|
srcF, err := os.Open(paths[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error opening file %s: %w", paths[0], err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
defer srcF.Close()
|
defer srcF.Close()
|
||||||
srcStat, err := srcF.Stat()
|
srcStat, err := srcF.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error stat file %s: %w", paths[0], err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
|
|
||||||
srcFd := int(srcF.Fd())
|
srcFd := int(srcF.Fd())
|
||||||
|
@ -133,12 +133,12 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
|
||||||
err = unix.Fstatfs(srcFd, &fsStat)
|
err = unix.Fstatfs(srcFd, &fsStat)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return deduped, fmt.Errorf("error statfs file %s: %w", paths[0], err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
|
|
||||||
srcHash, err := filehash(srcF)
|
srcHash, err := filehash(srcF)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error hashing file %s: %w", paths[0], err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if int64(fsStat.Bsize) > srcSize { // for btrfs it means file in residing in not deduplicatable metadata
|
if int64(fsStat.Bsize) > srcSize { // for btrfs it means file in residing in not deduplicatable metadata
|
||||||
|
@ -162,13 +162,13 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
|
||||||
|
|
||||||
destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
|
destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error opening file %s: %w", dst, err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
defer destF.Close()
|
defer destF.Close()
|
||||||
|
|
||||||
dstHash, err := filehash(destF)
|
dstHash, err := filehash(destF)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error hashing file %s: %w", dst, err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if srcHash != dstHash {
|
if srcHash != dstHash {
|
||||||
|
@ -199,7 +199,7 @@ func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
|
||||||
|
|
||||||
err = unix.IoctlFileDedupeRange(srcFd, &rng)
|
err = unix.IoctlFileDedupeRange(srcFd, &rng)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deduped, fmt.Errorf("error calling FIDEDUPERANGE: %w", err)
|
return deduped, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range rng.Info {
|
for i := range rng.Info {
|
||||||
|
@ -223,3 +223,7 @@ func filehash(r io.Reader) ([20]byte, error) {
|
||||||
|
|
||||||
return sha1.Sum(buf), nil
|
return sha1.Sum(buf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ptr[D any](v D) *D {
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package ytdlp
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||||
|
@ -36,7 +35,7 @@ func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := s.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
f, err := s.fs.Open(ctx, filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,6 @@ import (
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||||
"github.com/bodgit/sevenzip"
|
"github.com/bodgit/sevenzip"
|
||||||
"github.com/mattetti/filebuffer"
|
|
||||||
"github.com/nwaples/rardecode/v2"
|
"github.com/nwaples/rardecode/v2"
|
||||||
"github.com/royalcat/ctxio"
|
"github.com/royalcat/ctxio"
|
||||||
)
|
)
|
||||||
|
@ -175,7 +174,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi
|
||||||
size: size,
|
size: size,
|
||||||
af: af,
|
af: af,
|
||||||
|
|
||||||
buffer: filebuffer.New(nil),
|
buffer: ioutils.NewFileBuffer(nil),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,7 +189,7 @@ type archiveFile struct {
|
||||||
|
|
||||||
offset int64
|
offset int64
|
||||||
readen int64
|
readen int64
|
||||||
buffer *filebuffer.Buffer
|
buffer *ioutils.FileBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name implements File.
|
// Name implements File.
|
||||||
|
@ -215,7 +214,14 @@ func (d *archiveFile) IsDir() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *archiveFile) Close(ctx context.Context) error {
|
||||||
|
return d.buffer.Close(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
||||||
|
d.m.Lock()
|
||||||
|
defer d.m.Unlock()
|
||||||
|
|
||||||
if to < d.readen {
|
if to < d.readen {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -224,68 +230,55 @@ func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get file reader: %w", err)
|
return fmt.Errorf("failed to get file reader: %w", err)
|
||||||
}
|
}
|
||||||
defer reader.Close(ctx)
|
defer reader.Close()
|
||||||
|
|
||||||
_, err = d.buffer.Seek(0, io.SeekStart)
|
_, err = d.buffer.Seek(0, io.SeekStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("failed to seek to start of the file: %w", err)
|
||||||
}
|
}
|
||||||
d.readen, err = ctxio.CopyN(ctx, ctxio.WrapIoWriter(d.buffer), reader, to+readahead)
|
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
return fmt.Errorf("error copying from archive file reader: %w", err)
|
return fmt.Errorf("error copying from archive file reader: %w", err)
|
||||||
}
|
}
|
||||||
_, err = d.buffer.Seek(d.offset, io.SeekStart)
|
_, err = d.buffer.Seek(d.offset, io.SeekStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("failed to seek to start of the file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||||
d.m.Lock()
|
|
||||||
defer d.m.Unlock()
|
|
||||||
|
|
||||||
err = d.loadMore(ctx, d.offset+int64(len(p)))
|
err = d.loadMore(ctx, d.offset+int64(len(p)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
||||||
}
|
}
|
||||||
n, err = d.buffer.Read(p)
|
n, err = d.buffer.Read(ctx, p)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
||||||
}
|
}
|
||||||
d.offset += int64(n)
|
d.offset += int64(n)
|
||||||
return n, err
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||||
d.m.Lock()
|
|
||||||
defer d.m.Unlock()
|
|
||||||
|
|
||||||
err = d.loadMore(ctx, off+int64(len(p)))
|
err = d.loadMore(ctx, off+int64(len(p)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
|
||||||
}
|
}
|
||||||
n, err = d.buffer.ReadAt(p, off)
|
n, err = d.buffer.ReadAt(ctx, p, off)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
return n, fmt.Errorf("failed to read from buffer: %w", err)
|
||||||
}
|
}
|
||||||
return n, err
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *archiveFile) Close(ctx context.Context) error {
|
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error)
|
||||||
d.m.Lock()
|
|
||||||
defer d.m.Unlock()
|
|
||||||
|
|
||||||
return d.buffer.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
|
|
||||||
|
|
||||||
var _ archiveLoader = ZipLoader
|
var _ archiveLoader = ZipLoader
|
||||||
|
|
||||||
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||||
|
|
||||||
zr, err := zip.NewReader(reader, size)
|
zr, err := zip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -299,7 +292,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
}
|
}
|
||||||
|
|
||||||
i := i
|
i := i
|
||||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||||
|
|
||||||
zr, err := zip.NewReader(reader, size)
|
zr, err := zip.NewReader(reader, size)
|
||||||
|
@ -312,7 +305,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
return nil, fmt.Errorf("failed to open file in zip archive: %w", err)
|
return nil, fmt.Errorf("failed to open file in zip archive: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctxio.WrapIoReadCloser(rc), nil
|
return rc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
|
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
|
||||||
|
@ -324,7 +317,8 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
var _ archiveLoader = SevenZipLoader
|
var _ archiveLoader = SevenZipLoader
|
||||||
|
|
||||||
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(context.Background(), ctxreader)
|
||||||
|
|
||||||
r, err := sevenzip.NewReader(reader, size)
|
r, err := sevenzip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -338,7 +332,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
||||||
}
|
}
|
||||||
|
|
||||||
i := i
|
i := i
|
||||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||||
zr, err := sevenzip.NewReader(reader, size)
|
zr, err := sevenzip.NewReader(reader, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -350,7 +344,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctxio.WrapIoReadCloser(rc), nil
|
return rc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
|
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
|
||||||
|
@ -380,7 +374,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
}
|
}
|
||||||
|
|
||||||
name := header.Name
|
name := header.Name
|
||||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||||
r, err := rardecode.NewReader(reader)
|
r, err := rardecode.NewReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -392,7 +386,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if header.Name == name {
|
if header.Name == name {
|
||||||
return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil
|
return io.NopCloser(r), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("file with name '%s' not found", name)
|
return nil, fmt.Errorf("file with name '%s' not found", name)
|
||||||
|
|
|
@ -3,7 +3,6 @@ package vfs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||||
)
|
)
|
||||||
|
@ -43,7 +42,7 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
bf, err := c.fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
bf, err := c.fs.Open(ctx, filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -36,7 +35,7 @@ type LogFS struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func isLoggableError(err error) bool {
|
func isLoggableError(err error) bool {
|
||||||
return err != nil && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF)
|
return err != nil && !errors.Is(err, fs.ErrNotExist)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Filesystem = (*LogFS)(nil)
|
var _ Filesystem = (*LogFS)(nil)
|
||||||
|
@ -308,7 +307,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
|
||||||
|
|
||||||
n, err = f.f.ReadAt(ctx, p, off)
|
n, err = f.f.ReadAt(ctx, p, off)
|
||||||
if isLoggableError(err) {
|
if isLoggableError(err) {
|
||||||
f.log.Error(ctx, "Failed to read", rlog.Error(err))
|
f.log.Error(ctx, "Failed to read")
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// it is factory responsibility to close file then needed
|
defer file.Close(ctx)
|
||||||
|
|
||||||
err = func() error {
|
err = func() error {
|
||||||
factoryCtx, cancel := subTimeout(ctx)
|
factoryCtx, cancel := subTimeout(ctx)
|
||||||
|
@ -208,7 +208,6 @@ func (r *ResolverFS) Type() fs.FileMode {
|
||||||
|
|
||||||
var _ Filesystem = &ResolverFS{}
|
var _ Filesystem = &ResolverFS{}
|
||||||
|
|
||||||
// It factory responsobility to close file
|
|
||||||
type FsFactory func(ctx context.Context, f File) (Filesystem, error)
|
type FsFactory func(ctx context.Context, f File) (Filesystem, error)
|
||||||
|
|
||||||
func NewResolver(factories map[string]FsFactory) *Resolver {
|
func NewResolver(factories map[string]FsFactory) *Resolver {
|
||||||
|
@ -307,8 +306,7 @@ PARTS_LOOP:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
|
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
|
||||||
}
|
}
|
||||||
// it is factory responsibility to close file then needed
|
defer fsFile.Close(ctx)
|
||||||
|
|
||||||
nestedFs, err := nestFactory(ctx, fsFile)
|
nestedFs, err := nestFactory(ctx, fsFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
|
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
|
||||||
|
|
Loading…
Reference in a new issue