package qbittorrent import ( "context" "errors" "fmt" "io" "log/slog" "os" "path" "path/filepath" "sync" "time" "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/daemon" "git.kmsign.ru/royalcat/tstor/src/logwrap" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/types/infohash" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" mapset "github.com/deckarep/golang-set/v2" "github.com/iceber/iouring-go" "github.com/knadh/koanf/v2" "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" ) var trace = otel.Tracer("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent") type Daemon struct { proc *os.Process qb qbittorrent.Client client *cacheClient sourceFilesMu sync.Mutex sourceFiles map[string]string // [sourcePath]infohash registeredTorrents mapset.Set[string] // infohash list dataDir string ur *iouring.IOURing log *rlog.Logger } const defaultConf = ` [LegalNotice] Accepted=true [Preferences] WebUI\LocalHostAuth=false WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)" ` const DaemonName = "qbittorrent" var _ daemon.DaemonConstructor = NewDaemon func NewDaemon(ctx context.Context, koanf *koanf.Koanf) (daemon.Daemon, error) { log := rlog.Component(DaemonName) config, err := loadConfig(koanf) if err != nil { return nil, err } binPath := config.MetadataFolder + "/qbittorrent-nox" err = downloadLatestQbitRelease(ctx, binPath) if err != nil { return nil, err } daemonLog := log.WithComponent("process") outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog()) errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog()) _, err = os.Stat(config.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf") if errors.Is(err, os.ErrNotExist) { err = os.MkdirAll(config.MetadataFolder+"/profile/qBittorrent/config", 0744) if err != nil { return nil, err } err = os.WriteFile(config.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644) if err != nil { return nil, err } } err = os.MkdirAll(config.DataFolder, 0744) if err != nil { return nil, err } const port = 25436 proc, err := runQBittorrent(binPath, config.MetadataFolder+"/profile", port, outLog, errLog) if err != nil { return nil, err } time.Sleep(time.Second) qb, err := qbittorrent.NewClient(ctx, &qbittorrent.Config{ Address: fmt.Sprintf("http://localhost:%d", port), }) if err != nil { return nil, err } for { // wait for qbittorrent to start ver, err := qb.Application().Version(ctx) log.Info(ctx, "qbittorrent started", slog.String("version", ver)) if err == nil { break } log.Warn(ctx, "waiting for qbittorrent to start", rlog.Error(err)) time.Sleep(time.Second) } dataDir, err := filepath.Abs(config.DataFolder) if err != nil { return nil, err } err = qb.Application().SetPreferences(ctx, &qbittorrent.Preferences{ SavePath: dataDir, }) if err != nil { return nil, err } ur, err := iouring.New(8, iouring.WithAsync()) if err != nil { return nil, err } return &Daemon{ qb: qb, proc: proc, dataDir: config.DataFolder, ur: ur, sourceFiles: make(map[string]string), registeredTorrents: mapset.NewSet[string](), client: wrapClient(qb), log: rlog.Component(DaemonName), }, nil } func (d *Daemon) Name() string { return DaemonName } func (d *Daemon) Extensions() []string { return []string{".torrent"} } func (d *Daemon) Close(ctx context.Context) error { err := d.proc.Signal(os.Interrupt) if err != nil { return err } _, err = d.proc.Wait() if err != nil { return err } return nil } func torrentDataPath(dataDir string, ih string) (string, error) { return filepath.Abs(path.Join(dataDir, ih)) } func (fs *Daemon) GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) { ctx, span := trace.Start(ctx, "GetTorrentFS") defer span.End() stat, err := file.Info() if err != nil { return nil, err } log := fs.log.With(slog.String("file", file.Name())) ih, err := readInfoHash(ctx, file) if err != nil { return nil, err } log = log.With(slog.String("infohash", ih.HexString())) torrentPath, err := torrentDataPath(fs.dataDir, ih.HexString()) if err != nil { return nil, fmt.Errorf("error getting torrent path: %w", err) } log = log.With(slog.String("torrentPath", torrentPath)) log.Debug(ctx, "creating fs for torrent") err = fs.syncTorrentState(ctx, file, ih, torrentPath) if err != nil { return nil, fmt.Errorf("error syncing torrent state: %w", err) } fs.sourceFilesMu.Lock() fs.sourceFiles[sourcePath] = ih.HexString() fs.sourceFilesMu.Unlock() return newTorrentFS(ctx, fs.client, file.Name(), ih.HexString(), stat.ModTime(), torrentPath) } func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainfo.Hash, torrentPath string) error { ctx, span := trace.Start(ctx, "syncTorrentState") defer span.End() log := d.log.With(slog.String("file", file.Name()), slog.String("infohash", ih.HexString())) info, err := d.client.getInfo(ctx, ih.HexString()) if err != nil { return err } log = log.With(slog.String("torrentPath", torrentPath)) if info == nil { _, err := file.Seek(0, io.SeekStart) if err != nil { return err } data, err := ctxio.ReadAll(ctx, file) if err != nil { return err } err = d.qb.Torrent().AddNewTorrent(ctx, &qbittorrent.TorrentAddOption{ Torrents: []*qbittorrent.TorrentAddFileMetadata{ { Data: data, }, }, SavePath: torrentPath, // SequentialDownload: "true", FirstLastPiecePrio: "true", }) if err != nil { d.log.Error(ctx, "error adding torrent", rlog.Error(err)) return fmt.Errorf("error adding torrent: %w", err) } var props *qbittorrent.TorrentProperties for { props, err = d.client.getProperties(ctx, ih.HexString()) if err == nil { break } if errors.Is(err, context.DeadlineExceeded) { return err } log.Error(ctx, "waiting for torrent to be added", rlog.Error(err)) time.Sleep(time.Millisecond * 15) } log.Info(ctx, "added torrent", slog.String("infohash", ih.HexString())) d.registeredTorrents.Add(props.Hash) return nil } else { // info := existing[0] props, err := d.client.getProperties(ctx, ih.HexString()) if err != nil { return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString()) } d.registeredTorrents.Add(props.Hash) if props.SavePath != torrentPath { log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath)) err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath) if err != nil { return fmt.Errorf("error moving torrent: %w", err) } } return nil } } // TODO caching func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) { mi, err := metainfo.Load(ctxio.IoReader(ctx, file)) if err != nil { return infohash.T{}, err } info, err := mi.UnmarshalInfo() if err != nil { return infohash.T{}, err } if info.HasV2() { ih := infohash_v2.HashBytes(mi.InfoBytes) return *(&ih).ToShort(), nil } return infohash.HashBytes(mi.InfoBytes), nil }