package atorrent import ( "context" "fmt" "io" "io/fs" "log/slog" "path" "slices" "strings" "sync" "sync/atomic" "time" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/anacrolix/torrent" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" ) type TorrentFS struct { name string Torrent *Controller filesCacheMu sync.Mutex filesCache map[string]vfs.File lastTorrentReadTimeout atomic.Pointer[time.Time] resolver *vfs.Resolver } var _ vfs.Filesystem = (*TorrentFS)(nil) const shortTimeout = time.Millisecond const lowTimeout = time.Second * 5 func (s *Daemon) NewTorrentFs(ctx context.Context, _ string, f vfs.File) (vfs.Filesystem, error) { c, err := s.loadTorrent(ctx, f) if err != nil { return nil, err } if err := f.Close(ctx); err != nil { s.log.Error(ctx, "failed to close file", slog.String("name", f.Name()), rlog.Error(err)) } return &TorrentFS{ name: f.Name(), Torrent: c, resolver: vfs.NewResolver(vfs.ArchiveFactories), }, nil } var _ fs.DirEntry = (*TorrentFS)(nil) // Name implements fs.DirEntry. func (tfs *TorrentFS) Name() string { return tfs.name } // Info implements fs.DirEntry. func (tfs *TorrentFS) Info() (fs.FileInfo, error) { return tfs, nil } // IsDir implements fs.DirEntry. func (tfs *TorrentFS) IsDir() bool { return true } // Type implements fs.DirEntry. func (tfs *TorrentFS) Type() fs.FileMode { return fs.ModeDir } // ModTime implements fs.FileInfo. func (tfs *TorrentFS) ModTime() time.Time { return time.Time{} } // Mode implements fs.FileInfo. func (tfs *TorrentFS) Mode() fs.FileMode { return fs.ModeDir } // Size implements fs.FileInfo. func (tfs *TorrentFS) Size() int64 { return 0 } // Sys implements fs.FileInfo. func (tfs *TorrentFS) Sys() any { return nil } // FsName implements Filesystem. func (tfs *TorrentFS) FsName() string { return "torrentfs" } func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) { fs.filesCacheMu.Lock() defer fs.filesCacheMu.Unlock() if fs.filesCache != nil { return fs.filesCache, nil } ctx, span := tracer.Start(ctx, "files", fs.traceAttrs()) defer span.End() files, err := fs.Torrent.Files(ctx) if err != nil { return nil, err } fs.filesCache = make(map[string]vfs.File) for _, file := range files { props, err := file.Properties(ctx) if err != nil { return nil, err } if props.Excluded { continue } p := vfs.AbsPath(file.Path()) tf, err := openTorrentFile(ctx, path.Base(p), file.file, &fs.lastTorrentReadTimeout) if err != nil { return nil, err } fs.filesCache[p] = tf } // TODO optional // if len(fs.filesCache) == 1 && fs.resolver.IsNestedFs(fs.Torrent.Name()) { // filepath := "/" + fs.Torrent.Name() // if file, ok := fs.filesCache[filepath]; ok { // nestedFs, err := fs.resolver.NestedFs(ctx, filepath, file) // if err != nil { // return nil, err // } // if nestedFs == nil { // goto DEFAULT_DIR // FIXME // } // fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/") // if err != nil { // return nil, err // } // return fs.filesCache, nil // } // } // DEFAULT_DIR: rootDir := "/" + fs.Torrent.Name() + "/" singleDir := true for k, _ := range fs.filesCache { if !strings.HasPrefix(k, rootDir) { singleDir = false } } if singleDir { for k, f := range fs.filesCache { delete(fs.filesCache, k) k, _ = strings.CutPrefix(k, rootDir) k = vfs.AbsPath(k) fs.filesCache[k] = f } } return fs.filesCache, nil } func listFilesRecursive(ctx context.Context, fs vfs.Filesystem, start string) (map[string]vfs.File, error) { out := make(map[string]vfs.File, 0) entries, err := fs.ReadDir(ctx, start) if err != nil { return nil, err } for _, entry := range entries { filename := path.Join(start, entry.Name()) if entry.IsDir() { rec, err := listFilesRecursive(ctx, fs, filename) if err != nil { return nil, err } maps.Copy(out, rec) } else { file, err := fs.Open(ctx, filename) if err != nil { return nil, err } out[filename] = file } } return out, nil } func (fs *TorrentFS) rawOpen(ctx context.Context, filename string) (file vfs.File, err error) { ctx, span := tracer.Start(ctx, "rawOpen", fs.traceAttrs(attribute.String("filename", filename)), ) defer func() { if err != nil { span.RecordError(err) } span.End() }() files, err := fs.files(ctx) if err != nil { return nil, err } file, err = vfs.GetFile(files, filename) return file, err } func (fs *TorrentFS) rawStat(ctx context.Context, filename string) (fs.FileInfo, error) { ctx, span := tracer.Start(ctx, "rawStat", fs.traceAttrs(attribute.String("filename", filename)), ) defer span.End() files, err := fs.files(ctx) if err != nil { return nil, err } file, err := vfs.GetFile(files, filename) if err != nil { return nil, err } return file.Info() } func (fs *TorrentFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption { return trace.WithAttributes(append([]attribute.KeyValue{ attribute.String("fs", fs.FsName()), attribute.String("torrent", fs.Torrent.Name()), attribute.String("infohash", fs.Torrent.InfoHash()), }, add...)...) } func (tfs *TorrentFS) readContext(ctx context.Context) (context.Context, context.CancelFunc) { lastReadTimeout := tfs.lastTorrentReadTimeout.Load() if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true)) return context.WithTimeout(ctx, shortTimeout) } return ctx, func() {} } // Stat implements Filesystem. func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { ctx, span := tracer.Start(ctx, "Stat", tfs.traceAttrs(attribute.String("filename", filename)), ) defer span.End() if vfs.IsRoot(filename) { return tfs, nil } var err error ctx, cancel := tfs.readContext(ctx) defer func() { cancel() if err == context.DeadlineExceeded { now := time.Now() tfs.lastTorrentReadTimeout.Store(&now) } }() fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.Stat(ctx, nestedFsPath) } return tfs.rawStat(ctx, fsPath) } func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File, err error) { ctx, span := tracer.Start(ctx, "Open", tfs.traceAttrs(attribute.String("filename", filename)), ) defer span.End() if vfs.IsRoot(filename) { return vfs.NewDirFile(tfs.name), nil } ctx, cancel := tfs.readContext(ctx) defer func() { cancel() if err == context.DeadlineExceeded { now := time.Now() tfs.lastTorrentReadTimeout.Store(&now) } }() fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.Open(ctx, nestedFsPath) } return tfs.rawOpen(ctx, fsPath) } func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { ctx, span := tracer.Start(ctx, "ReadDir", tfs.traceAttrs(attribute.String("name", name)), ) defer span.End() var err error ctx, cancel := tfs.readContext(ctx) defer func() { cancel() if err == context.DeadlineExceeded { now := time.Now() tfs.lastTorrentReadTimeout.Store(&now) } }() fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, name, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.ReadDir(ctx, nestedFsPath) } files, err := tfs.files(ctx) if err != nil { return nil, err } return vfs.ListDirFromFiles(files, fsPath) } func (fs *TorrentFS) Unlink(ctx context.Context, name string) error { ctx, span := tracer.Start(ctx, "Unlink", fs.traceAttrs(attribute.String("name", name)), ) defer span.End() name = vfs.AbsPath(name) files, err := fs.files(ctx) if err != nil { return err } if !slices.Contains(maps.Keys(files), name) { return vfs.ErrNotExist } file := files[name] fs.filesCacheMu.Lock() delete(fs.filesCache, name) fs.filesCacheMu.Unlock() tfile, ok := file.(*torrentFile) if !ok { return vfs.ErrNotImplemented } return fs.Torrent.ExcludeFile(ctx, tfile.file) } // Rename implements vfs.Filesystem. func (s *TorrentFS) Rename(ctx context.Context, oldpath string, newpath string) error { return vfs.ErrNotImplemented } var _ vfs.File = (*torrentFile)(nil) type torrentFile struct { name string mu sync.RWMutex tr torrent.Reader lastReadTimeout atomic.Pointer[time.Time] lastTorrentReadTimeout *atomic.Pointer[time.Time] file *torrent.File } const secondaryTimeout = time.Hour * 24 func openTorrentFile(ctx context.Context, name string, file *torrent.File, lastTorrentReadTimeout *atomic.Pointer[time.Time]) (*torrentFile, error) { select { case <-file.Torrent().GotInfo(): break case <-ctx.Done(): return nil, ctx.Err() } r := file.NewReader() _, err := r.ReadContext(ctx, make([]byte, 128)) if err != nil && err != io.EOF { return nil, fmt.Errorf("failed initial file read: %w", err) } _, err = r.Seek(0, io.SeekStart) if err != nil { return nil, fmt.Errorf("failed seeking to start, after initial read: %w", err) } return &torrentFile{ name: name, tr: r, file: file, lastTorrentReadTimeout: lastTorrentReadTimeout, }, nil } // Name implements File. func (tf *torrentFile) Name() string { return tf.name } // Seek implements vfs.File. func (tf *torrentFile) Seek(offset int64, whence int) (int64, error) { tf.mu.Lock() defer tf.mu.Unlock() return tf.tr.Seek(offset, whence) } // Type implements File. func (tf *torrentFile) Type() fs.FileMode { return vfs.ModeFileRO | fs.ModeDir } func (tf *torrentFile) Info() (fs.FileInfo, error) { return vfs.NewFileInfo(tf.name, tf.file.Length(), time.Time{}), nil } func (tf *torrentFile) Size() int64 { return tf.file.Length() } func (tf *torrentFile) IsDir() bool { return false } func (rw *torrentFile) Close(ctx context.Context) error { rw.mu.Lock() defer rw.mu.Unlock() return rw.tr.Close() } func (tf *torrentFile) readTimeout(ctx context.Context) (context.Context, context.CancelFunc) { lastReadTimeout := tf.lastReadTimeout.Load() if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true)) return context.WithTimeout(ctx, shortTimeout) } lastTorrentReadTimeout := tf.lastTorrentReadTimeout.Load() if lastTorrentReadTimeout != nil && time.Since(*lastTorrentReadTimeout) < secondaryTimeout { // make short timeout for already faliled files trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("low_timeout", true)) return context.WithTimeout(ctx, lowTimeout) } return ctx, func() {} } // Read implements ctxio.Reader. func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { ctx, span := tracer.Start(ctx, "Read", trace.WithAttributes(attribute.Int("length", len(p))), ) defer func() { span.SetAttributes(attribute.Int("read", n)) span.End() }() tf.mu.Lock() defer tf.mu.Unlock() ctx, cancel := tf.readTimeout(ctx) defer cancel() defer func() { if err == context.DeadlineExceeded { now := time.Now() tf.lastReadTimeout.Store(&now) tf.lastTorrentReadTimeout.Store(&now) } }() return tf.tr.ReadContext(ctx, p) } func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { ctx, span := tracer.Start(ctx, "ReadAt", trace.WithAttributes(attribute.Int("length", len(p)), attribute.Int64("offset", off)), ) defer func() { span.SetAttributes(attribute.Int("read", n)) span.End() }() tf.mu.RLock() defer tf.mu.RUnlock() ctx, cancel := tf.readTimeout(ctx) defer cancel() defer func() { if err == context.DeadlineExceeded { now := time.Now() tf.lastReadTimeout.Store(&now) tf.lastTorrentReadTimeout.Store(&now) } }() _, err = tf.tr.Seek(off, io.SeekStart) if err != nil { return 0, err } // return tf.tr.ReadContext(ctx, p) n, err = readAtLeast(ctx, tf.tr, p, len(p)) _, err = tf.tr.Seek(0, io.SeekStart) if err != nil { return 0, err } return n, err } func readAtLeast(ctx context.Context, r torrent.Reader, buf []byte, min int) (n int, err error) { if len(buf) < min { return 0, io.ErrShortBuffer } for n < min && err == nil { var nn int nn, err = r.ReadContext(ctx, buf[n:]) n += nn } if n >= min { err = nil } else if n > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return }