tstor/daemons/qbittorrent/daemon.go
royalcat 0ae11aa283
All checks were successful
docker / build-docker (push) Successful in 3m22s
correct modtime
2024-12-17 12:51:10 +03:00

289 lines
7 KiB
Go

package qbittorrent
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path"
"path/filepath"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/iceber/iouring-go"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
)
// trace is the OpenTelemetry tracer used to start the spans created by this
// package's methods.
var trace = otel.Tracer("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent")
// Daemon owns a qbittorrent-nox child process together with the API clients
// used to talk to it, and keeps track of which torrent source files and
// infohashes have been handed to that process.
type Daemon struct {
// proc is the child process returned by runQBittorrent; Close interrupts it
// and waits for it to exit.
proc *os.Process
// qb is the raw qBittorrent API client created in NewDaemon.
qb qbittorrent.Client
// client is qb wrapped by wrapClient.
client *cacheClient
// sourceFilesMu guards sourceFiles.
sourceFilesMu sync.Mutex
// sourceFiles maps a torrent source file path to the hex infohash that was
// read from it.
sourceFiles map[string]string // [sourcePath]infohash
// registeredTorrents holds the hashes of torrents that syncTorrentState has
// confirmed are present in qBittorrent.
registeredTorrents mapset.Set[string] // infohash list
// dataDir is the base directory under which each torrent gets a
// sub-directory named after its infohash (see torrentDataPath).
dataDir string
// ur is the io_uring instance created in NewDaemon.
ur *iouring.IOURing
// log is the component logger for the daemon.
log *rlog.Logger
}
// defaultConf is the qBittorrent.conf content that NewDaemon writes into the
// profile directory when no configuration file exists there yet. It marks the
// legal notice as accepted and sets the WebUI LocalHostAuth and
// Password_PBKDF2 preferences.
// NOTE(review): presumably LocalHostAuth=false lets the daemon call the WebUI
// API on localhost without logging in — confirm against qBittorrent docs.
const defaultConf = `
[LegalNotice]
Accepted=true
[Preferences]
WebUI\LocalHostAuth=false
WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)"
`
// NewDaemon prepares and starts a qbittorrent-nox process and returns a Daemon
// connected to it.
//
// Steps, in order:
//  1. fetch the binary via downloadLatestQbitRelease into conf.MetadataFolder;
//  2. write defaultConf into the profile directory if no config file exists;
//  3. create conf.DataFolder;
//  4. start the process on a fixed WebUI port and poll the API until the
//     version call succeeds;
//  5. set the absolute data folder as qBittorrent's save path;
//  6. create the io_uring instance.
//
// If any step after the process has been started fails, the process is killed
// so that it is not left running without an owner. Errors are wrapped with the
// step that failed.
//
// NOTE: the start-up poll has no upper bound; it loops until the API answers.
func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
	ctx := context.Background()
	log := rlog.Component("qbittorrent")

	binPath := conf.MetadataFolder + "/qbittorrent-nox"
	if err := downloadLatestQbitRelease(ctx, binPath); err != nil {
		return nil, fmt.Errorf("downloading qbittorrent release: %w", err)
	}

	daemonLog := log.WithComponent("process")
	outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog())
	errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog())

	profileDir := conf.MetadataFolder + "/profile"
	confDir := profileDir + "/qBittorrent/config"
	confPath := confDir + "/qBittorrent.conf"

	// Only seed the default configuration when the file is missing; an
	// existing configuration is never overwritten.
	if _, err := os.Stat(confPath); errors.Is(err, os.ErrNotExist) {
		if err := os.MkdirAll(confDir, 0744); err != nil {
			return nil, fmt.Errorf("creating qbittorrent config directory: %w", err)
		}
		if err := os.WriteFile(confPath, []byte(defaultConf), 0644); err != nil {
			return nil, fmt.Errorf("writing default qbittorrent config: %w", err)
		}
	}

	if err := os.MkdirAll(conf.DataFolder, 0744); err != nil {
		return nil, fmt.Errorf("creating data folder: %w", err)
	}

	const port = 25436
	proc, err := runQBittorrent(binPath, profileDir, port, outLog, errLog)
	if err != nil {
		return nil, fmt.Errorf("starting qbittorrent: %w", err)
	}

	// From here on the child process is running: make sure it does not
	// outlive a failed initialisation.
	ready := false
	defer func() {
		if ready {
			return
		}
		// Best-effort cleanup; the original setup error is what the caller
		// needs to see, so failures here are intentionally ignored.
		_ = proc.Kill()
		_, _ = proc.Wait()
	}()

	time.Sleep(time.Second)

	qb, err := qbittorrent.NewClient(ctx, &qbittorrent.Config{
		Address: fmt.Sprintf("http://localhost:%d", port),
	})
	if err != nil {
		return nil, fmt.Errorf("creating qbittorrent client: %w", err)
	}

	for { // wait for qbittorrent to start
		ver, err := qb.Application().Version(ctx)
		if err == nil {
			// Report success only once the API has actually answered.
			log.Info(ctx, "qbittorrent started", slog.String("version", ver))
			break
		}
		log.Warn(ctx, "waiting for qbittorrent to start", rlog.Error(err))
		time.Sleep(time.Second)
	}

	dataDir, err := filepath.Abs(conf.DataFolder)
	if err != nil {
		return nil, fmt.Errorf("resolving data folder: %w", err)
	}

	err = qb.Application().SetPreferences(ctx, &qbittorrent.Preferences{
		SavePath: dataDir,
	})
	if err != nil {
		return nil, fmt.Errorf("setting qbittorrent save path: %w", err)
	}

	ur, err := iouring.New(8, iouring.WithAsync())
	if err != nil {
		return nil, fmt.Errorf("creating io_uring: %w", err)
	}

	ready = true
	return &Daemon{
		qb:                 qb,
		proc:               proc,
		dataDir:            conf.DataFolder,
		ur:                 ur,
		sourceFiles:        make(map[string]string),
		registeredTorrents: mapset.NewSet[string](),
		client:             wrapClient(qb),
		log:                log,
	}, nil
}
// Close asks the qbittorrent process to stop by sending it os.Interrupt and
// then blocks until the process has exited. The first error encountered
// (signalling or waiting) is returned. ctx is currently unused.
//
// NOTE(review): d.ur is not released here — confirm whether it is closed
// elsewhere.
func (d *Daemon) Close(ctx context.Context) error {
	if sigErr := d.proc.Signal(os.Interrupt); sigErr != nil {
		return sigErr
	}

	_, waitErr := d.proc.Wait()
	return waitErr
}
// torrentDataPath returns the absolute path of the directory that holds the
// data of the torrent with infohash ih: dataDir joined with ih, made absolute
// relative to the current working directory when dataDir is relative.
func torrentDataPath(dataDir string, ih string) (string, error) {
	joined := path.Join(dataDir, ih)
	abs, err := filepath.Abs(joined)
	return abs, err
}
// GetTorrentFS builds a filesystem view over the torrent described by file.
//
// It reads the infohash from file, makes sure qBittorrent knows the torrent
// and stores it under the daemon's data directory (see syncTorrentState),
// records sourcePath -> infohash in sourceFiles, and finally returns the
// filesystem created by newTorrentFS. The modification time of file is passed
// on to newTorrentFS.
//
// An error is returned if the file info or infohash cannot be read, if the
// torrent path cannot be resolved, or if syncing with qBittorrent fails.
func (d *Daemon) GetTorrentFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) {
	ctx, span := trace.Start(ctx, "GetTorrentFS")
	defer span.End()

	stat, err := file.Info()
	if err != nil {
		return nil, err
	}

	log := d.log.With(slog.String("file", file.Name()))

	ih, err := readInfoHash(ctx, file)
	if err != nil {
		return nil, err
	}
	hash := ih.HexString()
	log = log.With(slog.String("infohash", hash))

	torrentPath, err := torrentDataPath(d.dataDir, hash)
	if err != nil {
		return nil, fmt.Errorf("error getting torrent path: %w", err)
	}
	log = log.With(slog.String("torrentPath", torrentPath))

	log.Debug(ctx, "creating fs for torrent")

	err = d.syncTorrentState(ctx, file, ih, torrentPath)
	if err != nil {
		return nil, fmt.Errorf("error syncing torrent state: %w", err)
	}

	// Remember which infohash this source file resolved to.
	d.sourceFilesMu.Lock()
	d.sourceFiles[sourcePath] = hash
	d.sourceFilesMu.Unlock()

	return newTorrentFS(ctx, d.client, file.Name(), hash, stat.ModTime(), torrentPath)
}
// syncTorrentState makes sure the torrent identified by ih is registered in
// qBittorrent and stored at torrentPath.
//
// If qBittorrent already knows the torrent, its save path is compared with
// torrentPath and the torrent is moved when they differ. Otherwise the whole
// of file is read from the start and uploaded as a new torrent, after which
// the function polls until qBittorrent reports properties for it.
//
// In both cases the hash reported by qBittorrent is added to
// registeredTorrents. The polling stops with the context's error as soon as
// ctx is cancelled or its deadline passes.
func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainfo.Hash, torrentPath string) error {
	ctx, span := trace.Start(ctx, "syncTorrentState")
	defer span.End()

	hash := ih.HexString()
	log := d.log.With(
		slog.String("file", file.Name()),
		slog.String("infohash", hash),
		slog.String("torrentPath", torrentPath),
	)

	info, err := d.client.getInfo(ctx, hash)
	if err != nil {
		return err
	}

	if info != nil {
		// Torrent is already known: only make sure it lives where we expect.
		props, err := d.client.getProperties(ctx, hash)
		if err != nil {
			return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, hash)
		}
		d.registeredTorrents.Add(props.Hash)

		if props.SavePath != torrentPath {
			log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
			err = d.qb.Torrent().SetLocation(ctx, []string{hash}, torrentPath)
			if err != nil {
				return fmt.Errorf("error moving torrent: %w", err)
			}
		}
		return nil
	}

	// Torrent is unknown to qBittorrent: upload the .torrent content. The
	// file may already have been read, so rewind it first.
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		return err
	}
	data, err := ctxio.ReadAll(ctx, file)
	if err != nil {
		return err
	}

	err = d.qb.Torrent().AddNewTorrent(ctx, &qbittorrent.TorrentAddOption{
		Torrents: []*qbittorrent.TorrentAddFileMetadata{
			{
				Data: data,
			},
		},
		SavePath: torrentPath,
		// SequentialDownload: "true",
		FirstLastPiecePrio: "true",
	})
	if err != nil {
		log.Error(ctx, "error adding torrent", rlog.Error(err))
		return fmt.Errorf("error adding torrent: %w", err)
	}

	// Poll until qBittorrent reports properties for the new torrent.
	var props *qbittorrent.TorrentProperties
	for {
		props, err = d.client.getProperties(ctx, hash)
		if err == nil {
			break
		}
		// A finished context can never succeed; bail out instead of spinning.
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			return err
		}
		log.Error(ctx, "waiting for torrent to be added", rlog.Error(err))

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(15 * time.Millisecond):
		}
	}

	log.Info(ctx, "added torrent", slog.String("infohash", hash))
	d.registeredTorrents.Add(props.Hash)
	return nil
}
// readInfoHash parses the torrent metainfo read from file and returns its
// infohash. When the info dictionary reports v2 metadata, the v2 hash of the
// info bytes is computed and converted with ToShort; otherwise the hash is
// computed directly with infohash.HashBytes. Reading honours ctx through
// ctxio.IoReader.
//
// TODO caching
func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) {
	var none infohash.T

	reader := ctxio.IoReader(ctx, file)
	mi, err := metainfo.Load(reader)
	if err != nil {
		return none, err
	}

	info, err := mi.UnmarshalInfo()
	if err != nil {
		return none, err
	}

	if !info.HasV2() {
		return infohash.HashBytes(mi.InfoBytes), nil
	}

	full := infohash_v2.HashBytes(mi.InfoBytes)
	return *full.ToShort(), nil
}