tstor/src/sources/torrent/fs.go

531 lines
11 KiB
Go
Raw Normal View History

2024-05-19 21:24:09 +00:00
package torrent
import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"path"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.kmsign.ru/royalcat/tstor/src/vfs"
	"github.com/anacrolix/torrent"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/exp/maps"
)
2024-04-24 17:36:33 +00:00
type TorrentFS struct {
2024-03-19 21:30:37 +00:00
name string
mu sync.Mutex
2024-05-19 21:24:09 +00:00
Torrent *Controller
2023-12-21 23:15:39 +00:00
2024-05-19 21:24:09 +00:00
filesCache map[string]vfs.File
2023-12-21 23:15:39 +00:00
2024-03-30 10:16:13 +00:00
lastAccessTimeout atomic.Pointer[time.Time]
2024-05-19 21:24:09 +00:00
resolver *vfs.Resolver
}
2024-05-19 21:24:09 +00:00
// Compile-time assertion that *TorrentFS implements vfs.Filesystem.
var _ vfs.Filesystem = (*TorrentFS)(nil)
2024-06-14 22:14:44 +00:00
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
2024-05-19 21:24:09 +00:00
c, err := s.LoadTorrent(ctx, f)
if err != nil {
return nil, err
}
2024-03-28 13:09:42 +00:00
2024-04-24 17:36:33 +00:00
return &TorrentFS{
2024-06-16 21:44:09 +00:00
name: f.Name(),
2024-03-20 21:47:51 +00:00
Torrent: c,
2024-05-19 21:24:09 +00:00
resolver: vfs.NewResolver(vfs.ArchiveFactories),
}, nil
}
2024-04-24 17:36:33 +00:00
// Compile-time assertion that *TorrentFS implements fs.DirEntry.
var _ fs.DirEntry = (*TorrentFS)(nil)
2024-03-19 21:30:37 +00:00
// Name implements fs.DirEntry.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Name() string {
2024-03-19 21:30:37 +00:00
return tfs.name
}
// Info implements fs.DirEntry.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Info() (fs.FileInfo, error) {
2024-03-28 13:09:42 +00:00
return tfs, nil
2024-03-19 21:30:37 +00:00
}
// IsDir implements fs.DirEntry.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) IsDir() bool {
2024-03-19 21:30:37 +00:00
return true
}
// Type implements fs.DirEntry.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Type() fs.FileMode {
2024-03-19 21:30:37 +00:00
return fs.ModeDir
}
2024-03-28 13:09:42 +00:00
// ModTime implements fs.FileInfo.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) ModTime() time.Time {
2024-03-28 13:09:42 +00:00
return time.Time{}
}
// Mode implements fs.FileInfo.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Mode() fs.FileMode {
2024-03-28 13:09:42 +00:00
return fs.ModeDir
}
// Size implements fs.FileInfo.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Size() int64 {
2024-03-28 13:09:42 +00:00
return 0
}
// Sys implements fs.FileInfo.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Sys() any {
2024-03-28 13:09:42 +00:00
return nil
}
// FsName implements Filesystem.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) FsName() string {
2024-03-28 13:09:42 +00:00
return "torrentfs"
}
2024-05-19 21:24:09 +00:00
func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
2024-01-28 20:22:49 +00:00
fs.mu.Lock()
defer fs.mu.Unlock()
2023-12-25 22:11:03 +00:00
2024-01-28 20:22:49 +00:00
if fs.filesCache != nil {
return fs.filesCache, nil
}
2023-12-25 22:11:03 +00:00
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "files", fs.traceAttrs())
defer span.End()
files, err := fs.Torrent.Files(ctx)
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
}
2024-01-01 18:17:32 +00:00
2024-05-19 21:24:09 +00:00
fs.filesCache = make(map[string]vfs.File)
2024-01-28 20:22:49 +00:00
for _, file := range files {
2024-05-19 21:24:09 +00:00
p := vfs.AbsPath(file.Path())
2024-03-20 21:47:51 +00:00
tf, err := openTorrentFile(ctx, path.Base(p), file)
if err != nil {
return nil, err
2024-01-28 20:22:49 +00:00
}
2024-03-20 21:47:51 +00:00
fs.filesCache[p] = tf
2024-01-28 20:22:49 +00:00
}
// TODO optional
2024-05-19 21:24:09 +00:00
if len(fs.filesCache) == 1 && fs.resolver.IsNestedFs(fs.Torrent.Name()) {
2024-03-19 21:30:37 +00:00
filepath := "/" + fs.Torrent.Name()
2024-01-28 20:22:49 +00:00
if file, ok := fs.filesCache[filepath]; ok {
2024-05-19 21:24:09 +00:00
nestedFs, err := fs.resolver.NestedFs(ctx, filepath, file)
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
}
if nestedFs == nil {
goto DEFAULT_DIR // FIXME
}
2024-05-19 21:24:09 +00:00
fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
2023-12-25 22:11:03 +00:00
}
2024-01-28 20:22:49 +00:00
return fs.filesCache, nil
}
2024-01-01 18:17:32 +00:00
2024-01-28 20:22:49 +00:00
}
DEFAULT_DIR:
2024-03-19 21:30:37 +00:00
rootDir := "/" + fs.Torrent.Name() + "/"
2024-01-28 20:22:49 +00:00
singleDir := true
for k, _ := range fs.filesCache {
if !strings.HasPrefix(k, rootDir) {
singleDir = false
2023-10-16 09:18:40 +00:00
}
2024-01-28 20:22:49 +00:00
}
if singleDir {
for k, f := range fs.filesCache {
delete(fs.filesCache, k)
k, _ = strings.CutPrefix(k, rootDir)
2024-05-19 21:24:09 +00:00
k = vfs.AbsPath(k)
2024-01-28 20:22:49 +00:00
fs.filesCache[k] = f
}
}
return fs.filesCache, nil
}
2024-01-07 17:09:56 +00:00
2024-05-19 21:24:09 +00:00
func listFilesRecursive(ctx context.Context, fs vfs.Filesystem, start string) (map[string]vfs.File, error) {
out := make(map[string]vfs.File, 0)
entries, err := fs.ReadDir(ctx, start)
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
}
for _, entry := range entries {
filename := path.Join(start, entry.Name())
if entry.IsDir() {
2024-05-19 21:24:09 +00:00
rec, err := listFilesRecursive(ctx, fs, filename)
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
2024-01-07 17:09:56 +00:00
}
2024-01-28 20:22:49 +00:00
maps.Copy(out, rec)
} else {
2024-05-19 21:24:09 +00:00
file, err := fs.Open(ctx, filename)
2024-01-28 20:22:49 +00:00
if err != nil {
return nil, err
2024-01-07 17:09:56 +00:00
}
2024-01-28 20:22:49 +00:00
out[filename] = file
2024-01-07 17:09:56 +00:00
}
2023-10-16 09:18:40 +00:00
}
2024-01-28 20:22:49 +00:00
return out, nil
2023-10-16 09:18:40 +00:00
}
2024-05-19 21:24:09 +00:00
func (fs *TorrentFS) rawOpen(ctx context.Context, filename string) (file vfs.File, err error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "rawOpen",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
2024-03-20 21:47:51 +00:00
files, err := fs.files(ctx)
2023-12-25 22:11:03 +00:00
if err != nil {
return nil, err
}
2024-05-19 21:24:09 +00:00
file, err = vfs.GetFile(files, filename)
2023-10-16 09:18:40 +00:00
return file, err
}
2024-04-24 17:36:33 +00:00
func (fs *TorrentFS) rawStat(ctx context.Context, filename string) (fs.FileInfo, error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "rawStat",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
2024-03-20 21:47:51 +00:00
files, err := fs.files(ctx)
2023-12-25 22:11:03 +00:00
if err != nil {
return nil, err
}
2024-03-29 06:53:52 +00:00
2024-05-19 21:24:09 +00:00
file, err := vfs.GetFile(files, filename)
2023-12-21 23:15:39 +00:00
if err != nil {
return nil, err
}
2024-03-28 13:09:42 +00:00
return file.Info()
}
2024-04-24 17:36:33 +00:00
func (fs *TorrentFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption {
2024-03-28 13:09:42 +00:00
return trace.WithAttributes(append([]attribute.KeyValue{
attribute.String("fs", fs.FsName()),
attribute.String("torrent", fs.Torrent.Name()),
attribute.String("infohash", fs.Torrent.InfoHash()),
}, add...)...)
2023-12-21 23:15:39 +00:00
}
2024-06-14 22:14:44 +00:00
// readContext returns the context to use for a filesystem operation together
// with its cancel function.
//
// If an earlier operation recorded a deadline failure less than
// secondaryTimeout ago, the returned context expires after one millisecond
// and the current span is tagged with short_timeout=true. Otherwise ctx is
// returned unchanged with a no-op cancel function.
func (tfs *TorrentFS) readContext(ctx context.Context) (context.Context, context.CancelFunc) {
	last := tfs.lastAccessTimeout.Load()
	if last == nil || time.Since(*last) >= secondaryTimeout {
		return ctx, func() {}
	}

	// Short timeout for a filesystem that has already failed recently.
	trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
	return context.WithTimeout(ctx, time.Millisecond)
}
2023-12-21 23:15:39 +00:00
// Stat implements Filesystem.
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "Stat",
2024-03-30 10:16:13 +00:00
tfs.traceAttrs(attribute.String("filename", filename)),
2024-03-28 13:09:42 +00:00
)
defer span.End()
2024-05-19 21:24:09 +00:00
if vfs.IsRoot(filename) {
2024-03-30 10:16:13 +00:00
return tfs, nil
2023-12-21 23:15:39 +00:00
}
2024-06-14 22:14:44 +00:00
var err error
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
2024-05-19 21:24:09 +00:00
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
2023-12-21 23:15:39 +00:00
if err != nil {
return nil, err
}
if nestedFs != nil {
2024-03-20 21:47:51 +00:00
return nestedFs.Stat(ctx, nestedFsPath)
2023-12-21 23:15:39 +00:00
}
2024-03-30 10:16:13 +00:00
return tfs.rawStat(ctx, fsPath)
2023-12-21 23:15:39 +00:00
}
2024-05-19 21:24:09 +00:00
func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File, err error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "Open",
2024-03-30 10:16:13 +00:00
tfs.traceAttrs(attribute.String("filename", filename)),
2024-03-28 13:09:42 +00:00
)
defer span.End()
2024-05-19 21:24:09 +00:00
if vfs.IsRoot(filename) {
return vfs.NewDirFile(tfs.name), nil
2024-03-28 13:09:42 +00:00
}
2024-06-14 22:14:44 +00:00
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
2024-05-19 21:24:09 +00:00
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
2023-10-16 09:18:40 +00:00
if err != nil {
return nil, err
}
2023-10-16 09:18:40 +00:00
if nestedFs != nil {
2024-03-30 10:16:13 +00:00
2024-03-20 21:47:51 +00:00
return nestedFs.Open(ctx, nestedFsPath)
}
2024-03-30 10:16:13 +00:00
return tfs.rawOpen(ctx, fsPath)
}
2024-04-24 17:36:33 +00:00
func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "ReadDir",
2024-03-30 10:16:13 +00:00
tfs.traceAttrs(attribute.String("name", name)),
2024-03-28 13:09:42 +00:00
)
defer span.End()
2024-06-14 22:14:44 +00:00
var err error
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
2024-05-19 21:24:09 +00:00
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, name, tfs.rawOpen)
2023-10-16 09:18:40 +00:00
if err != nil {
return nil, err
}
if nestedFs != nil {
2024-03-20 21:47:51 +00:00
return nestedFs.ReadDir(ctx, nestedFsPath)
2023-10-16 09:18:40 +00:00
}
2024-03-30 10:16:13 +00:00
files, err := tfs.files(ctx)
2023-12-25 22:11:03 +00:00
if err != nil {
return nil, err
}
2024-05-19 21:24:09 +00:00
return vfs.ListDirFromFiles(files, fsPath)
}
2024-04-24 17:36:33 +00:00
func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "Unlink",
fs.traceAttrs(attribute.String("name", name)),
)
defer span.End()
2024-05-19 21:24:09 +00:00
name = vfs.AbsPath(name)
2023-12-31 22:54:55 +00:00
2023-12-25 22:11:03 +00:00
fs.mu.Lock()
defer fs.mu.Unlock()
2024-03-20 21:47:51 +00:00
files, err := fs.files(ctx)
2023-12-25 22:11:03 +00:00
if err != nil {
return err
}
2023-12-31 22:54:55 +00:00
if !slices.Contains(maps.Keys(files), name) {
2024-05-19 21:24:09 +00:00
return vfs.ErrNotExist
2023-12-25 22:11:03 +00:00
}
2023-12-31 22:54:55 +00:00
file := files[name]
delete(fs.filesCache, name)
2024-01-28 20:22:49 +00:00
tfile, ok := file.(*torrentFile)
if !ok {
2024-05-19 21:24:09 +00:00
return vfs.ErrNotImplemented
2024-01-28 20:22:49 +00:00
}
2024-03-28 13:09:42 +00:00
return fs.Torrent.ExcludeFile(ctx, tfile.file)
2023-10-18 09:52:48 +00:00
}
2024-05-19 21:24:09 +00:00
// Compile-time assertion that *torrentFile implements vfs.File.
var _ vfs.File = (*torrentFile)(nil)
2024-03-20 21:47:51 +00:00
type torrentFile struct {
name string
2024-03-30 10:16:13 +00:00
mu sync.RWMutex
2024-03-20 21:47:51 +00:00
tr torrent.Reader
2024-03-30 10:16:13 +00:00
lastReadTimeout atomic.Pointer[time.Time]
2024-03-28 13:09:42 +00:00
2024-03-20 21:47:51 +00:00
file *torrent.File
}
2024-03-30 10:16:13 +00:00
// secondaryTimeout is how long after a recorded deadline failure subsequent
// operations are still given the short (one millisecond) timeout.
const secondaryTimeout = time.Hour * 24
2024-03-28 13:09:42 +00:00
2024-03-20 21:47:51 +00:00
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
2024-06-14 22:14:44 +00:00
select {
case <-file.Torrent().GotInfo():
break
case <-ctx.Done():
return nil, ctx.Err()
}
2024-03-20 21:47:51 +00:00
r := file.NewReader()
2024-03-28 13:09:42 +00:00
r.SetReadahead(1024 * 1024 * 16) // TODO configurable
2024-04-06 13:51:36 +00:00
_, err := r.ReadContext(ctx, make([]byte, 128))
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed initial file read: %w", err)
}
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("failed seeking to start, after initial read: %w", err)
}
2024-03-20 21:47:51 +00:00
return &torrentFile{
name: name,
tr: r,
file: file,
}, nil
}
2024-03-28 13:09:42 +00:00
// Name implements File. It reports the name the file was opened with.
func (tf *torrentFile) Name() string { return tf.name }
// Type implements File.
func (tf *torrentFile) Type() fs.FileMode {
2024-05-19 21:24:09 +00:00
return vfs.ROMode | fs.ModeDir
2024-03-28 13:09:42 +00:00
}
func (tf *torrentFile) Info() (fs.FileInfo, error) {
2024-05-19 21:24:09 +00:00
return vfs.NewFileInfo(tf.name, tf.file.Length()), nil
2021-11-29 10:07:54 +00:00
}
2024-03-20 21:47:51 +00:00
// Size reports the length of the underlying torrent file.
func (tf *torrentFile) Size() int64 {
	length := tf.file.Length()
	return length
}
// IsDir always reports false: a torrentFile is never a directory.
func (tf *torrentFile) IsDir() bool { return false }
func (rw *torrentFile) Close(ctx context.Context) error {
2021-12-01 18:59:21 +00:00
rw.mu.Lock()
defer rw.mu.Unlock()
2024-03-20 21:47:51 +00:00
return rw.tr.Close()
}
2024-06-14 22:14:44 +00:00
// readTimeout returns the context to use for a read together with its cancel
// function.
//
// If an earlier read recorded a deadline failure less than secondaryTimeout
// ago, the returned context expires after one millisecond and the current
// span is tagged with short_timeout=true. Otherwise ctx is returned unchanged
// with a no-op cancel function.
func (tf *torrentFile) readTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
	last := tf.lastReadTimeout.Load()
	if last == nil || time.Since(*last) >= secondaryTimeout {
		return ctx, func() {}
	}

	// Short timeout for a file that has already failed recently.
	trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
	return context.WithTimeout(ctx, time.Millisecond)
}
2024-03-20 21:47:51 +00:00
// Read implements ctxio.Reader.
func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
2024-03-28 13:09:42 +00:00
ctx, span := tracer.Start(ctx, "Read",
trace.WithAttributes(attribute.Int("length", len(p))),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
2024-03-30 10:16:13 +00:00
tf.mu.RLock()
defer tf.mu.RUnlock()
2024-03-20 21:47:51 +00:00
2024-06-14 22:14:44 +00:00
ctx, cancel := tf.readTimeout(ctx)
defer cancel()
2024-03-28 13:09:42 +00:00
defer func() {
if err == context.DeadlineExceeded {
2024-03-30 10:16:13 +00:00
now := time.Now()
tf.lastReadTimeout.Store(&now)
2024-03-28 13:09:42 +00:00
}
}()
2024-03-20 21:47:51 +00:00
return tf.tr.ReadContext(ctx, p)
}
2024-03-28 13:09:42 +00:00
func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
ctx, span := tracer.Start(ctx, "ReadAt",
trace.WithAttributes(attribute.Int("length", len(p)), attribute.Int64("offset", off)),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
2024-03-30 10:16:13 +00:00
tf.mu.RLock()
defer tf.mu.RUnlock()
2024-03-28 13:09:42 +00:00
2024-06-14 22:14:44 +00:00
ctx, cancel := tf.readTimeout(ctx)
defer cancel()
2024-03-28 13:09:42 +00:00
defer func() {
if err == context.DeadlineExceeded {
2024-03-30 10:16:13 +00:00
now := time.Now()
tf.lastReadTimeout.Store(&now)
2024-03-28 13:09:42 +00:00
}
}()
_, err = tf.tr.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
// return tf.tr.ReadContext(ctx, p)
n, err = readAtLeast(ctx, tf.tr, p, len(p))
2024-03-20 21:47:51 +00:00
2024-03-28 13:09:42 +00:00
_, err = tf.tr.Seek(0, io.SeekStart)
2021-11-29 10:07:54 +00:00
if err != nil {
return 0, err
}
2021-12-01 18:59:21 +00:00
2024-03-28 13:09:42 +00:00
return n, err
}
2024-03-28 13:09:42 +00:00
func readAtLeast(ctx context.Context, r torrent.Reader, buf []byte, min int) (n int, err error) {
2021-11-29 10:07:54 +00:00
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
2024-03-28 13:09:42 +00:00
nn, err = r.ReadContext(ctx, buf[n:])
2021-11-29 10:07:54 +00:00
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}