context fs

This commit is contained in:
royalcat 2024-03-21 00:47:51 +03:00
parent fd3beea874
commit 7b1863109c
25 changed files with 593 additions and 349 deletions

View file

@ -125,7 +125,10 @@ func run(configPath string) error {
c.AddDhtNodes(conf.TorrentClient.DHTNodes)
defer c.Close()
ts, err := service.NewService(conf.SourceDir, conf.TorrentClient, c, st, excludedFilesStore, infoBytesStore, conf.TorrentClient.AddTimeout, conf.TorrentClient.ReadTimeout)
ts, err := service.NewService(
conf.SourceDir, conf.TorrentClient,
c, st, excludedFilesStore, infoBytesStore,
)
if err != nil {
return fmt.Errorf("error creating service: %w", err)
}

4
go.mod
View file

@ -4,7 +4,6 @@ go 1.22.1
require (
github.com/99designs/gqlgen v0.17.43
github.com/RoaringBitmap/roaring v1.2.3
github.com/agoda-com/opentelemetry-go/otelslog v0.1.1
github.com/agoda-com/opentelemetry-logs-go v0.3.0
github.com/anacrolix/dht/v2 v2.21.1
@ -43,12 +42,14 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.46.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/sdk/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/multierr v1.11.0
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/net v0.19.0
)
require (
github.com/RoaringBitmap/roaring v1.2.3 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
@ -162,7 +163,6 @@ require (
go.opentelemetry.io/contrib v1.21.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/arch v0.3.0 // indirect

48
pkg/ctxio/reader.go Normal file
View file

@ -0,0 +1,48 @@
package ctxio
import (
"context"
"io"
)
type ReaderAtCloser interface {
ReaderAt
Closer
}
type ReaderAt interface {
ReadAt(ctx context.Context, p []byte, off int64) (n int, err error)
}
type Reader interface {
Read(ctx context.Context, p []byte) (n int, err error)
}
type Closer interface {
Close(ctx context.Context) error
}
type contextReader struct {
ctx context.Context
r Reader
}
func (r *contextReader) Read(p []byte) (n int, err error) {
return r.r.Read(r.ctx, p)
}
func IoReaderAt(ctx context.Context, r ReaderAt) io.ReaderAt {
return &contextReaderAt{ctx: ctx, r: r}
}
type contextReaderAt struct {
ctx context.Context
r ReaderAt
}
func (c *contextReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return c.r.ReadAt(c.ctx, p, off)
}
func IoReader(ctx context.Context, r Reader) io.Reader {
return &contextReader{ctx: ctx, r: r}
}

102
pkg/ctxio/seeker.go Normal file
View file

@ -0,0 +1,102 @@
package ctxio
import (
"context"
"io"
"sync"
)
type ioSeekerWrapper struct {
ctx context.Context
mu sync.Mutex
pos int64
size int64
r ReaderAt
}
func IoReadSeekerWrapper(ctx context.Context, r ReaderAt, size int64) io.ReadSeeker {
return &ioSeekerWrapper{
ctx: ctx,
r: r,
size: size,
}
}
func (r *ioSeekerWrapper) Seek(offset int64, whence int) (int64, error) {
r.mu.Lock()
defer r.mu.Unlock()
switch whence {
case io.SeekStart:
r.pos = offset
case io.SeekCurrent:
r.pos = r.pos + offset
case io.SeekEnd:
r.pos = r.size + offset
}
return r.pos, nil
}
func (r *ioSeekerWrapper) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
n, err := r.r.ReadAt(r.ctx, p, r.pos)
r.pos += int64(n)
return n, err
}
var _ io.ReadSeekCloser = (*ioSeekerCloserWrapper)(nil)
type ioSeekerCloserWrapper struct {
ctx context.Context
mu sync.Mutex
pos int64
size int64
r ReaderAtCloser
}
func IoReadSeekCloserWrapper(ctx context.Context, r ReaderAtCloser, size int64) io.ReadSeekCloser {
return &ioSeekerCloserWrapper{
ctx: ctx,
r: r,
size: size,
}
}
func (r *ioSeekerCloserWrapper) Seek(offset int64, whence int) (int64, error) {
r.mu.Lock()
defer r.mu.Unlock()
switch whence {
case io.SeekStart:
r.pos = offset
case io.SeekCurrent:
r.pos = r.pos + offset
case io.SeekEnd:
r.pos = r.size + offset
}
return r.pos, nil
}
func (r *ioSeekerCloserWrapper) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
n, err := r.r.ReadAt(r.ctx, p, r.pos)
r.pos += int64(n)
return n, err
}
// Close implements io.ReadSeekCloser.
func (r *ioSeekerCloserWrapper) Close() error {
return r.r.Close(r.ctx)
}

View file

@ -33,8 +33,8 @@ var defaultConfig = Config{
// GlobalCacheSize: 2048,
AddTimeout: 60,
ReadTimeout: 120,
// AddTimeout: 60,
// ReadTimeout: 120,
},
Log: Log{

View file

@ -26,8 +26,8 @@ type Log struct {
}
type TorrentClient struct {
ReadTimeout int `koanf:"read_timeout,omitempty"`
AddTimeout int `koanf:"add_timeout,omitempty"`
// ReadTimeout int `koanf:"read_timeout,omitempty"`
// AddTimeout int `koanf:"add_timeout,omitempty"`
DHTNodes []string `koanf:"dhtnodes,omitempty"`
DisableIPv6 bool `koanf:"disable_ipv6,omitempty"`

View file

@ -66,7 +66,7 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
// FsListDir is the resolver for the fsListDir field.
func (r *queryResolver) FsListDir(ctx context.Context, path string) ([]model.DirEntry, error) {
entries, err := r.VFS.ReadDir(path)
entries, err := r.VFS.ReadDir(ctx, path)
if err != nil {
return nil, err
}

View file

@ -3,6 +3,7 @@
package fuse
import (
"context"
"errors"
"io"
"log/slog"
@ -104,7 +105,7 @@ func (fs *fuseFS) Read(path string, dest []byte, off int64, fh uint64) int {
buf := dest[:end]
n, err := file.ReadAt(buf, off)
n, err := file.ReadAt(context.TODO(), buf, off)
if err != nil && err != io.EOF {
log.Error("error reading data")
return -fuse.EIO
@ -178,7 +179,7 @@ func (fh *fileHandler) ListDir(path string) ([]string, error) {
fh.mu.RLock()
defer fh.mu.RUnlock()
files, err := fh.fs.ReadDir(path)
files, err := fh.fs.ReadDir(context.TODO(), path)
if err != nil {
return nil, err
}
@ -237,7 +238,7 @@ func (fh *fileHandler) Remove(fhi uint64) error {
return ErrHolderEmpty
}
if err := f.Close(); err != nil {
if err := f.Close(context.TODO()); err != nil {
return err
}
@ -247,7 +248,7 @@ func (fh *fileHandler) Remove(fhi uint64) error {
}
func (fh *fileHandler) lookupFile(path string) (vfs.File, error) {
file, err := fh.fs.Open(path)
file, err := fh.fs.Open(context.TODO(), path)
if err != nil {
return nil, err
}

View file

@ -1,18 +1,24 @@
package httpfs
import (
"context"
"io"
"io/fs"
"net/http"
"os"
"sync"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var _ http.FileSystem = &HTTPFS{}
var httpFsTracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/export/httpfs.HTTPFS")
type HTTPFS struct {
fs vfs.Filesystem
}
@ -21,8 +27,16 @@ func NewHTTPFS(fs vfs.Filesystem) *HTTPFS {
return &HTTPFS{fs: fs}
}
func (fs *HTTPFS) ctx() context.Context {
return context.Background()
}
func (hfs *HTTPFS) Open(name string) (http.File, error) {
f, err := hfs.fs.Open(name)
ctx, span := httpFsTracer.Start(hfs.ctx(), "Open",
trace.WithAttributes(attribute.String("name", name)),
)
defer span.End()
f, err := hfs.fs.Open(ctx, name)
if err != nil {
return nil, err
}
@ -36,11 +50,16 @@ func (hfs *HTTPFS) Open(name string) (http.File, error) {
}
}
return newHTTPFile(f, fis), nil
return newHTTPFile(ctx, f, fis), nil
}
func (hfs *HTTPFS) filesToFileInfo(name string) ([]fs.FileInfo, error) {
files, err := hfs.fs.ReadDir(name)
ctx, span := httpFsTracer.Start(hfs.ctx(), "Open",
trace.WithAttributes(attribute.String("name", name)),
)
defer span.End()
files, err := hfs.fs.ReadDir(ctx, name)
if err != nil {
return nil, err
}
@ -62,7 +81,7 @@ var _ http.File = &httpFile{}
type httpFile struct {
f vfs.File
iio.ReaderSeeker
io.ReadSeekCloser
mu sync.Mutex
// dirPos is protected by mu.
@ -70,11 +89,11 @@ type httpFile struct {
dirContent []os.FileInfo
}
func newHTTPFile(f vfs.File, dirContent []os.FileInfo) *httpFile {
func newHTTPFile(ctx context.Context, f vfs.File, dirContent []os.FileInfo) *httpFile {
return &httpFile{
f: f,
dirContent: dirContent,
ReaderSeeker: iio.NewSeekerWrapper(f, f.Size()),
f: f,
dirContent: dirContent,
ReadSeekCloser: ctxio.IoReadSeekCloserWrapper(ctx, f, f.Size()),
}
}

View file

@ -1,6 +1,7 @@
package nfs
import (
"context"
"errors"
"io/fs"
"log/slog"
@ -8,8 +9,13 @@ import (
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/go-git/go-billy/v5"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var billyFsTracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/export/nfs.billyFsWrapper")
type billyFsWrapper struct {
fs vfs.Filesystem
log *slog.Logger
@ -18,6 +24,10 @@ type billyFsWrapper struct {
var _ billy.Filesystem = (*billyFsWrapper)(nil)
var _ billy.Dir = (*billyFsWrapper)(nil)
func (*billyFsWrapper) ctx() context.Context {
return context.Background()
}
// Chroot implements billy.Filesystem.
func (*billyFsWrapper) Chroot(path string) (billy.Filesystem, error) {
return nil, billy.ErrNotSupported
@ -35,9 +45,12 @@ func (*billyFsWrapper) Join(elem ...string) string {
// Lstat implements billy.Filesystem.
func (fs *billyFsWrapper) Lstat(filename string) (fs.FileInfo, error) {
info, err := fs.fs.Stat(filename)
ctx, span := billyFsTracer.Start(fs.ctx(), "Lstat", trace.WithAttributes(attribute.String("filename", filename)))
defer span.End()
info, err := fs.fs.Stat(ctx, filename)
if err != nil {
return nil, billyErr(err, fs.log)
return nil, billyErr(ctx, err, fs.log)
}
return info, nil
}
@ -49,9 +62,14 @@ func (*billyFsWrapper) MkdirAll(filename string, perm fs.FileMode) error {
// Open implements billy.Filesystem.
func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
file, err := fs.fs.Open(filename)
ctx, span := billyFsTracer.Start(fs.ctx(), "Open",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
file, err := fs.fs.Open(ctx, filename)
if err != nil {
return nil, billyErr(err, fs.log)
return nil, billyErr(ctx, err, fs.log)
}
return &billyFile{
name: filename,
@ -62,9 +80,14 @@ func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
// OpenFile implements billy.Filesystem.
func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (billy.File, error) {
file, err := fs.fs.Open(filename)
ctx, span := billyFsTracer.Start(fs.ctx(), "OpenFile",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
file, err := fs.fs.Open(ctx, filename)
if err != nil {
return nil, billyErr(err, fs.log)
return nil, billyErr(ctx, err, fs.log)
}
return &billyFile{
name: filename,
@ -75,9 +98,14 @@ func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode)
// ReadDir implements billy.Filesystem.
func (bfs *billyFsWrapper) ReadDir(path string) ([]fs.FileInfo, error) {
ffs, err := bfs.fs.ReadDir(path)
ctx, span := billyFsTracer.Start(bfs.ctx(), "OpenFile",
trace.WithAttributes(attribute.String("path", path)),
)
defer span.End()
ffs, err := bfs.fs.ReadDir(ctx, path)
if err != nil {
return nil, billyErr(err, bfs.log)
return nil, billyErr(ctx, err, bfs.log)
}
out := make([]fs.FileInfo, 0, len(ffs))
@ -102,8 +130,13 @@ func (*billyFsWrapper) Readlink(link string) (string, error) {
}
// Remove implements billy.Filesystem.
func (s *billyFsWrapper) Remove(filename string) error {
return s.fs.Unlink(filename)
func (bfs *billyFsWrapper) Remove(filename string) error {
ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
return bfs.fs.Unlink(ctx, filename)
}
// Rename implements billy.Filesystem.
@ -117,25 +150,32 @@ func (*billyFsWrapper) Root() string {
}
// Stat implements billy.Filesystem.
func (fs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
info, err := fs.fs.Stat(filename)
func (bfs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
info, err := bfs.fs.Stat(ctx, filename)
if err != nil {
return nil, billyErr(err, fs.log)
return nil, billyErr(ctx, err, bfs.log)
}
return info, nil
}
// Symlink implements billy.Filesystem.
func (fs *billyFsWrapper) Symlink(target string, link string) error {
return billyErr(vfs.ErrNotImplemented, fs.log)
return billyErr(nil, vfs.ErrNotImplemented, fs.log)
}
// TempFile implements billy.Filesystem.
func (fs *billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error) {
return nil, billyErr(vfs.ErrNotImplemented, fs.log)
return nil, billyErr(nil, vfs.ErrNotImplemented, fs.log)
}
type billyFile struct {
ctx context.Context
name string
file vfs.File
log *slog.Logger
@ -154,28 +194,47 @@ func (f *billyFile) Name() string {
}
// Read implements billy.File.
func (f *billyFile) Read(p []byte) (n int, err error) {
return f.file.Read(p)
func (bf *billyFile) Read(p []byte) (n int, err error) {
ctx, span := billyFsTracer.Start(bf.ctx, "Read",
trace.WithAttributes(attribute.Int("length", len(p))),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
return bf.file.Read(ctx, p)
}
// ReadAt implements billy.File.
func (f *billyFile) ReadAt(p []byte, off int64) (n int, err error) {
return f.file.ReadAt(p, off)
func (bf *billyFile) ReadAt(p []byte, off int64) (n int, err error) {
ctx, span := billyFsTracer.Start(bf.ctx, "Read",
trace.WithAttributes(
attribute.Int("length", len(p)),
attribute.Int64("offset", off),
),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
return bf.file.ReadAt(ctx, p, off)
}
// Seek implements billy.File.
func (f *billyFile) Seek(offset int64, whence int) (int64, error) {
return 0, billyErr(vfs.ErrNotImplemented, f.log)
return 0, billyErr(nil, vfs.ErrNotImplemented, f.log)
}
// Truncate implements billy.File.
func (f *billyFile) Truncate(size int64) error {
return billyErr(vfs.ErrNotImplemented, f.log)
return billyErr(nil, vfs.ErrNotImplemented, f.log)
}
// Write implements billy.File.
func (f *billyFile) Write(p []byte) (n int, err error) {
return 0, billyErr(vfs.ErrNotImplemented, f.log)
return 0, billyErr(nil, vfs.ErrNotImplemented, f.log)
}
// Lock implements billy.File.
@ -188,13 +247,13 @@ func (*billyFile) Unlock() error {
return nil // TODO
}
func billyErr(err error, log *slog.Logger) error {
func billyErr(ctx context.Context, err error, log *slog.Logger) error {
if errors.Is(err, vfs.ErrNotImplemented) {
return billy.ErrNotSupported
}
if errors.Is(err, vfs.ErrNotExist) {
if err, ok := asErr[*fs.PathError](err); ok {
log.Error("file not found", "op", err.Op, "path", err.Path, "error", err.Err)
log.ErrorContext(ctx, "file not found", "op", err.Op, "path", err.Path, "error", err.Err)
}
return fs.ErrNotExist
}

View file

@ -10,7 +10,6 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"golang.org/x/net/webdav"
)
@ -28,19 +27,19 @@ func (wd *WebDAV) OpenFile(ctx context.Context, name string, flag int, perm os.F
name = vfs.AbsPath(name)
// TODO handle flag and permissions
f, err := wd.lookupFile(name)
f, err := wd.lookupFile(ctx, name)
if err != nil {
return nil, err
}
wdf := newFile(path.Base(name), f, func() ([]fs.FileInfo, error) {
return wd.listDir(name)
wdf := newFile(ctx, path.Base(name), f, func() ([]fs.FileInfo, error) {
return wd.listDir(ctx, name)
})
return wdf, nil
}
func (wd *WebDAV) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
return wd.fs.Stat(vfs.AbsPath(name))
return wd.fs.Stat(ctx, vfs.AbsPath(name))
}
func (wd *WebDAV) Mkdir(ctx context.Context, name string, perm fs.FileMode) error {
@ -48,19 +47,19 @@ func (wd *WebDAV) Mkdir(ctx context.Context, name string, perm fs.FileMode) erro
}
func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error {
return wd.fs.Unlink(name)
return wd.fs.Unlink(ctx, name)
}
func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error {
return webdav.ErrNotImplemented
}
func (wd *WebDAV) lookupFile(name string) (vfs.File, error) {
return wd.fs.Open(path.Clean(name))
func (wd *WebDAV) lookupFile(ctx context.Context, name string) (vfs.File, error) {
return wd.fs.Open(ctx, path.Clean(name))
}
func (wd *WebDAV) listDir(path string) ([]os.FileInfo, error) {
files, err := wd.fs.ReadDir(path)
func (wd *WebDAV) listDir(ctx context.Context, path string) ([]os.FileInfo, error) {
files, err := wd.fs.ReadDir(ctx, path)
if err != nil {
return nil, err
}
@ -80,9 +79,10 @@ func (wd *WebDAV) listDir(path string) ([]os.FileInfo, error) {
var _ webdav.File = &webDAVFile{}
type webDAVFile struct {
iio.Reader
ctx context.Context
fi os.FileInfo
f vfs.File
mudp sync.Mutex
dirPos int
@ -93,11 +93,11 @@ type webDAVFile struct {
dirContent []os.FileInfo
}
func newFile(name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
func newFile(ctx context.Context, name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
return &webDAVFile{
ctx: ctx,
fi: newFileInfo(name, f.Size(), f.IsDir()),
dirFunc: df,
Reader: f,
}
}
@ -147,7 +147,7 @@ func (wdf *webDAVFile) Read(p []byte) (int, error) {
wdf.mup.Lock()
defer wdf.mup.Unlock()
n, err := wdf.Reader.ReadAt(p, wdf.pos)
n, err := wdf.f.ReadAt(wdf.ctx, p, wdf.pos)
wdf.pos += int64(n)
return n, err
@ -173,6 +173,11 @@ func (wdf *webDAVFile) Write(p []byte) (n int, err error) {
return 0, webdav.ErrNotImplemented
}
// Close implements webdav.File.
func (wdf *webDAVFile) Close() error {
return wdf.f.Close(wdf.ctx)
}
type webDAVFileInfo struct {
name string
size int64

View file

@ -9,8 +9,8 @@ import (
"path/filepath"
"slices"
"strings"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/host/datastorage"
@ -46,13 +46,11 @@ type Service struct {
dirsAquire kv.Store[string, DirAquire]
log *slog.Logger
addTimeout, readTimeout int
log *slog.Logger
}
func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes,
addTimeout, readTimeout int,
) (*Service, error) {
dirsAcquire, err := kv.NewBadgerKV[string, DirAquire](path.Join(cfg.MetadataFolder, "dir-acquire"))
if err != nil {
@ -70,8 +68,6 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
torrentLoaded: make(chan struct{}),
dirsAquire: dirsAcquire,
// stats: newStats(), // TODO persistent
addTimeout: addTimeout,
readTimeout: readTimeout,
}
go func() {
@ -94,14 +90,14 @@ func (s *Service) Close() error {
}
func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
defer f.Close()
defer f.Close(ctx)
stat, err := f.Stat()
if err != nil {
return nil, fmt.Errorf("call stat failed: %w", err)
}
mi, err := metainfo.Load(f)
mi, err := metainfo.Load(ctxio.IoReader(ctx, f))
if err != nil {
return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
}
@ -291,10 +287,8 @@ func isValidInfoHashBytes(d []byte) bool {
return err == nil
}
func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout))
defer cancel()
defer f.Close()
func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
defer f.Close(ctx)
info, err := f.Stat()
if err != nil {
@ -306,7 +300,7 @@ func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
return nil, err
}
return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles), s.readTimeout), nil
return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles)), nil
}
func (s *Service) Stats() (*Stats, error) {
@ -333,7 +327,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
if strings.HasSuffix(path, ".torrent") {
file := vfs.NewLazyOsFile(path)
defer file.Close()
defer file.Close(ctx)
_, err = s.AddTorrent(ctx, file)
if err != nil {

View file

@ -2,6 +2,7 @@ package vfs
import (
"archive/zip"
"context"
"io"
"io/fs"
"os"
@ -9,56 +10,57 @@ import (
"path/filepath"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/bodgit/sevenzip"
"github.com/nwaples/rardecode/v2"
)
var ArchiveFactories = map[string]FsFactory{
".zip": func(f File) (Filesystem, error) {
".zip": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
if err != nil {
return nil, err
}
return NewArchive(stat.Name(), f, stat.Size(), ZipLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader), nil
},
".rar": func(f File) (Filesystem, error) {
".rar": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
if err != nil {
return nil, err
}
return NewArchive(stat.Name(), f, stat.Size(), RarLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader), nil
},
".7z": func(f File) (Filesystem, error) {
".7z": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
if err != nil {
return nil, err
}
return NewArchive(stat.Name(), f, stat.Size(), SevenZipLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader), nil
},
}
type archiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error)
type archiveLoader func(ctx context.Context, r ctxio.ReaderAt, size int64) (map[string]*archiveFile, error)
var _ Filesystem = &ArchiveFS{}
type ArchiveFS struct {
name string
r iio.Reader
r ctxio.ReaderAt
Size int64
files func() (map[string]File, error)
}
func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *ArchiveFS {
func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) *ArchiveFS {
return &ArchiveFS{
name: name,
r: r,
Size: size,
files: OnceValueWOErr(func() (map[string]File, error) {
zipFiles, err := loader(r, size)
zipFiles, err := loader(ctx, r, size)
if err != nil {
return nil, err
}
@ -94,11 +96,11 @@ func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *Ar
}
// Unlink implements Filesystem.
func (a *ArchiveFS) Unlink(filename string) error {
func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error {
return ErrNotImplemented
}
func (a *ArchiveFS) Open(filename string) (File, error) {
func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) {
files, err := a.files()
if err != nil {
return nil, err
@ -107,7 +109,7 @@ func (a *ArchiveFS) Open(filename string) (File, error) {
return getFile(files, filename)
}
func (fs *ArchiveFS) ReadDir(path string) ([]fs.DirEntry, error) {
func (fs *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
files, err := fs.files()
if err != nil {
return nil, err
@ -117,7 +119,7 @@ func (fs *ArchiveFS) ReadDir(path string) ([]fs.DirEntry, error) {
}
// Stat implements Filesystem.
func (afs *ArchiveFS) Stat(filename string) (fs.FileInfo, error) {
func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
files, err := afs.files()
if err != nil {
return nil, err
@ -204,7 +206,7 @@ func (d *archiveFile) IsDir() bool {
return false
}
func (d *archiveFile) Close() (err error) {
func (d *archiveFile) Close(ctx context.Context) (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
@ -213,7 +215,7 @@ func (d *archiveFile) Close() (err error) {
return
}
func (d *archiveFile) Read(p []byte) (n int, err error) {
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
@ -221,7 +223,7 @@ func (d *archiveFile) Read(p []byte) (n int, err error) {
return d.reader.Read(p)
}
func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) {
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
@ -231,7 +233,9 @@ func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) {
var _ archiveLoader = ZipLoader
func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
@ -261,7 +265,9 @@ func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
var _ archiveLoader = SevenZipLoader
func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
r, err := sevenzip.NewReader(reader, size)
if err != nil {
return nil, err
@ -294,8 +300,10 @@ func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, err
var _ archiveLoader = RarLoader
func RarLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size))
func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReadSeekerWrapper(ctx, ctxreader, size)
r, err := rardecode.NewReader(reader)
if err != nil {
return nil, err
}

View file

@ -3,10 +3,11 @@ package vfs
import (
"archive/zip"
"bytes"
"context"
"io"
"testing"
"git.kmsign.ru/royalcat/tstor/src/iio"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"github.com/stretchr/testify/require"
)
@ -18,10 +19,12 @@ func TestZipFilesystem(t *testing.T) {
zReader, size := createTestZip(require)
// TODO add single dir collapse test
zfs := NewArchive("test", zReader, size, ZipLoader)
ctx := context.Background()
files, err := zfs.ReadDir("/path/to/test/file")
// TODO add single dir collapse test
zfs := NewArchive(ctx, "test", zReader, size, ZipLoader)
files, err := zfs.ReadDir(ctx, "/path/to/test/file")
require.NoError(err)
require.Len(files, 1)
@ -30,16 +33,16 @@ func TestZipFilesystem(t *testing.T) {
require.NotNil(e)
out := make([]byte, 11)
f, err := zfs.Open("/path/to/test/file/1.txt")
f, err := zfs.Open(ctx, "/path/to/test/file/1.txt")
require.NoError(err)
n, err := f.Read(out)
n, err := f.Read(ctx, out)
require.Equal(io.EOF, err)
require.Equal(11, n)
require.Equal(fileContent, out)
}
func createTestZip(require *require.Assertions) (iio.Reader, int64) {
func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
buf := bytes.NewBuffer([]byte{})
zWriter := zip.NewWriter(buf)
@ -56,15 +59,16 @@ func createTestZip(require *require.Assertions) (iio.Reader, int64) {
}
type closeableByteReader struct {
*bytes.Reader
data *bytes.Reader
}
func newCBR(b []byte) *closeableByteReader {
return &closeableByteReader{
Reader: bytes.NewReader(b),
data: bytes.NewReader(b),
}
}
func (*closeableByteReader) Close() error {
return nil
// ReadAt implements ctxio.ReaderAt.
func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return c.data.ReadAt(p, off)
}

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"io/fs"
"path"
)
@ -30,14 +31,14 @@ func (d *dir) IsDir() bool {
return true
}
func (d *dir) Close() error {
func (d *dir) Close(ctx context.Context) error {
return nil
}
func (d *dir) Read(p []byte) (n int, err error) {
func (d *dir) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, nil
}
func (d *dir) ReadAt(p []byte, off int64) (n int, err error) {
func (d *dir) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, nil
}

View file

@ -1,12 +1,13 @@
package vfs
import (
"context"
"errors"
"io/fs"
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/iio"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
)
type File interface {
@ -14,7 +15,9 @@ type File interface {
Size() int64
Stat() (fs.FileInfo, error)
iio.Reader
ctxio.Reader
ctxio.ReaderAt
ctxio.Closer
}
var ErrNotImplemented = errors.New("not implemented")
@ -23,14 +26,14 @@ type Filesystem interface {
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor has
// mode O_RDONLY.
Open(filename string) (File, error)
Open(ctx context.Context, filename string) (File, error)
// ReadDir reads the directory named by dirname and returns a list of
// directory entries.
ReadDir(path string) ([]fs.DirEntry, error)
ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error)
Stat(filename string) (fs.FileInfo, error)
Unlink(filename string) error
Stat(ctx context.Context, filename string) (fs.FileInfo, error)
Unlink(ctx context.Context, filename string) error
fs.DirEntry
}

View file

@ -1,8 +1,10 @@
package vfs
import (
"context"
"io/fs"
"log/slog"
"reflect"
)
type LogFS struct {
@ -40,8 +42,8 @@ func (fs *LogFS) Type() fs.FileMode {
}
// Open implements Filesystem.
func (fs *LogFS) Open(filename string) (File, error) {
file, err := fs.fs.Open(filename)
func (fs *LogFS) Open(ctx context.Context, filename string) (File, error) {
file, err := fs.fs.Open(ctx, filename)
if err != nil {
fs.log.With("filename", filename).Error("Failed to open file")
}
@ -50,17 +52,17 @@ func (fs *LogFS) Open(filename string) (File, error) {
}
// ReadDir implements Filesystem.
func (fs *LogFS) ReadDir(path string) ([]fs.DirEntry, error) {
file, err := fs.fs.ReadDir(path)
func (fs *LogFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
file, err := fs.fs.ReadDir(ctx, path)
if err != nil {
fs.log.Error("Failed to read dir", "path", path, "error", err)
fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name())
}
return file, err
}
// Stat implements Filesystem.
func (fs *LogFS) Stat(filename string) (fs.FileInfo, error) {
file, err := fs.fs.Stat(filename)
func (fs *LogFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
file, err := fs.fs.Stat(ctx, filename)
if err != nil {
fs.log.Error("Failed to stat", "filename", filename, "error", err)
}
@ -68,8 +70,8 @@ func (fs *LogFS) Stat(filename string) (fs.FileInfo, error) {
}
// Unlink implements Filesystem.
func (fs *LogFS) Unlink(filename string) error {
err := fs.fs.Unlink(filename)
func (fs *LogFS) Unlink(ctx context.Context, filename string) error {
err := fs.fs.Unlink(ctx, filename)
if err != nil {
fs.log.Error("Failed to stat", "filename", filename, "error", err)
}
@ -91,8 +93,8 @@ func WrapLogFile(f File, filename string, log *slog.Logger) *LogFile {
}
// Close implements File.
func (f *LogFile) Close() error {
err := f.f.Close()
func (f *LogFile) Close(ctx context.Context) error {
err := f.f.Close(ctx)
if err != nil {
f.log.Error("Failed to close", "error", err)
}
@ -105,8 +107,8 @@ func (f *LogFile) IsDir() bool {
}
// Read implements File.
func (f *LogFile) Read(p []byte) (n int, err error) {
n, err = f.f.Read(p)
func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) {
n, err = f.f.Read(ctx, p)
if err != nil {
f.log.Error("Failed to read", "error", err)
}
@ -114,8 +116,8 @@ func (f *LogFile) Read(p []byte) (n int, err error) {
}
// ReadAt implements File.
func (f *LogFile) ReadAt(p []byte, off int64) (n int, err error) {
n, err = f.f.ReadAt(p, off)
func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
n, err = f.f.ReadAt(ctx, p, off)
if err != nil {
f.log.Error("Failed to read", "offset", off, "error", err)
}

View file

@ -2,6 +2,7 @@ package vfs
import (
"bytes"
"context"
"io/fs"
"path"
)
@ -33,11 +34,6 @@ func (mfs *MemoryFs) Type() fs.FileMode {
return fs.ModeDir
}
// Unlink implements Filesystem.
func (fs *MemoryFs) Unlink(filename string) error {
return ErrNotImplemented
}
func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs {
return &MemoryFs{
name: name,
@ -45,16 +41,16 @@ func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs {
}
}
func (m *MemoryFs) Open(filename string) (File, error) {
func (m *MemoryFs) Open(ctx context.Context, filename string) (File, error) {
return getFile(m.files, filename)
}
func (fs *MemoryFs) ReadDir(path string) ([]fs.DirEntry, error) {
func (fs *MemoryFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
return listDirFromFiles(fs.files, path)
}
// Stat implements Filesystem.
func (mfs *MemoryFs) Stat(filename string) (fs.FileInfo, error) {
func (mfs *MemoryFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
file, ok := mfs.files[filename]
if !ok {
return nil, ErrNotExist
@ -62,32 +58,47 @@ func (mfs *MemoryFs) Stat(filename string) (fs.FileInfo, error) {
return newFileInfo(path.Base(filename), file.Size()), nil
}
var _ File = &MemoryFile{}
// Unlink implements Filesystem.
func (fs *MemoryFs) Unlink(ctx context.Context, filename string) error {
return ErrNotImplemented
}
var _ File = (*MemoryFile)(nil)
type MemoryFile struct {
name string
*bytes.Reader
data *bytes.Reader
}
func NewMemoryFile(name string, data []byte) *MemoryFile {
return &MemoryFile{
name: name,
Reader: bytes.NewReader(data),
name: name,
data: bytes.NewReader(data),
}
}
func (d *MemoryFile) Stat() (fs.FileInfo, error) {
return newFileInfo(d.name, int64(d.Reader.Len())), nil
return newFileInfo(d.name, int64(d.data.Len())), nil
}
func (d *MemoryFile) Size() int64 {
return int64(d.Reader.Len())
return int64(d.data.Len())
}
func (d *MemoryFile) IsDir() bool {
return false
}
func (d *MemoryFile) Close() (err error) {
func (d *MemoryFile) Close(ctx context.Context) (err error) {
return
}
// Read implements File.
func (d *MemoryFile) Read(ctx context.Context, p []byte) (n int, err error) {
return d.data.Read(p)
}
// ReadAt implements File.
func (d *MemoryFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return d.data.ReadAt(p, off)
}

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"testing"
"github.com/stretchr/testify/require"
@ -11,6 +12,7 @@ func TestMemory(t *testing.T) {
require := require.New(t)
testData := "Hello"
ctx := context.Background()
c := NewMemoryFS("/", map[string]*MemoryFile{
"/dir/here": NewMemoryFile("here", []byte(testData)),
@ -23,23 +25,23 @@ func TestMemory(t *testing.T) {
// c, err := NewContainerFs(fss)
// require.NoError(err)
f, err := c.Open("/dir/here")
f, err := c.Open(ctx, "/dir/here")
require.NoError(err)
require.NotNil(f)
require.Equal(int64(5), f.Size())
require.NoError(f.Close())
require.NoError(f.Close(ctx))
data := make([]byte, 5)
n, err := f.Read(data)
n, err := f.Read(ctx, data)
require.NoError(err)
require.Equal(5, n)
require.Equal(string(data), testData)
files, err := c.ReadDir("/")
files, err := c.ReadDir(ctx, "/")
require.NoError(err)
require.Len(files, 1)
files, err = c.ReadDir("/dir")
files, err = c.ReadDir(ctx, "/dir")
require.NoError(err)
require.Len(files, 1)

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"io/fs"
"os"
"path"
@ -12,7 +13,7 @@ type OsFS struct {
}
// Stat implements Filesystem.
func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) {
func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
if path.Clean(filename) == Separator {
return newDirInfo(Separator), nil
}
@ -21,12 +22,12 @@ func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) {
}
// Unlink implements Filesystem.
func (fs *OsFS) Unlink(filename string) error {
func (fs *OsFS) Unlink(ctx context.Context, filename string) error {
return os.RemoveAll(path.Join(fs.hostDir, filename))
}
// Open implements Filesystem.
func (fs *OsFS) Open(filename string) (File, error) {
func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) {
if path.Clean(filename) == Separator {
return NewDir(filename), nil
}
@ -35,7 +36,7 @@ func (fs *OsFS) Open(filename string) (File, error) {
}
// ReadDir implements Filesystem.
func (o *OsFS) ReadDir(dir string) ([]fs.DirEntry, error) {
func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
return os.ReadDir(path.Join(o.hostDir, dir))
}
@ -83,17 +84,17 @@ func (f *OsFile) Info() (fs.FileInfo, error) {
}
// Close implements File.
func (f *OsFile) Close() error {
func (f *OsFile) Close(ctx context.Context) error {
return f.f.Close()
}
// Read implements File.
func (f *OsFile) Read(p []byte) (n int, err error) {
func (f *OsFile) Read(ctx context.Context, p []byte) (n int, err error) {
return f.f.Read(p)
}
// ReadAt implements File.
func (f *OsFile) ReadAt(p []byte, off int64) (n int, err error) {
func (f *OsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return f.f.ReadAt(p, off)
}
@ -151,7 +152,7 @@ func (f *LazyOsFile) open() error {
}
// Close implements File.
func (f *LazyOsFile) Close() error {
func (f *LazyOsFile) Close(ctx context.Context) error {
if f.file == nil {
return nil
}
@ -159,7 +160,7 @@ func (f *LazyOsFile) Close() error {
}
// Read implements File.
func (f *LazyOsFile) Read(p []byte) (n int, err error) {
func (f *LazyOsFile) Read(ctx context.Context, p []byte) (n int, err error) {
err = f.open()
if err != nil {
return 0, err
@ -168,7 +169,7 @@ func (f *LazyOsFile) Read(p []byte) (n int, err error) {
}
// ReadAt implements File.
func (f *LazyOsFile) ReadAt(p []byte, off int64) (n int, err error) {
func (f *LazyOsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
err = f.open()
if err != nil {
return 0, err

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"fmt"
"io/fs"
"path"
@ -22,41 +23,41 @@ func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS
}
// Open implements Filesystem.
func (r *ResolverFS) Open(filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open)
func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Open(nestedFsPath)
return nestedFs.Open(ctx, nestedFsPath)
}
return r.rootFS.Open(fsPath)
return r.rootFS.Open(ctx, fsPath)
}
// ReadDir implements Filesystem.
func (r *ResolverFS) ReadDir(dir string) ([]fs.DirEntry, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(dir, r.rootFS.Open)
func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, dir, r.rootFS.Open)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.ReadDir(nestedFsPath)
return nestedFs.ReadDir(ctx, nestedFsPath)
}
entries, err := r.rootFS.ReadDir(fsPath)
entries, err := r.rootFS.ReadDir(ctx, fsPath)
if err != nil {
return nil, err
}
out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries {
if r.resolver.isNestedFs(e.Name()) {
filepath := path.Join(dir, e.Name())
file, err := r.Open(filepath)
filepath := path.Join("/", dir, e.Name())
file, err := r.Open(ctx, filepath)
if err != nil {
return nil, err
}
nestedfs, err := r.resolver.nestedFs(filepath, file)
nestedfs, err := r.resolver.nestedFs(ctx, filepath, file)
if err != nil {
return nil, err
}
@ -70,29 +71,29 @@ func (r *ResolverFS) ReadDir(dir string) ([]fs.DirEntry, error) {
}
// Stat implements Filesystem.
func (r *ResolverFS) Stat(filename string) (fs.FileInfo, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open)
func (r *ResolverFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Stat(nestedFsPath)
return nestedFs.Stat(ctx, nestedFsPath)
}
return r.rootFS.Stat(fsPath)
return r.rootFS.Stat(ctx, fsPath)
}
// Unlink implements Filesystem.
func (r *ResolverFS) Unlink(filename string) error {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open)
func (r *ResolverFS) Unlink(ctx context.Context, filename string) error {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open)
if err != nil {
return err
}
if nestedFs != nil {
return nestedFs.Unlink(nestedFsPath)
return nestedFs.Unlink(ctx, nestedFsPath)
}
return r.rootFS.Unlink(fsPath)
return r.rootFS.Unlink(ctx, fsPath)
}
// Info implements Filesystem.
@ -117,7 +118,7 @@ func (r *ResolverFS) Type() fs.FileMode {
var _ Filesystem = &ResolverFS{}
type FsFactory func(f File) (Filesystem, error)
type FsFactory func(ctx context.Context, f File) (Filesystem, error)
const Separator = "/"
@ -135,7 +136,7 @@ type resolver struct {
// TODO: add fsmap clean
}
type openFile func(path string) (File, error)
type openFile func(ctx context.Context, path string) (File, error)
func (r *resolver) isNestedFs(f string) bool {
for ext := range r.factories {
@ -146,7 +147,7 @@ func (r *resolver) isNestedFs(f string) bool {
return false
}
func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) {
func (r *resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
for ext, nestFactory := range r.factories {
if !strings.HasSuffix(fsPath, ext) {
continue
@ -156,7 +157,7 @@ func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) {
return nestedFs, nil
}
nestedFs, err := nestFactory(file)
nestedFs, err := nestFactory(ctx, file)
if err != nil {
return nil, fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
@ -169,7 +170,7 @@ func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) {
}
// open requeue raw open, without resolver call
func (r *resolver) resolvePath(name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
func (r *resolver) resolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
name = path.Clean(name)
name = strings.TrimPrefix(name, Separator)
parts := strings.Split(name, Separator)
@ -206,11 +207,11 @@ PARTS_LOOP:
if nestedFs, ok := r.fsmap[fsPath]; ok {
return fsPath, nestedFs, nestedFsPath, nil
} else {
fsFile, err := rawOpen(fsPath)
fsFile, err := rawOpen(ctx, fsPath)
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
nestedFs, err := nestFactory(fsFile)
nestedFs, err := nestFactory(ctx, fsFile)
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"io/fs"
"os"
"path"
@ -26,15 +27,15 @@ func (d *Dummy) IsDir() bool {
return false
}
func (d *Dummy) Close() error {
func (d *Dummy) Close(ctx context.Context) error {
return nil
}
func (d *Dummy) Read(p []byte) (n int, err error) {
func (d *Dummy) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, nil
}
func (d *Dummy) ReadAt(p []byte, off int64) (n int, err error) {
func (d *Dummy) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, nil
}
@ -45,19 +46,19 @@ type DummyFs struct {
}
// Stat implements Filesystem.
func (*DummyFs) Stat(filename string) (fs.FileInfo, error) {
func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return newFileInfo(path.Base(filename), 0), nil // TODO
}
func (d *DummyFs) Open(filename string) (File, error) {
func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) {
return &Dummy{}, nil
}
func (d *DummyFs) Unlink(filename string) error {
func (d *DummyFs) Unlink(ctx context.Context, filename string) error {
return ErrNotImplemented
}
func (d *DummyFs) ReadDir(path string) ([]fs.DirEntry, error) {
func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
if path == "/dir/here" {
return []fs.DirEntry{
newFileInfo("file1.txt", 0),
@ -93,11 +94,13 @@ var _ Filesystem = &DummyFs{}
func TestResolver(t *testing.T) {
t.Parallel()
resolver := newResolver(ArchiveFactories)
ctx := context.Background()
t.Run("nested fs", func(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("/f1.rar/f2.rar", func(path string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/f1.rar/f2.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/f1.rar", path)
return &Dummy{}, nil
})
@ -110,7 +113,7 @@ func TestResolver(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("/", func(path string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/", func(_ context.Context, path string) (File, error) {
require.Equal("/", path)
return &Dummy{}, nil
})
@ -124,7 +127,7 @@ func TestResolver(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//.//", func(path string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//", func(_ context.Context, path string) (File, error) {
require.Equal("/", path)
return &Dummy{}, nil
})
@ -137,7 +140,7 @@ func TestResolver(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//.//f1.rar", func(path string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//f1.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/f1.rar", path)
return &Dummy{}, nil
})
@ -150,7 +153,7 @@ func TestResolver(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//test1/f1.rar", func(path string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//test1/f1.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/test1/f1.rar", path)
return &Dummy{}, nil
})
@ -164,21 +167,23 @@ func TestResolver(t *testing.T) {
func TestArchiveFactories(t *testing.T) {
t.Parallel()
ctx := context.Background()
require := require.New(t)
require.Contains(ArchiveFactories, ".zip")
require.Contains(ArchiveFactories, ".rar")
require.Contains(ArchiveFactories, ".7z")
fs, err := ArchiveFactories[".zip"](&Dummy{})
fs, err := ArchiveFactories[".zip"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = ArchiveFactories[".rar"](&Dummy{})
fs, err = ArchiveFactories[".rar"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = ArchiveFactories[".7z"](&Dummy{})
fs, err = ArchiveFactories[".7z"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
}

View file

@ -8,12 +8,9 @@ import (
"slices"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
"golang.org/x/exp/maps"
)
@ -26,19 +23,16 @@ type TorrentFs struct {
mu sync.Mutex
Torrent *controller.Torrent
readTimeout int
filesCache map[string]File
resolver *resolver
}
func NewTorrentFs(name string, c *controller.Torrent, readTimeout int) *TorrentFs {
func NewTorrentFs(name string, c *controller.Torrent) *TorrentFs {
return &TorrentFs{
name: name,
Torrent: c,
readTimeout: readTimeout,
resolver: newResolver(ArchiveFactories),
name: name,
Torrent: c,
resolver: newResolver(ArchiveFactories),
}
}
@ -64,7 +58,7 @@ func (tfs *TorrentFs) Type() fs.FileMode {
return fs.ModeDir
}
func (fs *TorrentFs) files() (map[string]File, error) {
func (fs *TorrentFs) files(ctx context.Context) (map[string]File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
@ -81,26 +75,25 @@ func (fs *TorrentFs) files() (map[string]File, error) {
for _, file := range files {
file.Download()
p := AbsPath(file.Path())
fs.filesCache[p] = &torrentFile{
name: path.Base(p),
timeout: fs.readTimeout,
file: file,
tf, err := openTorrentFile(ctx, path.Base(p), file)
if err != nil {
return nil, err
}
fs.filesCache[p] = tf
}
// TODO optional
if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.Torrent.Name()) {
filepath := "/" + fs.Torrent.Name()
if file, ok := fs.filesCache[filepath]; ok {
nestedFs, err := fs.resolver.nestedFs(filepath, file)
nestedFs, err := fs.resolver.nestedFs(ctx, filepath, file)
if err != nil {
return nil, err
}
if nestedFs == nil {
goto DEFAULT_DIR // FIXME
}
fs.filesCache, err = listFilesRecursive(nestedFs, "/")
fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
if err != nil {
return nil, err
}
@ -130,40 +123,40 @@ DEFAULT_DIR:
return fs.filesCache, nil
}
func anyPeerHasFiles(file *torrent.File) bool {
for _, conn := range file.Torrent().PeerConns() {
if bitmapHaveFile(conn.PeerPieces(), file) {
return true
}
}
return false
}
// func anyPeerHasFiles(file *torrent.File) bool {
// for _, conn := range file.Torrent().PeerConns() {
// if bitmapHaveFile(conn.PeerPieces(), file) {
// return true
// }
// }
// return false
// }
func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool {
for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
if !bitmap.ContainsInt(i) {
return false
}
}
return true
}
// func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool {
// for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
// if !bitmap.ContainsInt(i) {
// return false
// }
// }
// return true
// }
func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) {
func listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[string]File, error) {
out := make(map[string]File, 0)
entries, err := vfs.ReadDir(start)
entries, err := vfs.ReadDir(ctx, start)
if err != nil {
return nil, err
}
for _, entry := range entries {
filename := path.Join(start, entry.Name())
if entry.IsDir() {
rec, err := listFilesRecursive(vfs, filename)
rec, err := listFilesRecursive(ctx, vfs, filename)
if err != nil {
return nil, err
}
maps.Copy(out, rec)
} else {
file, err := vfs.Open(filename)
file, err := vfs.Open(ctx, filename)
if err != nil {
return nil, err
}
@ -174,8 +167,8 @@ func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) {
return out, nil
}
func (fs *TorrentFs) rawOpen(path string) (File, error) {
files, err := fs.files()
func (fs *TorrentFs) rawOpen(ctx context.Context, path string) (File, error) {
files, err := fs.files(ctx)
if err != nil {
return nil, err
}
@ -183,8 +176,8 @@ func (fs *TorrentFs) rawOpen(path string) (File, error) {
return file, err
}
func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) {
files, err := fs.files()
func (fs *TorrentFs) rawStat(ctx context.Context, filename string) (fs.FileInfo, error) {
files, err := fs.files(ctx)
if err != nil {
return nil, err
}
@ -196,43 +189,43 @@ func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) {
}
// Stat implements Filesystem.
func (fs *TorrentFs) Stat(filename string) (fs.FileInfo, error) {
func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
if filename == Separator {
return newDirInfo(filename), nil
}
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen)
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Stat(nestedFsPath)
return nestedFs.Stat(ctx, nestedFsPath)
}
return fs.rawStat(fsPath)
return fs.rawStat(ctx, fsPath)
}
func (fs *TorrentFs) Open(filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen)
func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Open(nestedFsPath)
return nestedFs.Open(ctx, nestedFsPath)
}
return fs.rawOpen(fsPath)
return fs.rawOpen(ctx, fsPath)
}
func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(name, fs.rawOpen)
func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.ReadDir(nestedFsPath)
return nestedFs.ReadDir(ctx, nestedFsPath)
}
files, err := fs.files()
files, err := fs.files(ctx)
if err != nil {
return nil, err
}
@ -240,13 +233,13 @@ func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) {
return listDirFromFiles(files, fsPath)
}
func (fs *TorrentFs) Unlink(name string) error {
func (fs *TorrentFs) Unlink(ctx context.Context, name string) error {
name = AbsPath(name)
fs.mu.Lock()
defer fs.mu.Unlock()
files, err := fs.files()
files, err := fs.files(ctx)
if err != nil {
return err
}
@ -266,48 +259,84 @@ func (fs *TorrentFs) Unlink(name string) error {
return fs.Torrent.ExcludeFile(context.Background(), tfile.file)
}
type reader interface {
iio.Reader
missinggo.ReadContexter
var _ File = &torrentFile{}
type torrentFile struct {
name string
mu sync.Mutex
tr torrent.Reader
file *torrent.File
}
type readAtWrapper struct {
timeout int
mu sync.Mutex
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
select {
case <-file.Torrent().GotInfo():
break
case <-ctx.Done():
return nil, ctx.Err()
}
torrent.Reader
io.ReaderAt
io.Closer
r := file.NewReader()
r.SetReadahead(4096) // TODO configurable
r.SetResponsive()
return &torrentFile{
name: name,
tr: r,
file: file,
}, nil
}
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
w := &readAtWrapper{Reader: r, timeout: timeout}
w.SetResponsive()
return w
func (tf *torrentFile) Stat() (fs.FileInfo, error) {
return newFileInfo(tf.name, tf.file.Length()), nil
}
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {
func (tf *torrentFile) Size() int64 {
return tf.file.Length()
}
func (tf *torrentFile) IsDir() bool {
return false
}
func (rw *torrentFile) Close(ctx context.Context) error {
rw.mu.Lock()
defer rw.mu.Unlock()
_, err := rw.Seek(off, io.SeekStart)
return rw.tr.Close()
}
// Read implements ctxio.Reader.
func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
tf.mu.Lock()
defer tf.mu.Unlock()
return tf.tr.ReadContext(ctx, p)
}
func (yf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
yf.mu.Lock()
defer yf.mu.Unlock()
_, err := yf.tr.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return readAtLeast(rw, rw.timeout, p, len(p))
return readAtLeast(ctx, yf, p, len(p))
}
func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) {
func readAtLeast(ctx context.Context, r ctxio.Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
nn, err = r.ReadContext(ctx, buf[n:])
nn, err = r.Read(ctx, buf[n:])
n += nn
}
if n >= min {
@ -317,63 +346,3 @@ func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n
}
return
}
func (rw *readAtWrapper) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.Reader.Close()
}
var _ File = &torrentFile{}
type torrentFile struct {
name string
reader reader
timeout int
file *torrent.File
}
func (d *torrentFile) Stat() (fs.FileInfo, error) {
return newFileInfo(d.name, d.file.Length()), nil
}
func (d *torrentFile) load() {
if d.reader != nil {
return
}
d.reader = newReadAtWrapper(d.file.NewReader(), d.timeout)
}
func (d *torrentFile) Size() int64 {
return d.file.Length()
}
func (d *torrentFile) IsDir() bool {
return false
}
func (d *torrentFile) Close() error {
var err error
if d.reader != nil {
err = d.reader.Close()
}
d.reader = nil
return err
}
func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Second)
defer cancel()
return d.reader.ReadContext(ctx, p)
}
func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
}

View file

@ -1,6 +1,7 @@
package vfs
import (
"context"
"os"
"testing"
@ -87,6 +88,8 @@ func TestMain(m *testing.M) {
func TestReadAtTorrent(t *testing.T) {
t.Parallel()
ctx := context.Background()
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
@ -96,19 +99,18 @@ func TestReadAtTorrent(t *testing.T) {
torrFile := to.Files()[0]
tf := torrentFile{
file: torrFile,
timeout: 500,
file: torrFile,
}
defer tf.Close()
defer tf.Close(ctx)
toRead := make([]byte, 5)
n, err := tf.ReadAt(toRead, 6)
n, err := tf.ReadAt(ctx, toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = tf.ReadAt(toRead, 0)
n, err = tf.ReadAt(ctx, toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
@ -117,6 +119,8 @@ func TestReadAtTorrent(t *testing.T) {
func TestReadAtWrapper(t *testing.T) {
t.Parallel()
ctx := context.Background()
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
@ -125,16 +129,16 @@ func TestReadAtWrapper(t *testing.T) {
<-to.GotInfo()
torrFile := to.Files()[0]
r := newReadAtWrapper(torrFile.NewReader(), 10)
defer r.Close()
r, err := openTorrentFile(ctx, "file", torrFile)
defer r.Close(ctx)
toRead := make([]byte, 5)
n, err := r.ReadAt(toRead, 6)
n, err := r.ReadAt(ctx, toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = r.ReadAt(toRead, 0)
n, err = r.ReadAt(ctx, toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)

View file

@ -1,11 +1,12 @@
package iio_test
import (
"context"
"io"
"testing"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/stretchr/testify/require"
)
@ -14,11 +15,12 @@ var testData []byte = []byte("Hello World")
func TestSeekerWrapper(t *testing.T) {
t.Parallel()
ctx := context.Background()
require := require.New(t)
mf := vfs.NewMemoryFile("text.txt", testData)
r := iio.NewSeekerWrapper(mf, mf.Size())
r := ctxio.IoReadSeekCloserWrapper(ctx, mf, mf.Size())
defer r.Close()
n, err := r.Seek(6, io.SeekStart)