// Source path: tstor/src/sources/qbittorrent/fs.go

package qbittorrent
import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
	"git.kmsign.ru/royalcat/tstor/pkg/rlog"
	"git.kmsign.ru/royalcat/tstor/src/vfs"
)
type FS struct {
2024-09-24 13:26:15 +00:00
mu sync.Mutex
2024-08-31 23:00:13 +00:00
client *cacheClient
name string
hash string
2024-09-24 13:26:15 +00:00
dataDir string // directory where torrent files are stored
2024-08-31 23:00:13 +00:00
2024-09-24 13:26:15 +00:00
entries map[string]fileEntry
log *rlog.Logger
2024-08-31 23:00:13 +00:00
vfs.FilesystemPrototype
}
2024-09-24 13:26:15 +00:00
// fileEntry pairs the file info reported for a torrent file with the
// qBittorrent content record it was built from.
type fileEntry struct {
	// FileInfo is embedded, so a fileEntry can be used wherever an
	// fs.FileInfo is expected.
	fs.FileInfo

	// Content is the qBittorrent record describing the file.
	Content *qbittorrent.TorrentContent
}
// Compile-time check that *FS implements vfs.Filesystem.
var _ vfs.Filesystem = (*FS)(nil)
func newTorrentFS(ctx context.Context, client *cacheClient, name string, hash string, dataDir string) (*FS, error) {
2024-09-24 13:26:15 +00:00
ctx, span := trace.Start(ctx, "newTorrentFS")
defer span.End()
2024-08-31 23:00:13 +00:00
cnts, err := client.listContent(ctx, hash)
if err != nil {
return nil, fmt.Errorf("failed to list content for hash %s: %w", hash, err)
}
2024-09-24 13:26:15 +00:00
entries := make(map[string]fileEntry, len(cnts))
2024-08-31 23:00:13 +00:00
for _, cnt := range cnts {
2024-09-24 13:26:15 +00:00
if cnt.Priority == qbittorrent.PriorityDoNotDownload {
continue
}
entries[vfs.AbsPath(cnt.Name)] = fileEntry{
Content: cnt,
FileInfo: vfs.NewFileInfo(cnt.Name, cnt.Size),
}
2024-08-31 23:00:13 +00:00
}
return &FS{
2024-08-31 23:00:13 +00:00
client: client,
name: name,
hash: hash,
dataDir: dataDir,
2024-08-31 23:00:13 +00:00
2024-09-24 13:26:15 +00:00
entries: entries,
log: rlog.Component("qbittorrent", "fs"),
2024-08-31 23:00:13 +00:00
FilesystemPrototype: vfs.FilesystemPrototype(name),
}, nil
}
2024-08-31 23:00:13 +00:00
// Open implements vfs.Filesystem.
func (f *FS) Open(ctx context.Context, name string) (vfs.File, error) {
if name == vfs.Separator {
return vfs.NewDirFile(name), nil
}
2024-09-24 13:26:15 +00:00
if entry, ok := f.entries[name]; ok {
return openFile(ctx, f.client, f.dataDir, f.hash, entry.Content)
2024-08-31 23:00:13 +00:00
}
2024-09-24 13:26:15 +00:00
for p := range f.entries {
2024-08-31 23:00:13 +00:00
if strings.HasPrefix(p, name) {
return vfs.NewDirFile(name), nil
}
}
2024-08-31 23:00:13 +00:00
return nil, vfs.ErrNotExist
}
// ReadDir implements vfs.Filesystem.
2024-09-24 13:26:15 +00:00
func (f *FS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
infos := make(map[string]fs.FileInfo, len(f.entries))
for k, v := range f.entries {
infos[k] = v.FileInfo
}
return vfs.ListDirFromInfo(infos, name)
}
// Stat implements vfs.Filesystem.
2024-08-31 23:00:13 +00:00
func (f *FS) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
2024-09-24 13:26:15 +00:00
name = vfs.AbsPath(path.Clean(name))
if vfs.IsRoot(name) {
return vfs.NewDirInfo(f.name), nil
}
if entry, ok := f.entries[name]; ok {
return entry.FileInfo, nil
}
for p := range f.entries {
if strings.HasPrefix(p, name) {
return vfs.NewDirInfo(name), nil
}
2024-08-31 23:00:13 +00:00
}
2024-09-24 13:26:15 +00:00
return nil, vfs.ErrNotExist
}
// Unlink implements vfs.Filesystem.
func (f *FS) Unlink(ctx context.Context, filename string) error {
2024-09-24 13:26:15 +00:00
filename = vfs.AbsPath(path.Clean(filename))
// we cannot delete a torrent itself, cause it will be added on next source scan and all delited files will be restored
if entry, ok := f.entries[filename]; ok {
return f.removeFile(ctx, f.hash, entry.Content)
}
for p, entry := range f.entries {
if strings.HasPrefix(p, filename) {
return f.removeFile(ctx, f.hash, entry.Content)
}
}
return vfs.ErrNotExist
}
// removeFile hides a single torrent file from the filesystem and deletes its
// data.
//
// The file priority is first set to qbittorrent.PriorityDoNotDownload. Only
// after that call succeeds is the entry dropped from the view, so a failed
// API call leaves the filesystem unchanged. The file is then removed from
// dataDir; a file that is already missing on disk is not an error.
//
// An error is returned when the file is not part of the view, when the
// priority cannot be changed, or when the file cannot be removed from disk.
func (f *FS) removeFile(ctx context.Context, hash string, content *qbittorrent.TorrentContent) error {
	log := f.log.With(slog.String("hash", hash), slog.String("file", content.Name))

	f.mu.Lock()
	defer f.mu.Unlock()

	fpath := vfs.AbsPath(content.Name)
	if _, ok := f.entries[fpath]; !ok {
		return fmt.Errorf("file %s is does not found", fpath)
	}

	err := f.client.qb.Torrent().SetFilePriority(ctx, f.hash, content.Index, qbittorrent.PriorityDoNotDownload)
	if err != nil {
		return fmt.Errorf("failed to set priority for torrent %s for file %s: %w", hash, content.Name, err)
	}

	// The file is now excluded from download; drop it from the view.
	delete(f.entries, fpath)

	err = os.Remove(path.Join(f.dataDir, vfs.RelPath(content.Name)))
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		log.Warn(ctx, "failed to remove file", rlog.Error(err))
		return fmt.Errorf("failed to remove file %s: %w", content.Name, err)
	}

	return nil
}
func openFile(ctx context.Context, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) {
props, err := client.getProperties(ctx, hash)
if err != nil {
return nil, err
}
return &File{
2024-08-31 23:00:13 +00:00
client: client,
hash: hash,
torrentDir: torrentDir,
filePath: content.Name,
contentIndex: content.Index,
pieceSize: props.PieceSize,
fileSize: content.Size,
offset: 0,
}, nil
}
type File struct {
2024-08-31 23:00:13 +00:00
client *cacheClient
hash string
2024-08-31 23:00:13 +00:00
torrentDir string
filePath string // path inside a torrent directory
contentIndex int
pieceSize int
fileSize int64
offset int64
osfile *os.File
}
// Compile-time check that *File implements vfs.File.
var _ vfs.File = (*File)(nil)
// Info implements vfs.File.
//
// The returned info carries the base name of the file path and the file size;
// the error is always nil.
func (f *File) Info() (fs.FileInfo, error) {
	info := &fileInfo{
		name: path.Base(f.filePath),
		size: f.fileSize,
	}
	return info, nil
}
// IsDir implements vfs.File.
//
// A File is never a directory, so this always reports false.
func (f *File) IsDir() bool {
	return false
}
// Seek implements vfs.File.
//
// It sets the position for the next Read, interpreted according to whence:
// io.SeekStart is relative to the start of the file, io.SeekCurrent to the
// current position and io.SeekEnd to the file size. The new position is
// returned.
//
// An unknown whence or a resulting negative position is rejected with an
// error wrapping fs.ErrInvalid, and the current position is left unchanged.
// Seeking past the end of the file is allowed.
func (f *File) Seek(offset int64, whence int) (int64, error) {
	var target int64
	switch whence {
	case io.SeekStart:
		target = offset
	case io.SeekCurrent:
		target = f.offset + offset
	case io.SeekEnd:
		target = f.fileSize + offset
	default:
		return 0, fmt.Errorf("seek %s: invalid whence %d: %w", f.filePath, whence, fs.ErrInvalid)
	}

	if target < 0 {
		return 0, fmt.Errorf("seek %s: negative position %d: %w", f.filePath, target, fs.ErrInvalid)
	}

	f.offset = target
	return target, nil
}
// Name implements vfs.File.
//
// It returns the last element of the file's path inside the torrent.
func (f *File) Name() string {
	base := path.Base(f.filePath)
	return base
}
// canExpectSoon reports whether missing data of the torrent can be expected
// to arrive.
//
// It is true when the torrent's completed byte count equals its size, or when
// the torrent state is CheckingUP, Downloading or ForcedDL. An error from the
// info request is returned unchanged.
func (f *File) canExpectSoon(ctx context.Context) (bool, error) {
	info, err := f.client.getInfo(ctx, f.hash)
	if err != nil {
		return false, err
	}

	if info.Completed == info.Size {
		return true, nil
	}

	switch info.State {
	case qbittorrent.TorrentStateCheckingUP,
		qbittorrent.TorrentStateDownloading,
		qbittorrent.TorrentStateForcedDL:
		return true, nil
	}
	return false, nil
}
// isRangeComplete reports whether every piece touched by the byte range
// [offset, offset+size) is complete.
//
// The range is clamped to the file size, so a read buffer reaching past the
// end of the file does not wait on pieces the file does not cover. An empty
// range, or one that starts at a negative offset or at/after the end of the
// file, has nothing to wait for and is reported as complete. A non-positive
// piece size is reported as an error instead of dividing by zero.
//
// NOTE(review): piece indexes are derived from the file-relative offset, as
// before; this presumably assumes the file starts at piece 0 of the torrent —
// confirm for multi-file torrents.
func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (bool, error) {
	if f.pieceSize <= 0 {
		return false, fmt.Errorf("invalid piece size %d for torrent %s", f.pieceSize, f.hash)
	}
	if size <= 0 || offset < 0 || offset >= f.fileSize {
		return true, nil
	}

	pieceLen := int64(f.pieceSize)
	end := min(offset+int64(size), f.fileSize) // exclusive

	// A range that starts in the middle of a piece can spill into one more
	// piece than size/pieceSize suggests, so both ends are computed from
	// absolute byte positions.
	firstPiece := offset / pieceLen
	lastPiece := (end - 1) / pieceLen

	for piece := firstPiece; piece <= lastPiece; piece++ {
		ok, err := f.client.isPieceComplete(ctx, f.hash, int(piece))
		if err != nil {
			return false, err
		}
		if !ok {
			return false, nil
		}
	}

	return true, nil
}
// waitPieceAvailable blocks until the byte range described by offset and size
// is completely downloaded.
//
// It returns immediately when the range is already complete. Otherwise it
// fails unless canExpectSoon reports that data can still arrive, and then
// re-checks the range once per second until it is complete, a check fails, or
// ctx is done (in which case ctx.Err() is returned).
func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error {
	const checkingInterval = 1 * time.Second

	ready, err := f.isRangeComplete(ctx, offset, size)
	if err != nil {
		return err
	}
	if ready {
		return nil
	}

	expected, err := f.canExpectSoon(ctx)
	if err != nil {
		return err
	}
	if !expected {
		return fmt.Errorf("torrent is not downloading")
	}

	poll := time.NewTicker(checkingInterval)
	defer poll.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}

		ready, err := f.isRangeComplete(ctx, offset, size)
		if err != nil {
			return err
		}
		if ready {
			return nil
		}
	}
}
// Read implements vfs.File.
func (f *File) Read(ctx context.Context, p []byte) (n int, err error) {
2024-09-24 13:26:15 +00:00
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
n, err = descriptor.ReadAt(p, f.offset)
f.offset += int64(n)
return n, err
}
// ReadAt implements vfs.File.
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
2024-09-24 13:26:15 +00:00
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
return descriptor.ReadAt(p, off)
}
// Size implements vfs.File.
//
// It returns the file size recorded when the handle was created.
func (f *File) Size() int64 {
	size := f.fileSize
	return size
}
// Type implements vfs.File.
func (f *File) Type() fs.FileMode {
2024-08-31 23:00:13 +00:00
return fs.ModeDir
}
func (f *File) descriptor() (*os.File, error) {
if f.osfile != nil {
return f.osfile, nil
}
2024-08-31 23:00:13 +00:00
osfile, err := os.Open(path.Join(f.torrentDir, f.filePath))
if err != nil {
return nil, err
}
f.osfile = osfile
return f.osfile, nil
}
2024-08-31 23:00:13 +00:00
// Close implements vfs.File.
//
// It closes the OS file if one was opened and forgets it, so a later read
// opens it again. Closing a handle without an open OS file is a no-op.
func (f *File) Close(ctx context.Context) error {
	if f.osfile == nil {
		return nil
	}

	handle := f.osfile
	f.osfile = nil
	return handle.Close()
}
// fileInfo is the fs.FileInfo reported for a torrent file.
type fileInfo struct {
	// name is the value returned by Name.
	name string
	// size is the value returned by Size.
	size int64
}
// Compile-time check that *fileInfo implements fs.FileInfo.
var _ fs.FileInfo = (*fileInfo)(nil)
// IsDir implements fs.FileInfo.
//
// fileInfo always describes a file, so this reports false.
func (f *fileInfo) IsDir() bool {
	return false
}
// ModTime implements fs.FileInfo.
//
// No modification time is tracked; the zero time.Time is returned.
func (f *fileInfo) ModTime() time.Time {
	var unknown time.Time
	return unknown
}
// Mode implements fs.FileInfo.
//
// Every file is reported with vfs.ROMode.
func (f *fileInfo) Mode() fs.FileMode {
	return vfs.ROMode
}
// Name implements fs.FileInfo.
//
// It returns the name stored in the info.
func (f *fileInfo) Name() string {
	name := f.name
	return name
}
// Size implements fs.FileInfo.
//
// It returns the size stored in the info.
func (f *fileInfo) Size() int64 {
	size := f.size
	return size
}
// Sys implements fs.FileInfo.
//
// There is no underlying data source to expose, so nil is returned.
func (f *fileInfo) Sys() any {
	var underlying any
	return underlying
}