package qbittorrent import ( "context" "errors" "fmt" "io" "io/fs" "log/slog" "os" "path" "strings" "sync" "time" "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/pkg/uring" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/iceber/iouring-go" ) type FS struct { mu sync.Mutex client *cacheClient name string hash string dataDir string // directory where torrent files are stored ur *iouring.IOURing entries map[string]fileEntry log *rlog.Logger vfs.FilesystemPrototype } type fileEntry struct { fs.FileInfo Content *qbittorrent.TorrentContent } var _ vfs.Filesystem = (*FS)(nil) func newTorrentFS(ctx context.Context, ur *iouring.IOURing, client *cacheClient, name string, hash string, dataDir string) (*FS, error) { ctx, span := trace.Start(ctx, "newTorrentFS") defer span.End() cnts, err := client.listContent(ctx, hash) if err != nil { return nil, fmt.Errorf("failed to list content for hash %s: %w", hash, err) } entries := make(map[string]fileEntry, len(cnts)) for _, cnt := range cnts { if cnt.Priority == qbittorrent.PriorityDoNotDownload { continue } entries[vfs.AbsPath(cnt.Name)] = fileEntry{ Content: cnt, FileInfo: vfs.NewFileInfo(cnt.Name, cnt.Size), } } return &FS{ client: client, name: name, hash: hash, dataDir: dataDir, entries: entries, ur: ur, log: rlog.Component("qbittorrent", "fs"), FilesystemPrototype: vfs.FilesystemPrototype(name), }, nil } // Open implements vfs.Filesystem. func (f *FS) Open(ctx context.Context, name string) (vfs.File, error) { if name == vfs.Separator { return vfs.NewDirFile(name), nil } if entry, ok := f.entries[name]; ok { return openFile(ctx, f.ur, f.client, f.dataDir, f.hash, entry.Content) } for p := range f.entries { if strings.HasPrefix(p, name) { return vfs.NewDirFile(name), nil } } return nil, vfs.ErrNotExist } // ReadDir implements vfs.Filesystem. func (f *FS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { infos := make(map[string]fs.FileInfo, len(f.entries)) for k, v := range f.entries { infos[k] = v.FileInfo } return vfs.ListDirFromInfo(infos, name) } // Stat implements vfs.Filesystem. func (f *FS) Stat(ctx context.Context, name string) (fs.FileInfo, error) { name = vfs.AbsPath(path.Clean(name)) if vfs.IsRoot(name) { return vfs.NewDirInfo(f.name), nil } if entry, ok := f.entries[name]; ok { return entry.FileInfo, nil } for p := range f.entries { if strings.HasPrefix(p, name) { return vfs.NewDirInfo(name), nil } } return nil, vfs.ErrNotExist } // Unlink implements vfs.Filesystem. func (f *FS) Unlink(ctx context.Context, filename string) error { filename = vfs.AbsPath(path.Clean(filename)) // we cannot delete a torrent itself, cause it will be added on next source scan and all delited files will be restored if entry, ok := f.entries[filename]; ok { return f.removeFile(ctx, f.hash, entry.Content) } for p, entry := range f.entries { if strings.HasPrefix(p, filename) { return f.removeFile(ctx, f.hash, entry.Content) } } return vfs.ErrNotExist } func (f *FS) Rename(ctx context.Context, oldpath string, newpath string) error { oldpath = vfs.AbsPath(path.Clean(oldpath)) newpath = vfs.AbsPath(path.Clean(newpath)) if _, ok := f.entries[oldpath]; ok { err := f.client.qb.Torrent().RenameFile(ctx, f.hash, vfs.RelPath(oldpath), vfs.RelPath(newpath)) if err != nil { return fmt.Errorf("failed to rename file %s to %s: %w", oldpath, newpath, err) } f.mu.Lock() defer f.mu.Unlock() f.entries[newpath] = f.entries[oldpath] return nil } return vfs.ErrNotExist } func (f *FS) removeFile(ctx context.Context, hash string, content *qbittorrent.TorrentContent) error { log := f.log.With(slog.String("hash", hash), slog.String("file", content.Name)) f.mu.Lock() defer f.mu.Unlock() fpath := vfs.AbsPath(content.Name) if _, ok := f.entries[fpath]; !ok { return fmt.Errorf("file %s is does not found", fpath) } delete(f.entries, fpath) err := f.client.qb.Torrent().SetFilePriority(ctx, f.hash, content.Index, qbittorrent.PriorityDoNotDownload) if err != nil { return fmt.Errorf("failed to set priority for torrent %s for file %s: %w", hash, content.Name, err) } err = os.Remove(path.Join(f.dataDir, vfs.RelPath(content.Name))) if err != nil && !errors.Is(err, fs.ErrNotExist) { log.Warn(ctx, "failed to remove file", rlog.Error(err)) return fmt.Errorf("failed to remove file %s: %w", content.Name, err) } return nil } func openFile(ctx context.Context, ur *iouring.IOURing, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) { props, err := client.getProperties(ctx, hash) if err != nil { return nil, err } // FIXME error when file not started downloading file, err := os.OpenFile(path.Join(torrentDir, content.Name), os.O_RDONLY, 0) if err != nil { return nil, err } return &File{ client: client, hash: hash, torrentDir: torrentDir, filePath: content.Name, contentIndex: content.Index, pieceSize: props.PieceSize, fileSize: content.Size, file: uring.NewFile(ur, file), offset: 0, }, nil } type File struct { client *cacheClient hash string torrentDir string filePath string // path inside a torrent directory contentIndex int pieceSize int fileSize int64 mu sync.Mutex file *uring.File offset int64 } var _ vfs.File = (*File)(nil) // Info implements vfs.File. func (f *File) Info() (fs.FileInfo, error) { return &fileInfo{name: path.Base(f.filePath), size: f.fileSize}, nil } // IsDir implements vfs.File. func (f *File) IsDir() bool { return false } // Seek implements vfs.File. func (f *File) Seek(offset int64, whence int) (int64, error) { switch whence { case io.SeekStart: f.offset = offset case io.SeekCurrent: f.offset += offset case io.SeekEnd: f.offset = f.fileSize + offset } return f.offset, nil } // Name implements vfs.File. func (f *File) Name() string { return path.Base(f.filePath) } func (f *File) canExpectSoon(ctx context.Context) (bool, error) { info, err := f.client.getInfo(ctx, f.hash) if err != nil { return false, err } return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil } func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (bool, error) { startPieceIndex := int(offset / int64(f.pieceSize)) pieceCount := (size + f.pieceSize - 1) / f.pieceSize // rouding up for i := range pieceCount { ok, err := f.client.isPieceComplete(ctx, f.hash, startPieceIndex+i) if err != nil { return false, err } if !ok { return false, nil } } return true, nil } func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error { complete, err := f.isRangeComplete(ctx, offset, size) if err != nil { return err } if complete { return nil } canExpectSoon, err := f.canExpectSoon(ctx) if err != nil { return err } if !canExpectSoon { return fmt.Errorf("torrent is not downloading") } const checkingInterval = 1 * time.Second ticker := time.NewTicker(checkingInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: complete, err := f.isRangeComplete(ctx, offset, size) if err != nil { return err } if complete { return nil } } } } // Read implements vfs.File. func (f *File) Read(ctx context.Context, p []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { return 0, err } n, err := f.file.ReadAt(ctx, p, f.offset) f.offset += int64(n) return n, err } // ReadAt implements vfs.File. func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { return 0, err } return f.file.ReadAt(ctx, p, off) } // Size implements vfs.File. func (f *File) Size() int64 { return f.fileSize } // Type implements vfs.File. func (f *File) Type() fs.FileMode { return fs.ModeDir } // Close implements vfs.File. func (f *File) Close(ctx context.Context) error { return f.file.Close(ctx) } type fileInfo struct { name string size int64 } var _ fs.FileInfo = (*fileInfo)(nil) // IsDir implements fs.FileInfo. func (f *fileInfo) IsDir() bool { return false } // ModTime implements fs.FileInfo. func (f *fileInfo) ModTime() time.Time { return time.Time{} } // Mode implements fs.FileInfo. func (f *fileInfo) Mode() fs.FileMode { return vfs.ROMode } // Name implements fs.FileInfo. func (f *fileInfo) Name() string { return f.name } // Size implements fs.FileInfo. func (f *fileInfo) Size() int64 { return f.size } // Sys implements fs.FileInfo. func (f *fileInfo) Sys() any { return nil }