From 7b1863109c3a6b8d60320020cf4b062c335d697c Mon Sep 17 00:00:00 2001 From: royalcat Date: Thu, 21 Mar 2024 00:47:51 +0300 Subject: [PATCH] context fs --- cmd/tstor/main.go | 5 +- go.mod | 4 +- pkg/ctxio/reader.go | 48 ++++ pkg/ctxio/seeker.go | 102 ++++++++ src/config/default.go | 4 +- src/config/model.go | 4 +- .../graphql/resolver/query.resolvers.go | 2 +- src/export/fuse/mount.go | 9 +- src/export/httpfs/httpfs.go | 37 ++- src/export/nfs/wrapper-v3.go | 107 ++++++-- src/export/webdav/fs.go | 33 ++- src/host/service/service.go | 22 +- src/host/vfs/archive.go | 50 ++-- src/host/vfs/archive_test.go | 26 +- src/host/vfs/dir.go | 7 +- src/host/vfs/fs.go | 15 +- src/host/vfs/log.go | 32 +-- src/host/vfs/memory.go | 41 +-- src/host/vfs/memory_test.go | 12 +- src/host/vfs/os.go | 21 +- src/host/vfs/resolver.go | 53 ++-- src/host/vfs/resolver_test.go | 35 +-- src/host/vfs/torrent.go | 245 ++++++++---------- src/host/vfs/torrent_test.go | 22 +- src/iio/wrapper_test.go | 6 +- 25 files changed, 593 insertions(+), 349 deletions(-) create mode 100644 pkg/ctxio/reader.go create mode 100644 pkg/ctxio/seeker.go diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index 94979f5..f36f761 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -125,7 +125,10 @@ func run(configPath string) error { c.AddDhtNodes(conf.TorrentClient.DHTNodes) defer c.Close() - ts, err := service.NewService(conf.SourceDir, conf.TorrentClient, c, st, excludedFilesStore, infoBytesStore, conf.TorrentClient.AddTimeout, conf.TorrentClient.ReadTimeout) + ts, err := service.NewService( + conf.SourceDir, conf.TorrentClient, + c, st, excludedFilesStore, infoBytesStore, + ) if err != nil { return fmt.Errorf("error creating service: %w", err) } diff --git a/go.mod b/go.mod index 4d9c47f..e39f509 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.1 require ( github.com/99designs/gqlgen v0.17.43 - github.com/RoaringBitmap/roaring v1.2.3 github.com/agoda-com/opentelemetry-go/otelslog v0.1.1 github.com/agoda-com/opentelemetry-logs-go v0.3.0 github.com/anacrolix/dht/v2 v2.21.1 @@ -43,12 +42,14 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.46.0 go.opentelemetry.io/otel/sdk v1.24.0 go.opentelemetry.io/otel/sdk/metric v1.24.0 + go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/multierr v1.11.0 golang.org/x/exp v0.0.0-20231226003508-02704c960a9b golang.org/x/net v0.19.0 ) require ( + github.com/RoaringBitmap/roaring v1.2.3 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect github.com/alecthomas/atomic v0.1.0-alpha2 // indirect @@ -162,7 +163,6 @@ require ( go.opentelemetry.io/contrib v1.21.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/arch v0.3.0 // indirect diff --git a/pkg/ctxio/reader.go b/pkg/ctxio/reader.go new file mode 100644 index 0000000..2ba9f7d --- /dev/null +++ b/pkg/ctxio/reader.go @@ -0,0 +1,48 @@ +package ctxio + +import ( + "context" + "io" +) + +type ReaderAtCloser interface { + ReaderAt + Closer +} + +type ReaderAt interface { + ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) +} + +type Reader interface { + Read(ctx context.Context, p []byte) (n int, err error) +} + +type Closer interface { + Close(ctx context.Context) error +} +type contextReader struct { + ctx context.Context + r Reader +} + +func (r *contextReader) Read(p []byte) (n int, err error) { + return r.r.Read(r.ctx, p) +} + +func IoReaderAt(ctx context.Context, r ReaderAt) io.ReaderAt { + return &contextReaderAt{ctx: ctx, r: r} +} + +type contextReaderAt struct { + ctx context.Context + r ReaderAt +} + +func (c *contextReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + return c.r.ReadAt(c.ctx, p, off) +} + +func IoReader(ctx context.Context, r Reader) io.Reader { + return &contextReader{ctx: ctx, r: r} +} diff --git a/pkg/ctxio/seeker.go b/pkg/ctxio/seeker.go new file mode 100644 index 0000000..5284ecc --- /dev/null +++ b/pkg/ctxio/seeker.go @@ -0,0 +1,102 @@ +package ctxio + +import ( + "context" + "io" + "sync" +) + +type ioSeekerWrapper struct { + ctx context.Context + + mu sync.Mutex + pos int64 + size int64 + + r ReaderAt +} + +func IoReadSeekerWrapper(ctx context.Context, r ReaderAt, size int64) io.ReadSeeker { + return &ioSeekerWrapper{ + ctx: ctx, + r: r, + size: size, + } +} + +func (r *ioSeekerWrapper) Seek(offset int64, whence int) (int64, error) { + r.mu.Lock() + defer r.mu.Unlock() + + switch whence { + case io.SeekStart: + r.pos = offset + case io.SeekCurrent: + r.pos = r.pos + offset + case io.SeekEnd: + r.pos = r.size + offset + } + + return r.pos, nil +} + +func (r *ioSeekerWrapper) Read(p []byte) (int, error) { + r.mu.Lock() + defer r.mu.Unlock() + + n, err := r.r.ReadAt(r.ctx, p, r.pos) + r.pos += int64(n) + + return n, err +} + +var _ io.ReadSeekCloser = (*ioSeekerCloserWrapper)(nil) + +type ioSeekerCloserWrapper struct { + ctx context.Context + + mu sync.Mutex + pos int64 + size int64 + + r ReaderAtCloser +} + +func IoReadSeekCloserWrapper(ctx context.Context, r ReaderAtCloser, size int64) io.ReadSeekCloser { + return &ioSeekerCloserWrapper{ + ctx: ctx, + r: r, + size: size, + } +} + +func (r *ioSeekerCloserWrapper) Seek(offset int64, whence int) (int64, error) { + r.mu.Lock() + defer r.mu.Unlock() + + switch whence { + case io.SeekStart: + r.pos = offset + case io.SeekCurrent: + r.pos = r.pos + offset + case io.SeekEnd: + r.pos = r.size + offset + } + + return r.pos, nil +} + +func (r *ioSeekerCloserWrapper) Read(p []byte) (int, error) { + r.mu.Lock() + defer r.mu.Unlock() + + n, err := r.r.ReadAt(r.ctx, p, r.pos) + r.pos += int64(n) + + return n, err +} + +// Close implements io.ReadSeekCloser. +func (r *ioSeekerCloserWrapper) Close() error { + return r.r.Close(r.ctx) +} diff --git a/src/config/default.go b/src/config/default.go index 9a3ebcb..e75be06 100644 --- a/src/config/default.go +++ b/src/config/default.go @@ -33,8 +33,8 @@ var defaultConfig = Config{ // GlobalCacheSize: 2048, - AddTimeout: 60, - ReadTimeout: 120, + // AddTimeout: 60, + // ReadTimeout: 120, }, Log: Log{ diff --git a/src/config/model.go b/src/config/model.go index 3aaf665..d8e8c03 100644 --- a/src/config/model.go +++ b/src/config/model.go @@ -26,8 +26,8 @@ type Log struct { } type TorrentClient struct { - ReadTimeout int `koanf:"read_timeout,omitempty"` - AddTimeout int `koanf:"add_timeout,omitempty"` + // ReadTimeout int `koanf:"read_timeout,omitempty"` + // AddTimeout int `koanf:"add_timeout,omitempty"` DHTNodes []string `koanf:"dhtnodes,omitempty"` DisableIPv6 bool `koanf:"disable_ipv6,omitempty"` diff --git a/src/delivery/graphql/resolver/query.resolvers.go b/src/delivery/graphql/resolver/query.resolvers.go index 42a2726..a0a826d 100644 --- a/src/delivery/graphql/resolver/query.resolvers.go +++ b/src/delivery/graphql/resolver/query.resolvers.go @@ -66,7 +66,7 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt // FsListDir is the resolver for the fsListDir field. func (r *queryResolver) FsListDir(ctx context.Context, path string) ([]model.DirEntry, error) { - entries, err := r.VFS.ReadDir(path) + entries, err := r.VFS.ReadDir(ctx, path) if err != nil { return nil, err } diff --git a/src/export/fuse/mount.go b/src/export/fuse/mount.go index e6590be..9248b70 100644 --- a/src/export/fuse/mount.go +++ b/src/export/fuse/mount.go @@ -3,6 +3,7 @@ package fuse import ( + "context" "errors" "io" "log/slog" @@ -104,7 +105,7 @@ func (fs *fuseFS) Read(path string, dest []byte, off int64, fh uint64) int { buf := dest[:end] - n, err := file.ReadAt(buf, off) + n, err := file.ReadAt(context.TODO(), buf, off) if err != nil && err != io.EOF { log.Error("error reading data") return -fuse.EIO @@ -178,7 +179,7 @@ func (fh *fileHandler) ListDir(path string) ([]string, error) { fh.mu.RLock() defer fh.mu.RUnlock() - files, err := fh.fs.ReadDir(path) + files, err := fh.fs.ReadDir(context.TODO(), path) if err != nil { return nil, err } @@ -237,7 +238,7 @@ func (fh *fileHandler) Remove(fhi uint64) error { return ErrHolderEmpty } - if err := f.Close(); err != nil { + if err := f.Close(context.TODO()); err != nil { return err } @@ -247,7 +248,7 @@ func (fh *fileHandler) Remove(fhi uint64) error { } func (fh *fileHandler) lookupFile(path string) (vfs.File, error) { - file, err := fh.fs.Open(path) + file, err := fh.fs.Open(context.TODO(), path) if err != nil { return nil, err } diff --git a/src/export/httpfs/httpfs.go b/src/export/httpfs/httpfs.go index 303cf2d..8a7c188 100644 --- a/src/export/httpfs/httpfs.go +++ b/src/export/httpfs/httpfs.go @@ -1,18 +1,24 @@ package httpfs import ( + "context" "io" "io/fs" "net/http" "os" "sync" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/src/host/vfs" - "git.kmsign.ru/royalcat/tstor/src/iio" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var _ http.FileSystem = &HTTPFS{} +var httpFsTracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/export/httpfs.HTTPFS") + type HTTPFS struct { fs vfs.Filesystem } @@ -21,8 +27,16 @@ func NewHTTPFS(fs vfs.Filesystem) *HTTPFS { return &HTTPFS{fs: fs} } +func (fs *HTTPFS) ctx() context.Context { + return context.Background() +} + func (hfs *HTTPFS) Open(name string) (http.File, error) { - f, err := hfs.fs.Open(name) + ctx, span := httpFsTracer.Start(hfs.ctx(), "Open", + trace.WithAttributes(attribute.String("name", name)), + ) + defer span.End() + f, err := hfs.fs.Open(ctx, name) if err != nil { return nil, err } @@ -36,11 +50,16 @@ func (hfs *HTTPFS) Open(name string) (http.File, error) { } } - return newHTTPFile(f, fis), nil + return newHTTPFile(ctx, f, fis), nil } func (hfs *HTTPFS) filesToFileInfo(name string) ([]fs.FileInfo, error) { - files, err := hfs.fs.ReadDir(name) + ctx, span := httpFsTracer.Start(hfs.ctx(), "Open", + trace.WithAttributes(attribute.String("name", name)), + ) + defer span.End() + + files, err := hfs.fs.ReadDir(ctx, name) if err != nil { return nil, err } @@ -62,7 +81,7 @@ var _ http.File = &httpFile{} type httpFile struct { f vfs.File - iio.ReaderSeeker + io.ReadSeekCloser mu sync.Mutex // dirPos is protected by mu. @@ -70,11 +89,11 @@ type httpFile struct { dirContent []os.FileInfo } -func newHTTPFile(f vfs.File, dirContent []os.FileInfo) *httpFile { +func newHTTPFile(ctx context.Context, f vfs.File, dirContent []os.FileInfo) *httpFile { return &httpFile{ - f: f, - dirContent: dirContent, - ReaderSeeker: iio.NewSeekerWrapper(f, f.Size()), + f: f, + dirContent: dirContent, + ReadSeekCloser: ctxio.IoReadSeekCloserWrapper(ctx, f, f.Size()), } } diff --git a/src/export/nfs/wrapper-v3.go b/src/export/nfs/wrapper-v3.go index fc4f6b7..9891deb 100644 --- a/src/export/nfs/wrapper-v3.go +++ b/src/export/nfs/wrapper-v3.go @@ -1,6 +1,7 @@ package nfs import ( + "context" "errors" "io/fs" "log/slog" @@ -8,8 +9,13 @@ import ( "git.kmsign.ru/royalcat/tstor/src/host/vfs" "github.com/go-git/go-billy/v5" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) +var billyFsTracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/export/nfs.billyFsWrapper") + type billyFsWrapper struct { fs vfs.Filesystem log *slog.Logger @@ -18,6 +24,10 @@ type billyFsWrapper struct { var _ billy.Filesystem = (*billyFsWrapper)(nil) var _ billy.Dir = (*billyFsWrapper)(nil) +func (*billyFsWrapper) ctx() context.Context { + return context.Background() +} + // Chroot implements billy.Filesystem. func (*billyFsWrapper) Chroot(path string) (billy.Filesystem, error) { return nil, billy.ErrNotSupported @@ -35,9 +45,12 @@ func (*billyFsWrapper) Join(elem ...string) string { // Lstat implements billy.Filesystem. func (fs *billyFsWrapper) Lstat(filename string) (fs.FileInfo, error) { - info, err := fs.fs.Stat(filename) + ctx, span := billyFsTracer.Start(fs.ctx(), "Lstat", trace.WithAttributes(attribute.String("filename", filename))) + defer span.End() + + info, err := fs.fs.Stat(ctx, filename) if err != nil { - return nil, billyErr(err, fs.log) + return nil, billyErr(ctx, err, fs.log) } return info, nil } @@ -49,9 +62,14 @@ func (*billyFsWrapper) MkdirAll(filename string, perm fs.FileMode) error { // Open implements billy.Filesystem. func (fs *billyFsWrapper) Open(filename string) (billy.File, error) { - file, err := fs.fs.Open(filename) + ctx, span := billyFsTracer.Start(fs.ctx(), "Open", + trace.WithAttributes(attribute.String("filename", filename)), + ) + defer span.End() + + file, err := fs.fs.Open(ctx, filename) if err != nil { - return nil, billyErr(err, fs.log) + return nil, billyErr(ctx, err, fs.log) } return &billyFile{ name: filename, @@ -62,9 +80,14 @@ func (fs *billyFsWrapper) Open(filename string) (billy.File, error) { // OpenFile implements billy.Filesystem. func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (billy.File, error) { - file, err := fs.fs.Open(filename) + ctx, span := billyFsTracer.Start(fs.ctx(), "OpenFile", + trace.WithAttributes(attribute.String("filename", filename)), + ) + defer span.End() + + file, err := fs.fs.Open(ctx, filename) if err != nil { - return nil, billyErr(err, fs.log) + return nil, billyErr(ctx, err, fs.log) } return &billyFile{ name: filename, @@ -75,9 +98,14 @@ func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) // ReadDir implements billy.Filesystem. func (bfs *billyFsWrapper) ReadDir(path string) ([]fs.FileInfo, error) { - ffs, err := bfs.fs.ReadDir(path) + ctx, span := billyFsTracer.Start(bfs.ctx(), "OpenFile", + trace.WithAttributes(attribute.String("path", path)), + ) + defer span.End() + + ffs, err := bfs.fs.ReadDir(ctx, path) if err != nil { - return nil, billyErr(err, bfs.log) + return nil, billyErr(ctx, err, bfs.log) } out := make([]fs.FileInfo, 0, len(ffs)) @@ -102,8 +130,13 @@ func (*billyFsWrapper) Readlink(link string) (string, error) { } // Remove implements billy.Filesystem. -func (s *billyFsWrapper) Remove(filename string) error { - return s.fs.Unlink(filename) +func (bfs *billyFsWrapper) Remove(filename string) error { + ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove", + trace.WithAttributes(attribute.String("filename", filename)), + ) + defer span.End() + + return bfs.fs.Unlink(ctx, filename) } // Rename implements billy.Filesystem. @@ -117,25 +150,32 @@ func (*billyFsWrapper) Root() string { } // Stat implements billy.Filesystem. -func (fs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) { - info, err := fs.fs.Stat(filename) +func (bfs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) { + ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove", + trace.WithAttributes(attribute.String("filename", filename)), + ) + defer span.End() + + info, err := bfs.fs.Stat(ctx, filename) if err != nil { - return nil, billyErr(err, fs.log) + return nil, billyErr(ctx, err, bfs.log) } return info, nil } // Symlink implements billy.Filesystem. func (fs *billyFsWrapper) Symlink(target string, link string) error { - return billyErr(vfs.ErrNotImplemented, fs.log) + return billyErr(nil, vfs.ErrNotImplemented, fs.log) } // TempFile implements billy.Filesystem. func (fs *billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error) { - return nil, billyErr(vfs.ErrNotImplemented, fs.log) + return nil, billyErr(nil, vfs.ErrNotImplemented, fs.log) } type billyFile struct { + ctx context.Context + name string file vfs.File log *slog.Logger @@ -154,28 +194,47 @@ func (f *billyFile) Name() string { } // Read implements billy.File. -func (f *billyFile) Read(p []byte) (n int, err error) { - return f.file.Read(p) +func (bf *billyFile) Read(p []byte) (n int, err error) { + ctx, span := billyFsTracer.Start(bf.ctx, "Read", + trace.WithAttributes(attribute.Int("length", len(p))), + ) + defer func() { + span.SetAttributes(attribute.Int("read", n)) + span.End() + }() + + return bf.file.Read(ctx, p) } // ReadAt implements billy.File. -func (f *billyFile) ReadAt(p []byte, off int64) (n int, err error) { - return f.file.ReadAt(p, off) +func (bf *billyFile) ReadAt(p []byte, off int64) (n int, err error) { + ctx, span := billyFsTracer.Start(bf.ctx, "Read", + trace.WithAttributes( + attribute.Int("length", len(p)), + attribute.Int64("offset", off), + ), + ) + defer func() { + span.SetAttributes(attribute.Int("read", n)) + span.End() + }() + + return bf.file.ReadAt(ctx, p, off) } // Seek implements billy.File. func (f *billyFile) Seek(offset int64, whence int) (int64, error) { - return 0, billyErr(vfs.ErrNotImplemented, f.log) + return 0, billyErr(nil, vfs.ErrNotImplemented, f.log) } // Truncate implements billy.File. func (f *billyFile) Truncate(size int64) error { - return billyErr(vfs.ErrNotImplemented, f.log) + return billyErr(nil, vfs.ErrNotImplemented, f.log) } // Write implements billy.File. func (f *billyFile) Write(p []byte) (n int, err error) { - return 0, billyErr(vfs.ErrNotImplemented, f.log) + return 0, billyErr(nil, vfs.ErrNotImplemented, f.log) } // Lock implements billy.File. @@ -188,13 +247,13 @@ func (*billyFile) Unlock() error { return nil // TODO } -func billyErr(err error, log *slog.Logger) error { +func billyErr(ctx context.Context, err error, log *slog.Logger) error { if errors.Is(err, vfs.ErrNotImplemented) { return billy.ErrNotSupported } if errors.Is(err, vfs.ErrNotExist) { if err, ok := asErr[*fs.PathError](err); ok { - log.Error("file not found", "op", err.Op, "path", err.Path, "error", err.Err) + log.ErrorContext(ctx, "file not found", "op", err.Op, "path", err.Path, "error", err.Err) } return fs.ErrNotExist } diff --git a/src/export/webdav/fs.go b/src/export/webdav/fs.go index e6411ab..1c8fc83 100644 --- a/src/export/webdav/fs.go +++ b/src/export/webdav/fs.go @@ -10,7 +10,6 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/src/host/vfs" - "git.kmsign.ru/royalcat/tstor/src/iio" "golang.org/x/net/webdav" ) @@ -28,19 +27,19 @@ func (wd *WebDAV) OpenFile(ctx context.Context, name string, flag int, perm os.F name = vfs.AbsPath(name) // TODO handle flag and permissions - f, err := wd.lookupFile(name) + f, err := wd.lookupFile(ctx, name) if err != nil { return nil, err } - wdf := newFile(path.Base(name), f, func() ([]fs.FileInfo, error) { - return wd.listDir(name) + wdf := newFile(ctx, path.Base(name), f, func() ([]fs.FileInfo, error) { + return wd.listDir(ctx, name) }) return wdf, nil } func (wd *WebDAV) Stat(ctx context.Context, name string) (fs.FileInfo, error) { - return wd.fs.Stat(vfs.AbsPath(name)) + return wd.fs.Stat(ctx, vfs.AbsPath(name)) } func (wd *WebDAV) Mkdir(ctx context.Context, name string, perm fs.FileMode) error { @@ -48,19 +47,19 @@ func (wd *WebDAV) Mkdir(ctx context.Context, name string, perm fs.FileMode) erro } func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error { - return wd.fs.Unlink(name) + return wd.fs.Unlink(ctx, name) } func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error { return webdav.ErrNotImplemented } -func (wd *WebDAV) lookupFile(name string) (vfs.File, error) { - return wd.fs.Open(path.Clean(name)) +func (wd *WebDAV) lookupFile(ctx context.Context, name string) (vfs.File, error) { + return wd.fs.Open(ctx, path.Clean(name)) } -func (wd *WebDAV) listDir(path string) ([]os.FileInfo, error) { - files, err := wd.fs.ReadDir(path) +func (wd *WebDAV) listDir(ctx context.Context, path string) ([]os.FileInfo, error) { + files, err := wd.fs.ReadDir(ctx, path) if err != nil { return nil, err } @@ -80,9 +79,10 @@ func (wd *WebDAV) listDir(path string) ([]os.FileInfo, error) { var _ webdav.File = &webDAVFile{} type webDAVFile struct { - iio.Reader + ctx context.Context fi os.FileInfo + f vfs.File mudp sync.Mutex dirPos int @@ -93,11 +93,11 @@ type webDAVFile struct { dirContent []os.FileInfo } -func newFile(name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile { +func newFile(ctx context.Context, name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile { return &webDAVFile{ + ctx: ctx, fi: newFileInfo(name, f.Size(), f.IsDir()), dirFunc: df, - Reader: f, } } @@ -147,7 +147,7 @@ func (wdf *webDAVFile) Read(p []byte) (int, error) { wdf.mup.Lock() defer wdf.mup.Unlock() - n, err := wdf.Reader.ReadAt(p, wdf.pos) + n, err := wdf.f.ReadAt(wdf.ctx, p, wdf.pos) wdf.pos += int64(n) return n, err @@ -173,6 +173,11 @@ func (wdf *webDAVFile) Write(p []byte) (n int, err error) { return 0, webdav.ErrNotImplemented } +// Close implements webdav.File. +func (wdf *webDAVFile) Close() error { + return wdf.f.Close(wdf.ctx) +} + type webDAVFileInfo struct { name string size int64 diff --git a/src/host/service/service.go b/src/host/service/service.go index 1d42a6a..c746119 100644 --- a/src/host/service/service.go +++ b/src/host/service/service.go @@ -9,8 +9,8 @@ import ( "path/filepath" "slices" "strings" - "time" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/datastorage" @@ -46,13 +46,11 @@ type Service struct { dirsAquire kv.Store[string, DirAquire] - log *slog.Logger - addTimeout, readTimeout int + log *slog.Logger } func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client, storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes, - addTimeout, readTimeout int, ) (*Service, error) { dirsAcquire, err := kv.NewBadgerKV[string, DirAquire](path.Join(cfg.MetadataFolder, "dir-acquire")) if err != nil { @@ -70,8 +68,6 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client, torrentLoaded: make(chan struct{}), dirsAquire: dirsAcquire, // stats: newStats(), // TODO persistent - addTimeout: addTimeout, - readTimeout: readTimeout, } go func() { @@ -94,14 +90,14 @@ func (s *Service) Close() error { } func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) { - defer f.Close() + defer f.Close(ctx) stat, err := f.Stat() if err != nil { return nil, fmt.Errorf("call stat failed: %w", err) } - mi, err := metainfo.Load(f) + mi, err := metainfo.Load(ctxio.IoReader(ctx, f)) if err != nil { return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) } @@ -291,10 +287,8 @@ func isValidInfoHashBytes(d []byte) bool { return err == nil } -func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout)) - defer cancel() - defer f.Close() +func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { + defer f.Close(ctx) info, err := f.Stat() if err != nil { @@ -306,7 +300,7 @@ func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) { return nil, err } - return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles), s.readTimeout), nil + return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles)), nil } func (s *Service) Stats() (*Stats, error) { @@ -333,7 +327,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error { if strings.HasSuffix(path, ".torrent") { file := vfs.NewLazyOsFile(path) - defer file.Close() + defer file.Close(ctx) _, err = s.AddTorrent(ctx, file) if err != nil { diff --git a/src/host/vfs/archive.go b/src/host/vfs/archive.go index b7bc60c..cf1e1a6 100644 --- a/src/host/vfs/archive.go +++ b/src/host/vfs/archive.go @@ -2,6 +2,7 @@ package vfs import ( "archive/zip" + "context" "io" "io/fs" "os" @@ -9,56 +10,57 @@ import ( "path/filepath" "strings" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/src/iio" "github.com/bodgit/sevenzip" "github.com/nwaples/rardecode/v2" ) var ArchiveFactories = map[string]FsFactory{ - ".zip": func(f File) (Filesystem, error) { + ".zip": func(ctx context.Context, f File) (Filesystem, error) { stat, err := f.Stat() if err != nil { return nil, err } - return NewArchive(stat.Name(), f, stat.Size(), ZipLoader), nil + return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader), nil }, - ".rar": func(f File) (Filesystem, error) { + ".rar": func(ctx context.Context, f File) (Filesystem, error) { stat, err := f.Stat() if err != nil { return nil, err } - return NewArchive(stat.Name(), f, stat.Size(), RarLoader), nil + return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader), nil }, - ".7z": func(f File) (Filesystem, error) { + ".7z": func(ctx context.Context, f File) (Filesystem, error) { stat, err := f.Stat() if err != nil { return nil, err } - return NewArchive(stat.Name(), f, stat.Size(), SevenZipLoader), nil + return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader), nil }, } -type archiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error) +type archiveLoader func(ctx context.Context, r ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) var _ Filesystem = &ArchiveFS{} type ArchiveFS struct { name string - r iio.Reader + r ctxio.ReaderAt Size int64 files func() (map[string]File, error) } -func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *ArchiveFS { +func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) *ArchiveFS { return &ArchiveFS{ name: name, r: r, Size: size, files: OnceValueWOErr(func() (map[string]File, error) { - zipFiles, err := loader(r, size) + zipFiles, err := loader(ctx, r, size) if err != nil { return nil, err } @@ -94,11 +96,11 @@ func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *Ar } // Unlink implements Filesystem. -func (a *ArchiveFS) Unlink(filename string) error { +func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error { return ErrNotImplemented } -func (a *ArchiveFS) Open(filename string) (File, error) { +func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) { files, err := a.files() if err != nil { return nil, err @@ -107,7 +109,7 @@ func (a *ArchiveFS) Open(filename string) (File, error) { return getFile(files, filename) } -func (fs *ArchiveFS) ReadDir(path string) ([]fs.DirEntry, error) { +func (fs *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { files, err := fs.files() if err != nil { return nil, err @@ -117,7 +119,7 @@ func (fs *ArchiveFS) ReadDir(path string) ([]fs.DirEntry, error) { } // Stat implements Filesystem. -func (afs *ArchiveFS) Stat(filename string) (fs.FileInfo, error) { +func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { files, err := afs.files() if err != nil { return nil, err @@ -204,7 +206,7 @@ func (d *archiveFile) IsDir() bool { return false } -func (d *archiveFile) Close() (err error) { +func (d *archiveFile) Close(ctx context.Context) (err error) { if d.reader != nil { err = d.reader.Close() d.reader = nil @@ -213,7 +215,7 @@ func (d *archiveFile) Close() (err error) { return } -func (d *archiveFile) Read(p []byte) (n int, err error) { +func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { if err := d.load(); err != nil { return 0, err } @@ -221,7 +223,7 @@ func (d *archiveFile) Read(p []byte) (n int, err error) { return d.reader.Read(p) } -func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) { +func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { if err := d.load(); err != nil { return 0, err } @@ -231,7 +233,9 @@ func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) { var _ archiveLoader = ZipLoader -func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) { +func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { + reader := ctxio.IoReaderAt(ctx, ctxreader) + zr, err := zip.NewReader(reader, size) if err != nil { return nil, err @@ -261,7 +265,9 @@ func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) { var _ archiveLoader = SevenZipLoader -func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) { +func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { + reader := ctxio.IoReaderAt(ctx, ctxreader) + r, err := sevenzip.NewReader(reader, size) if err != nil { return nil, err @@ -294,8 +300,10 @@ func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, err var _ archiveLoader = RarLoader -func RarLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) { - r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size)) +func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { + reader := ctxio.IoReadSeekerWrapper(ctx, ctxreader, size) + + r, err := rardecode.NewReader(reader) if err != nil { return nil, err } diff --git a/src/host/vfs/archive_test.go b/src/host/vfs/archive_test.go index c174438..c6ae444 100644 --- a/src/host/vfs/archive_test.go +++ b/src/host/vfs/archive_test.go @@ -3,10 +3,11 @@ package vfs import ( "archive/zip" "bytes" + "context" "io" "testing" - "git.kmsign.ru/royalcat/tstor/src/iio" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "github.com/stretchr/testify/require" ) @@ -18,10 +19,12 @@ func TestZipFilesystem(t *testing.T) { zReader, size := createTestZip(require) - // TODO add single dir collapse test - zfs := NewArchive("test", zReader, size, ZipLoader) + ctx := context.Background() - files, err := zfs.ReadDir("/path/to/test/file") + // TODO add single dir collapse test + zfs := NewArchive(ctx, "test", zReader, size, ZipLoader) + + files, err := zfs.ReadDir(ctx, "/path/to/test/file") require.NoError(err) require.Len(files, 1) @@ -30,16 +33,16 @@ func TestZipFilesystem(t *testing.T) { require.NotNil(e) out := make([]byte, 11) - f, err := zfs.Open("/path/to/test/file/1.txt") + f, err := zfs.Open(ctx, "/path/to/test/file/1.txt") require.NoError(err) - n, err := f.Read(out) + n, err := f.Read(ctx, out) require.Equal(io.EOF, err) require.Equal(11, n) require.Equal(fileContent, out) } -func createTestZip(require *require.Assertions) (iio.Reader, int64) { +func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) { buf := bytes.NewBuffer([]byte{}) zWriter := zip.NewWriter(buf) @@ -56,15 +59,16 @@ func createTestZip(require *require.Assertions) (iio.Reader, int64) { } type closeableByteReader struct { - *bytes.Reader + data *bytes.Reader } func newCBR(b []byte) *closeableByteReader { return &closeableByteReader{ - Reader: bytes.NewReader(b), + data: bytes.NewReader(b), } } -func (*closeableByteReader) Close() error { - return nil +// ReadAt implements ctxio.ReaderAt. +func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + return c.data.ReadAt(p, off) } diff --git a/src/host/vfs/dir.go b/src/host/vfs/dir.go index fc0c1a0..6d77249 100644 --- a/src/host/vfs/dir.go +++ b/src/host/vfs/dir.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "io/fs" "path" ) @@ -30,14 +31,14 @@ func (d *dir) IsDir() bool { return true } -func (d *dir) Close() error { +func (d *dir) Close(ctx context.Context) error { return nil } -func (d *dir) Read(p []byte) (n int, err error) { +func (d *dir) Read(ctx context.Context, p []byte) (n int, err error) { return 0, nil } -func (d *dir) ReadAt(p []byte, off int64) (n int, err error) { +func (d *dir) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { return 0, nil } diff --git a/src/host/vfs/fs.go b/src/host/vfs/fs.go index 08282d3..018967f 100644 --- a/src/host/vfs/fs.go +++ b/src/host/vfs/fs.go @@ -1,12 +1,13 @@ package vfs import ( + "context" "errors" "io/fs" "path" "time" - "git.kmsign.ru/royalcat/tstor/src/iio" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" ) type File interface { @@ -14,7 +15,9 @@ type File interface { Size() int64 Stat() (fs.FileInfo, error) - iio.Reader + ctxio.Reader + ctxio.ReaderAt + ctxio.Closer } var ErrNotImplemented = errors.New("not implemented") @@ -23,14 +26,14 @@ type Filesystem interface { // Open opens the named file for reading. If successful, methods on the // returned file can be used for reading; the associated file descriptor has // mode O_RDONLY. - Open(filename string) (File, error) + Open(ctx context.Context, filename string) (File, error) // ReadDir reads the directory named by dirname and returns a list of // directory entries. - ReadDir(path string) ([]fs.DirEntry, error) + ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) - Stat(filename string) (fs.FileInfo, error) - Unlink(filename string) error + Stat(ctx context.Context, filename string) (fs.FileInfo, error) + Unlink(ctx context.Context, filename string) error fs.DirEntry } diff --git a/src/host/vfs/log.go b/src/host/vfs/log.go index de85be3..7560f26 100644 --- a/src/host/vfs/log.go +++ b/src/host/vfs/log.go @@ -1,8 +1,10 @@ package vfs import ( + "context" "io/fs" "log/slog" + "reflect" ) type LogFS struct { @@ -40,8 +42,8 @@ func (fs *LogFS) Type() fs.FileMode { } // Open implements Filesystem. -func (fs *LogFS) Open(filename string) (File, error) { - file, err := fs.fs.Open(filename) +func (fs *LogFS) Open(ctx context.Context, filename string) (File, error) { + file, err := fs.fs.Open(ctx, filename) if err != nil { fs.log.With("filename", filename).Error("Failed to open file") } @@ -50,17 +52,17 @@ func (fs *LogFS) Open(filename string) (File, error) { } // ReadDir implements Filesystem. -func (fs *LogFS) ReadDir(path string) ([]fs.DirEntry, error) { - file, err := fs.fs.ReadDir(path) +func (fs *LogFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + file, err := fs.fs.ReadDir(ctx, path) if err != nil { - fs.log.Error("Failed to read dir", "path", path, "error", err) + fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name()) } return file, err } // Stat implements Filesystem. -func (fs *LogFS) Stat(filename string) (fs.FileInfo, error) { - file, err := fs.fs.Stat(filename) +func (fs *LogFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + file, err := fs.fs.Stat(ctx, filename) if err != nil { fs.log.Error("Failed to stat", "filename", filename, "error", err) } @@ -68,8 +70,8 @@ func (fs *LogFS) Stat(filename string) (fs.FileInfo, error) { } // Unlink implements Filesystem. -func (fs *LogFS) Unlink(filename string) error { - err := fs.fs.Unlink(filename) +func (fs *LogFS) Unlink(ctx context.Context, filename string) error { + err := fs.fs.Unlink(ctx, filename) if err != nil { fs.log.Error("Failed to stat", "filename", filename, "error", err) } @@ -91,8 +93,8 @@ func WrapLogFile(f File, filename string, log *slog.Logger) *LogFile { } // Close implements File. -func (f *LogFile) Close() error { - err := f.f.Close() +func (f *LogFile) Close(ctx context.Context) error { + err := f.f.Close(ctx) if err != nil { f.log.Error("Failed to close", "error", err) } @@ -105,8 +107,8 @@ func (f *LogFile) IsDir() bool { } // Read implements File. -func (f *LogFile) Read(p []byte) (n int, err error) { - n, err = f.f.Read(p) +func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) { + n, err = f.f.Read(ctx, p) if err != nil { f.log.Error("Failed to read", "error", err) } @@ -114,8 +116,8 @@ func (f *LogFile) Read(p []byte) (n int, err error) { } // ReadAt implements File. -func (f *LogFile) ReadAt(p []byte, off int64) (n int, err error) { - n, err = f.f.ReadAt(p, off) +func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + n, err = f.f.ReadAt(ctx, p, off) if err != nil { f.log.Error("Failed to read", "offset", off, "error", err) } diff --git a/src/host/vfs/memory.go b/src/host/vfs/memory.go index cd0680a..79d2f0e 100644 --- a/src/host/vfs/memory.go +++ b/src/host/vfs/memory.go @@ -2,6 +2,7 @@ package vfs import ( "bytes" + "context" "io/fs" "path" ) @@ -33,11 +34,6 @@ func (mfs *MemoryFs) Type() fs.FileMode { return fs.ModeDir } -// Unlink implements Filesystem. -func (fs *MemoryFs) Unlink(filename string) error { - return ErrNotImplemented -} - func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs { return &MemoryFs{ name: name, @@ -45,16 +41,16 @@ func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs { } } -func (m *MemoryFs) Open(filename string) (File, error) { +func (m *MemoryFs) Open(ctx context.Context, filename string) (File, error) { return getFile(m.files, filename) } -func (fs *MemoryFs) ReadDir(path string) ([]fs.DirEntry, error) { +func (fs *MemoryFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { return listDirFromFiles(fs.files, path) } // Stat implements Filesystem. -func (mfs *MemoryFs) Stat(filename string) (fs.FileInfo, error) { +func (mfs *MemoryFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { file, ok := mfs.files[filename] if !ok { return nil, ErrNotExist @@ -62,32 +58,47 @@ func (mfs *MemoryFs) Stat(filename string) (fs.FileInfo, error) { return newFileInfo(path.Base(filename), file.Size()), nil } -var _ File = &MemoryFile{} +// Unlink implements Filesystem. +func (fs *MemoryFs) Unlink(ctx context.Context, filename string) error { + return ErrNotImplemented +} + +var _ File = (*MemoryFile)(nil) type MemoryFile struct { name string - *bytes.Reader + data *bytes.Reader } func NewMemoryFile(name string, data []byte) *MemoryFile { return &MemoryFile{ - name: name, - Reader: bytes.NewReader(data), + name: name, + data: bytes.NewReader(data), } } func (d *MemoryFile) Stat() (fs.FileInfo, error) { - return newFileInfo(d.name, int64(d.Reader.Len())), nil + return newFileInfo(d.name, int64(d.data.Len())), nil } func (d *MemoryFile) Size() int64 { - return int64(d.Reader.Len()) + return int64(d.data.Len()) } func (d *MemoryFile) IsDir() bool { return false } -func (d *MemoryFile) Close() (err error) { +func (d *MemoryFile) Close(ctx context.Context) (err error) { return } + +// Read implements File. +func (d *MemoryFile) Read(ctx context.Context, p []byte) (n int, err error) { + return d.data.Read(p) +} + +// ReadAt implements File. +func (d *MemoryFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + return d.data.ReadAt(p, off) +} diff --git a/src/host/vfs/memory_test.go b/src/host/vfs/memory_test.go index a174921..6f6788f 100644 --- a/src/host/vfs/memory_test.go +++ b/src/host/vfs/memory_test.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -11,6 +12,7 @@ func TestMemory(t *testing.T) { require := require.New(t) testData := "Hello" + ctx := context.Background() c := NewMemoryFS("/", map[string]*MemoryFile{ "/dir/here": NewMemoryFile("here", []byte(testData)), @@ -23,23 +25,23 @@ func TestMemory(t *testing.T) { // c, err := NewContainerFs(fss) // require.NoError(err) - f, err := c.Open("/dir/here") + f, err := c.Open(ctx, "/dir/here") require.NoError(err) require.NotNil(f) require.Equal(int64(5), f.Size()) - require.NoError(f.Close()) + require.NoError(f.Close(ctx)) data := make([]byte, 5) - n, err := f.Read(data) + n, err := f.Read(ctx, data) require.NoError(err) require.Equal(5, n) require.Equal(string(data), testData) - files, err := c.ReadDir("/") + files, err := c.ReadDir(ctx, "/") require.NoError(err) require.Len(files, 1) - files, err = c.ReadDir("/dir") + files, err = c.ReadDir(ctx, "/dir") require.NoError(err) require.Len(files, 1) diff --git a/src/host/vfs/os.go b/src/host/vfs/os.go index 9b0277a..18daf11 100644 --- a/src/host/vfs/os.go +++ b/src/host/vfs/os.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "io/fs" "os" "path" @@ -12,7 +13,7 @@ type OsFS struct { } // Stat implements Filesystem. -func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) { +func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { if path.Clean(filename) == Separator { return newDirInfo(Separator), nil } @@ -21,12 +22,12 @@ func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) { } // Unlink implements Filesystem. -func (fs *OsFS) Unlink(filename string) error { +func (fs *OsFS) Unlink(ctx context.Context, filename string) error { return os.RemoveAll(path.Join(fs.hostDir, filename)) } // Open implements Filesystem. -func (fs *OsFS) Open(filename string) (File, error) { +func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) { if path.Clean(filename) == Separator { return NewDir(filename), nil } @@ -35,7 +36,7 @@ func (fs *OsFS) Open(filename string) (File, error) { } // ReadDir implements Filesystem. -func (o *OsFS) ReadDir(dir string) ([]fs.DirEntry, error) { +func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) { return os.ReadDir(path.Join(o.hostDir, dir)) } @@ -83,17 +84,17 @@ func (f *OsFile) Info() (fs.FileInfo, error) { } // Close implements File. -func (f *OsFile) Close() error { +func (f *OsFile) Close(ctx context.Context) error { return f.f.Close() } // Read implements File. -func (f *OsFile) Read(p []byte) (n int, err error) { +func (f *OsFile) Read(ctx context.Context, p []byte) (n int, err error) { return f.f.Read(p) } // ReadAt implements File. -func (f *OsFile) ReadAt(p []byte, off int64) (n int, err error) { +func (f *OsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { return f.f.ReadAt(p, off) } @@ -151,7 +152,7 @@ func (f *LazyOsFile) open() error { } // Close implements File. -func (f *LazyOsFile) Close() error { +func (f *LazyOsFile) Close(ctx context.Context) error { if f.file == nil { return nil } @@ -159,7 +160,7 @@ func (f *LazyOsFile) Close() error { } // Read implements File. -func (f *LazyOsFile) Read(p []byte) (n int, err error) { +func (f *LazyOsFile) Read(ctx context.Context, p []byte) (n int, err error) { err = f.open() if err != nil { return 0, err @@ -168,7 +169,7 @@ func (f *LazyOsFile) Read(p []byte) (n int, err error) { } // ReadAt implements File. -func (f *LazyOsFile) ReadAt(p []byte, off int64) (n int, err error) { +func (f *LazyOsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { err = f.open() if err != nil { return 0, err diff --git a/src/host/vfs/resolver.go b/src/host/vfs/resolver.go index b465b7c..d434099 100644 --- a/src/host/vfs/resolver.go +++ b/src/host/vfs/resolver.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "fmt" "io/fs" "path" @@ -22,41 +23,41 @@ func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS } // Open implements Filesystem. -func (r *ResolverFS) Open(filename string) (File, error) { - fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open) +func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.Open(nestedFsPath) + return nestedFs.Open(ctx, nestedFsPath) } - return r.rootFS.Open(fsPath) + return r.rootFS.Open(ctx, fsPath) } // ReadDir implements Filesystem. -func (r *ResolverFS) ReadDir(dir string) ([]fs.DirEntry, error) { - fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(dir, r.rootFS.Open) +func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) { + fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, dir, r.rootFS.Open) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.ReadDir(nestedFsPath) + return nestedFs.ReadDir(ctx, nestedFsPath) } - entries, err := r.rootFS.ReadDir(fsPath) + entries, err := r.rootFS.ReadDir(ctx, fsPath) if err != nil { return nil, err } out := make([]fs.DirEntry, 0, len(entries)) for _, e := range entries { if r.resolver.isNestedFs(e.Name()) { - filepath := path.Join(dir, e.Name()) - file, err := r.Open(filepath) + filepath := path.Join("/", dir, e.Name()) + file, err := r.Open(ctx, filepath) if err != nil { return nil, err } - nestedfs, err := r.resolver.nestedFs(filepath, file) + nestedfs, err := r.resolver.nestedFs(ctx, filepath, file) if err != nil { return nil, err } @@ -70,29 +71,29 @@ func (r *ResolverFS) ReadDir(dir string) ([]fs.DirEntry, error) { } // Stat implements Filesystem. -func (r *ResolverFS) Stat(filename string) (fs.FileInfo, error) { - fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open) +func (r *ResolverFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.Stat(nestedFsPath) + return nestedFs.Stat(ctx, nestedFsPath) } - return r.rootFS.Stat(fsPath) + return r.rootFS.Stat(ctx, fsPath) } // Unlink implements Filesystem. -func (r *ResolverFS) Unlink(filename string) error { - fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.rootFS.Open) +func (r *ResolverFS) Unlink(ctx context.Context, filename string) error { + fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) if err != nil { return err } if nestedFs != nil { - return nestedFs.Unlink(nestedFsPath) + return nestedFs.Unlink(ctx, nestedFsPath) } - return r.rootFS.Unlink(fsPath) + return r.rootFS.Unlink(ctx, fsPath) } // Info implements Filesystem. @@ -117,7 +118,7 @@ func (r *ResolverFS) Type() fs.FileMode { var _ Filesystem = &ResolverFS{} -type FsFactory func(f File) (Filesystem, error) +type FsFactory func(ctx context.Context, f File) (Filesystem, error) const Separator = "/" @@ -135,7 +136,7 @@ type resolver struct { // TODO: add fsmap clean } -type openFile func(path string) (File, error) +type openFile func(ctx context.Context, path string) (File, error) func (r *resolver) isNestedFs(f string) bool { for ext := range r.factories { @@ -146,7 +147,7 @@ func (r *resolver) isNestedFs(f string) bool { return false } -func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) { +func (r *resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) { for ext, nestFactory := range r.factories { if !strings.HasSuffix(fsPath, ext) { continue @@ -156,7 +157,7 @@ func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) { return nestedFs, nil } - nestedFs, err := nestFactory(file) + nestedFs, err := nestFactory(ctx, file) if err != nil { return nil, fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) } @@ -169,7 +170,7 @@ func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) { } // open requeue raw open, without resolver call -func (r *resolver) resolvePath(name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) { +func (r *resolver) resolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) { name = path.Clean(name) name = strings.TrimPrefix(name, Separator) parts := strings.Split(name, Separator) @@ -206,11 +207,11 @@ PARTS_LOOP: if nestedFs, ok := r.fsmap[fsPath]; ok { return fsPath, nestedFs, nestedFsPath, nil } else { - fsFile, err := rawOpen(fsPath) + fsFile, err := rawOpen(ctx, fsPath) if err != nil { return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) } - nestedFs, err := nestFactory(fsFile) + nestedFs, err := nestFactory(ctx, fsFile) if err != nil { return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) } diff --git a/src/host/vfs/resolver_test.go b/src/host/vfs/resolver_test.go index c0cef9a..ab801f9 100644 --- a/src/host/vfs/resolver_test.go +++ b/src/host/vfs/resolver_test.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "io/fs" "os" "path" @@ -26,15 +27,15 @@ func (d *Dummy) IsDir() bool { return false } -func (d *Dummy) Close() error { +func (d *Dummy) Close(ctx context.Context) error { return nil } -func (d *Dummy) Read(p []byte) (n int, err error) { +func (d *Dummy) Read(ctx context.Context, p []byte) (n int, err error) { return 0, nil } -func (d *Dummy) ReadAt(p []byte, off int64) (n int, err error) { +func (d *Dummy) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { return 0, nil } @@ -45,19 +46,19 @@ type DummyFs struct { } // Stat implements Filesystem. -func (*DummyFs) Stat(filename string) (fs.FileInfo, error) { +func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { return newFileInfo(path.Base(filename), 0), nil // TODO } -func (d *DummyFs) Open(filename string) (File, error) { +func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) { return &Dummy{}, nil } -func (d *DummyFs) Unlink(filename string) error { +func (d *DummyFs) Unlink(ctx context.Context, filename string) error { return ErrNotImplemented } -func (d *DummyFs) ReadDir(path string) ([]fs.DirEntry, error) { +func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { if path == "/dir/here" { return []fs.DirEntry{ newFileInfo("file1.txt", 0), @@ -93,11 +94,13 @@ var _ Filesystem = &DummyFs{} func TestResolver(t *testing.T) { t.Parallel() resolver := newResolver(ArchiveFactories) + ctx := context.Background() + t.Run("nested fs", func(t *testing.T) { t.Parallel() require := require.New(t) - fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("/f1.rar/f2.rar", func(path string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/f1.rar/f2.rar", func(_ context.Context, path string) (File, error) { require.Equal("/f1.rar", path) return &Dummy{}, nil }) @@ -110,7 +113,7 @@ func TestResolver(t *testing.T) { t.Parallel() require := require.New(t) - fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("/", func(path string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/", func(_ context.Context, path string) (File, error) { require.Equal("/", path) return &Dummy{}, nil }) @@ -124,7 +127,7 @@ func TestResolver(t *testing.T) { t.Parallel() require := require.New(t) - fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//.//", func(path string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//", func(_ context.Context, path string) (File, error) { require.Equal("/", path) return &Dummy{}, nil }) @@ -137,7 +140,7 @@ func TestResolver(t *testing.T) { t.Parallel() require := require.New(t) - fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//.//f1.rar", func(path string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//f1.rar", func(_ context.Context, path string) (File, error) { require.Equal("/f1.rar", path) return &Dummy{}, nil }) @@ -150,7 +153,7 @@ func TestResolver(t *testing.T) { t.Parallel() require := require.New(t) - fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath("//test1/f1.rar", func(path string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//test1/f1.rar", func(_ context.Context, path string) (File, error) { require.Equal("/test1/f1.rar", path) return &Dummy{}, nil }) @@ -164,21 +167,23 @@ func TestResolver(t *testing.T) { func TestArchiveFactories(t *testing.T) { t.Parallel() + ctx := context.Background() + require := require.New(t) require.Contains(ArchiveFactories, ".zip") require.Contains(ArchiveFactories, ".rar") require.Contains(ArchiveFactories, ".7z") - fs, err := ArchiveFactories[".zip"](&Dummy{}) + fs, err := ArchiveFactories[".zip"](ctx, &Dummy{}) require.NoError(err) require.NotNil(fs) - fs, err = ArchiveFactories[".rar"](&Dummy{}) + fs, err = ArchiveFactories[".rar"](ctx, &Dummy{}) require.NoError(err) require.NotNil(fs) - fs, err = ArchiveFactories[".7z"](&Dummy{}) + fs, err = ArchiveFactories[".7z"](ctx, &Dummy{}) require.NoError(err) require.NotNil(fs) } diff --git a/src/host/vfs/torrent.go b/src/host/vfs/torrent.go index 4b51e9d..7f4545b 100644 --- a/src/host/vfs/torrent.go +++ b/src/host/vfs/torrent.go @@ -8,12 +8,9 @@ import ( "slices" "strings" "sync" - "time" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/src/host/controller" - "git.kmsign.ru/royalcat/tstor/src/iio" - "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent" "golang.org/x/exp/maps" ) @@ -26,19 +23,16 @@ type TorrentFs struct { mu sync.Mutex Torrent *controller.Torrent - readTimeout int - filesCache map[string]File resolver *resolver } -func NewTorrentFs(name string, c *controller.Torrent, readTimeout int) *TorrentFs { +func NewTorrentFs(name string, c *controller.Torrent) *TorrentFs { return &TorrentFs{ - name: name, - Torrent: c, - readTimeout: readTimeout, - resolver: newResolver(ArchiveFactories), + name: name, + Torrent: c, + resolver: newResolver(ArchiveFactories), } } @@ -64,7 +58,7 @@ func (tfs *TorrentFs) Type() fs.FileMode { return fs.ModeDir } -func (fs *TorrentFs) files() (map[string]File, error) { +func (fs *TorrentFs) files(ctx context.Context) (map[string]File, error) { fs.mu.Lock() defer fs.mu.Unlock() @@ -81,26 +75,25 @@ func (fs *TorrentFs) files() (map[string]File, error) { for _, file := range files { file.Download() p := AbsPath(file.Path()) - - fs.filesCache[p] = &torrentFile{ - name: path.Base(p), - timeout: fs.readTimeout, - file: file, + tf, err := openTorrentFile(ctx, path.Base(p), file) + if err != nil { + return nil, err } + fs.filesCache[p] = tf } // TODO optional if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.Torrent.Name()) { filepath := "/" + fs.Torrent.Name() if file, ok := fs.filesCache[filepath]; ok { - nestedFs, err := fs.resolver.nestedFs(filepath, file) + nestedFs, err := fs.resolver.nestedFs(ctx, filepath, file) if err != nil { return nil, err } if nestedFs == nil { goto DEFAULT_DIR // FIXME } - fs.filesCache, err = listFilesRecursive(nestedFs, "/") + fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/") if err != nil { return nil, err } @@ -130,40 +123,40 @@ DEFAULT_DIR: return fs.filesCache, nil } -func anyPeerHasFiles(file *torrent.File) bool { - for _, conn := range file.Torrent().PeerConns() { - if bitmapHaveFile(conn.PeerPieces(), file) { - return true - } - } - return false -} +// func anyPeerHasFiles(file *torrent.File) bool { +// for _, conn := range file.Torrent().PeerConns() { +// if bitmapHaveFile(conn.PeerPieces(), file) { +// return true +// } +// } +// return false +// } -func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool { - for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ { - if !bitmap.ContainsInt(i) { - return false - } - } - return true -} +// func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool { +// for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ { +// if !bitmap.ContainsInt(i) { +// return false +// } +// } +// return true +// } -func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) { +func listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[string]File, error) { out := make(map[string]File, 0) - entries, err := vfs.ReadDir(start) + entries, err := vfs.ReadDir(ctx, start) if err != nil { return nil, err } for _, entry := range entries { filename := path.Join(start, entry.Name()) if entry.IsDir() { - rec, err := listFilesRecursive(vfs, filename) + rec, err := listFilesRecursive(ctx, vfs, filename) if err != nil { return nil, err } maps.Copy(out, rec) } else { - file, err := vfs.Open(filename) + file, err := vfs.Open(ctx, filename) if err != nil { return nil, err } @@ -174,8 +167,8 @@ func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) { return out, nil } -func (fs *TorrentFs) rawOpen(path string) (File, error) { - files, err := fs.files() +func (fs *TorrentFs) rawOpen(ctx context.Context, path string) (File, error) { + files, err := fs.files(ctx) if err != nil { return nil, err } @@ -183,8 +176,8 @@ func (fs *TorrentFs) rawOpen(path string) (File, error) { return file, err } -func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) { - files, err := fs.files() +func (fs *TorrentFs) rawStat(ctx context.Context, filename string) (fs.FileInfo, error) { + files, err := fs.files(ctx) if err != nil { return nil, err } @@ -196,43 +189,43 @@ func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) { } // Stat implements Filesystem. -func (fs *TorrentFs) Stat(filename string) (fs.FileInfo, error) { +func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { if filename == Separator { return newDirInfo(filename), nil } - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen) + fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.Stat(nestedFsPath) + return nestedFs.Stat(ctx, nestedFsPath) } - return fs.rawStat(fsPath) + return fs.rawStat(ctx, fsPath) } -func (fs *TorrentFs) Open(filename string) (File, error) { - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen) +func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) { + fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.Open(nestedFsPath) + return nestedFs.Open(ctx, nestedFsPath) } - return fs.rawOpen(fsPath) + return fs.rawOpen(ctx, fsPath) } -func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) { - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(name, fs.rawOpen) +func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { + fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { - return nestedFs.ReadDir(nestedFsPath) + return nestedFs.ReadDir(ctx, nestedFsPath) } - files, err := fs.files() + files, err := fs.files(ctx) if err != nil { return nil, err } @@ -240,13 +233,13 @@ func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) { return listDirFromFiles(files, fsPath) } -func (fs *TorrentFs) Unlink(name string) error { +func (fs *TorrentFs) Unlink(ctx context.Context, name string) error { name = AbsPath(name) fs.mu.Lock() defer fs.mu.Unlock() - files, err := fs.files() + files, err := fs.files(ctx) if err != nil { return err } @@ -266,48 +259,84 @@ func (fs *TorrentFs) Unlink(name string) error { return fs.Torrent.ExcludeFile(context.Background(), tfile.file) } -type reader interface { - iio.Reader - missinggo.ReadContexter +var _ File = &torrentFile{} + +type torrentFile struct { + name string + + mu sync.Mutex + + tr torrent.Reader + + file *torrent.File } -type readAtWrapper struct { - timeout int - mu sync.Mutex +func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) { + select { + case <-file.Torrent().GotInfo(): + break + case <-ctx.Done(): + return nil, ctx.Err() + } - torrent.Reader - io.ReaderAt - io.Closer + r := file.NewReader() + r.SetReadahead(4096) // TODO configurable + r.SetResponsive() + + return &torrentFile{ + name: name, + tr: r, + file: file, + }, nil } -func newReadAtWrapper(r torrent.Reader, timeout int) reader { - w := &readAtWrapper{Reader: r, timeout: timeout} - w.SetResponsive() - return w +func (tf *torrentFile) Stat() (fs.FileInfo, error) { + return newFileInfo(tf.name, tf.file.Length()), nil } -func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) { +func (tf *torrentFile) Size() int64 { + return tf.file.Length() +} + +func (tf *torrentFile) IsDir() bool { + return false +} + +func (rw *torrentFile) Close(ctx context.Context) error { rw.mu.Lock() defer rw.mu.Unlock() - _, err := rw.Seek(off, io.SeekStart) + + return rw.tr.Close() +} + +// Read implements ctxio.Reader. +func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { + tf.mu.Lock() + defer tf.mu.Unlock() + + return tf.tr.ReadContext(ctx, p) +} + +func (yf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { + yf.mu.Lock() + defer yf.mu.Unlock() + + _, err := yf.tr.Seek(off, io.SeekStart) if err != nil { return 0, err } - return readAtLeast(rw, rw.timeout, p, len(p)) + return readAtLeast(ctx, yf, p, len(p)) } -func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) { +func readAtLeast(ctx context.Context, r ctxio.Reader, buf []byte, min int) (n int, err error) { if len(buf) < min { return 0, io.ErrShortBuffer } for n < min && err == nil { var nn int - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) - defer cancel() - - nn, err = r.ReadContext(ctx, buf[n:]) + nn, err = r.Read(ctx, buf[n:]) n += nn } if n >= min { @@ -317,63 +346,3 @@ func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n } return } - -func (rw *readAtWrapper) Close() error { - rw.mu.Lock() - defer rw.mu.Unlock() - return rw.Reader.Close() -} - -var _ File = &torrentFile{} - -type torrentFile struct { - name string - - reader reader - timeout int - - file *torrent.File -} - -func (d *torrentFile) Stat() (fs.FileInfo, error) { - return newFileInfo(d.name, d.file.Length()), nil -} - -func (d *torrentFile) load() { - if d.reader != nil { - return - } - d.reader = newReadAtWrapper(d.file.NewReader(), d.timeout) -} - -func (d *torrentFile) Size() int64 { - return d.file.Length() -} - -func (d *torrentFile) IsDir() bool { - return false -} - -func (d *torrentFile) Close() error { - var err error - if d.reader != nil { - err = d.reader.Close() - } - - d.reader = nil - - return err -} - -func (d *torrentFile) Read(p []byte) (n int, err error) { - d.load() - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Second) - defer cancel() - - return d.reader.ReadContext(ctx, p) -} - -func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) { - d.load() - return d.reader.ReadAt(p, off) -} diff --git a/src/host/vfs/torrent_test.go b/src/host/vfs/torrent_test.go index 2a8fac9..7de12ac 100644 --- a/src/host/vfs/torrent_test.go +++ b/src/host/vfs/torrent_test.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "os" "testing" @@ -87,6 +88,8 @@ func TestMain(m *testing.M) { func TestReadAtTorrent(t *testing.T) { t.Parallel() + ctx := context.Background() + require := require.New(t) to, err := Cli.AddMagnet(testMagnet) @@ -96,19 +99,18 @@ func TestReadAtTorrent(t *testing.T) { torrFile := to.Files()[0] tf := torrentFile{ - file: torrFile, - timeout: 500, + file: torrFile, } - defer tf.Close() + defer tf.Close(ctx) toRead := make([]byte, 5) - n, err := tf.ReadAt(toRead, 6) + n, err := tf.ReadAt(ctx, toRead, 6) require.NoError(err) require.Equal(5, n) require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead) - n, err = tf.ReadAt(toRead, 0) + n, err = tf.ReadAt(ctx, toRead, 0) require.NoError(err) require.Equal(5, n) require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) @@ -117,6 +119,8 @@ func TestReadAtTorrent(t *testing.T) { func TestReadAtWrapper(t *testing.T) { t.Parallel() + ctx := context.Background() + require := require.New(t) to, err := Cli.AddMagnet(testMagnet) @@ -125,16 +129,16 @@ func TestReadAtWrapper(t *testing.T) { <-to.GotInfo() torrFile := to.Files()[0] - r := newReadAtWrapper(torrFile.NewReader(), 10) - defer r.Close() + r, err := openTorrentFile(ctx, "file", torrFile) + defer r.Close(ctx) toRead := make([]byte, 5) - n, err := r.ReadAt(toRead, 6) + n, err := r.ReadAt(ctx, toRead, 6) require.NoError(err) require.Equal(5, n) require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead) - n, err = r.ReadAt(toRead, 0) + n, err = r.ReadAt(ctx, toRead, 0) require.NoError(err) require.Equal(5, n) require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) diff --git a/src/iio/wrapper_test.go b/src/iio/wrapper_test.go index 7d8a82d..e53471f 100644 --- a/src/iio/wrapper_test.go +++ b/src/iio/wrapper_test.go @@ -1,11 +1,12 @@ package iio_test import ( + "context" "io" "testing" + "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/src/host/vfs" - "git.kmsign.ru/royalcat/tstor/src/iio" "github.com/stretchr/testify/require" ) @@ -14,11 +15,12 @@ var testData []byte = []byte("Hello World") func TestSeekerWrapper(t *testing.T) { t.Parallel() + ctx := context.Background() require := require.New(t) mf := vfs.NewMemoryFile("text.txt", testData) - r := iio.NewSeekerWrapper(mf, mf.Size()) + r := ctxio.IoReadSeekCloserWrapper(ctx, mf, mf.Size()) defer r.Close() n, err := r.Seek(6, io.SeekStart)