[wip] daemon separation
This commit is contained in:
parent
98ee1dc6f1
commit
fa084118c3
48 changed files with 48 additions and 35 deletions
daemons/qbittorrent
27
daemons/qbittorrent/api.go
Normal file
27
daemons/qbittorrent/api.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
)
|
||||
|
||||
func (d *Daemon) ListTorrents(ctx context.Context) ([]*qbittorrent.TorrentInfo, error) {
|
||||
return d.client.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{})
|
||||
}
|
||||
|
||||
func (d *Daemon) SourceFiles(ctx context.Context, hash string) ([]string, error) {
|
||||
d.sourceFilesMu.Lock()
|
||||
defer d.sourceFilesMu.Unlock()
|
||||
|
||||
out := make([]string, 0, 1)
|
||||
for k, h := range d.sourceFiles {
|
||||
if h != hash {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, k)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
66
daemons/qbittorrent/cleanup.go
Normal file
66
daemons/qbittorrent/cleanup.go
Normal file
|
@ -0,0 +1,66 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
)
|
||||
|
||||
func (d *Daemon) Cleanup(ctx context.Context, run bool) ([]string, error) {
|
||||
d.log.Info(ctx, "cleanup started")
|
||||
|
||||
torrentInfos, err := d.client.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{})
|
||||
if err != nil {
|
||||
d.log.Error(ctx, "failed to get torrents", rlog.Error(err))
|
||||
return nil, fmt.Errorf("failed to get torrents: %w", err)
|
||||
}
|
||||
|
||||
torrentToDelete := make([]string, 0, 5)
|
||||
|
||||
for _, info := range torrentInfos {
|
||||
if d.registeredTorrents.Contains(info.Hash) {
|
||||
continue
|
||||
}
|
||||
|
||||
d.log.Info(ctx, "torrent not found in registry", slog.String("infohash", info.Hash))
|
||||
torrentToDelete = append(torrentToDelete, info.Hash)
|
||||
}
|
||||
|
||||
d.log.Info(ctx, "marked torrents to delete",
|
||||
slog.Int("count", len(torrentToDelete)),
|
||||
slog.Any("infohashes", torrentToDelete),
|
||||
)
|
||||
|
||||
if !run {
|
||||
d.log.Info(ctx, "dry run, skipping deletion")
|
||||
return torrentToDelete, nil
|
||||
}
|
||||
|
||||
err = d.client.qb.Torrent().DeleteTorrents(ctx, torrentToDelete, true)
|
||||
if err != nil {
|
||||
d.log.Error(ctx, "failed to delete torrents", slog.Any("infohashes", torrentToDelete), rlog.Error(err))
|
||||
return nil, fmt.Errorf("failed to delete torrents: %w", err)
|
||||
}
|
||||
d.log.Info(ctx, "torrents deleted from qbittorrent", slog.Int("count", len(torrentToDelete)))
|
||||
|
||||
for _, hash := range torrentToDelete {
|
||||
torrentPath := path.Join(d.dataDir, hash)
|
||||
_, err := os.Stat(torrentPath)
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
d.log.Error(ctx, "failed to get torrent path", slog.String("path", torrentPath), rlog.Error(err))
|
||||
continue
|
||||
}
|
||||
d.log.Warn(ctx, "leftover data for torrent detected, cleaning up", slog.String("infohash", hash), slog.String("path", torrentPath))
|
||||
}
|
||||
|
||||
return torrentToDelete, nil
|
||||
}
|
164
daemons/qbittorrent/client.go
Normal file
164
daemons/qbittorrent/client.go
Normal file
|
@ -0,0 +1,164 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/royalcat/btrgo/btrsync"
|
||||
)
|
||||
|
||||
type cacheClient struct {
|
||||
qb qbittorrent.Client
|
||||
|
||||
propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties]
|
||||
torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo]
|
||||
|
||||
pieceCache btrsync.MapOf[pieceKey, int]
|
||||
}
|
||||
|
||||
type pieceKey struct {
|
||||
hash string
|
||||
index int
|
||||
}
|
||||
|
||||
func wrapClient(qb qbittorrent.Client) *cacheClient {
|
||||
|
||||
const (
|
||||
cacheSize = 5000
|
||||
cacheTTL = time.Minute
|
||||
)
|
||||
|
||||
return &cacheClient{
|
||||
qb: qb,
|
||||
propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL),
|
||||
torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL),
|
||||
pieceCache: btrsync.MapOf[pieceKey, int]{},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) {
|
||||
if v, ok := f.torrentsCache.Get(hash); ok {
|
||||
return &v, nil
|
||||
}
|
||||
|
||||
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
|
||||
Hashes: []string{hash},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error to check torrent existence: %w", err)
|
||||
}
|
||||
|
||||
if len(infos) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if len(infos) > 1 {
|
||||
return nil, fmt.Errorf("multiple torrents with the same hash")
|
||||
}
|
||||
|
||||
f.torrentsCache.Add(hash, *infos[0])
|
||||
|
||||
return infos[0], nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
|
||||
if v, ok := f.propertiesCache.Get(hash); ok {
|
||||
return &v, nil
|
||||
}
|
||||
|
||||
info, err := f.qb.Torrent().GetProperties(ctx, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f.propertiesCache.Add(hash, *info)
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) listContent(ctx context.Context, hash string) ([]*qbittorrent.TorrentContent, error) {
|
||||
contents, err := f.qb.Torrent().GetContents(ctx, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return contents, nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) getContent(ctx context.Context, hash string, contentIndex int) (*qbittorrent.TorrentContent, error) {
|
||||
contents, err := f.qb.Torrent().GetContents(ctx, hash, contentIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
contentI := slices.IndexFunc(contents, func(c *qbittorrent.TorrentContent) bool {
|
||||
return c.Index == contentIndex
|
||||
})
|
||||
if contentI == -1 {
|
||||
return nil, fmt.Errorf("content not found")
|
||||
}
|
||||
|
||||
return contents[contentI], nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) isPieceComplete(ctx context.Context, hash string, pieceIndex int) (bool, error) {
|
||||
cachedPieceState, ok := f.pieceCache.Load(pieceKey{hash: hash, index: pieceIndex})
|
||||
if ok && cachedPieceState == 2 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
completion, err := f.qb.Torrent().GetPiecesStates(ctx, hash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for i, v := range completion {
|
||||
f.pieceCache.Store(pieceKey{hash: hash, index: i}, v)
|
||||
}
|
||||
|
||||
if completion[pieceIndex] == 2 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, pieceIndex int) error {
|
||||
const checkingInterval = 1 * time.Second
|
||||
|
||||
ok, err := f.isPieceComplete(ctx, hash, pieceIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval {
|
||||
return context.DeadlineExceeded
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(checkingInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
ok, err := f.isPieceComplete(ctx, hash, pieceIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval {
|
||||
return context.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
284
daemons/qbittorrent/daemon.go
Normal file
284
daemons/qbittorrent/daemon.go
Normal file
|
@ -0,0 +1,284 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/logwrap"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/types/infohash"
|
||||
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
|
||||
mapset "github.com/deckarep/golang-set/v2"
|
||||
"github.com/iceber/iouring-go"
|
||||
"github.com/royalcat/ctxio"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
var trace = otel.Tracer("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent")
|
||||
|
||||
type Daemon struct {
|
||||
proc *os.Process
|
||||
qb qbittorrent.Client
|
||||
client *cacheClient
|
||||
|
||||
sourceFilesMu sync.Mutex
|
||||
sourceFiles map[string]string // [sourcePath]infohash
|
||||
|
||||
registeredTorrents mapset.Set[string] // infohash list
|
||||
|
||||
dataDir string
|
||||
ur *iouring.IOURing
|
||||
log *rlog.Logger
|
||||
}
|
||||
|
||||
const defaultConf = `
|
||||
[LegalNotice]
|
||||
Accepted=true
|
||||
|
||||
[Preferences]
|
||||
WebUI\LocalHostAuth=false
|
||||
WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)"
|
||||
`
|
||||
|
||||
func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
|
||||
ctx := context.Background()
|
||||
log := rlog.Component("qbittorrent")
|
||||
|
||||
binPath := conf.MetadataFolder + "/qbittorrent-nox"
|
||||
err := downloadLatestQbitRelease(ctx, binPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
daemonLog := log.WithComponent("process")
|
||||
outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog())
|
||||
errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog())
|
||||
|
||||
_, err = os.Stat(conf.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf")
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
err = os.MkdirAll(conf.MetadataFolder+"/profile/qBittorrent/config", 0744)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = os.WriteFile(conf.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = os.MkdirAll(conf.DataFolder, 0744)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
const port = 25436
|
||||
|
||||
proc, err := runQBittorrent(binPath, conf.MetadataFolder+"/profile", port, outLog, errLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
qb, err := qbittorrent.NewClient(ctx, &qbittorrent.Config{
|
||||
Address: fmt.Sprintf("http://localhost:%d", port),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for { // wait for qbittorrent to start
|
||||
ver, err := qb.Application().Version(ctx)
|
||||
log.Info(ctx, "qbittorrent started", slog.String("version", ver))
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Warn(ctx, "waiting for qbittorrent to start", rlog.Error(err))
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
dataDir, err := filepath.Abs(conf.DataFolder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = qb.Application().SetPreferences(ctx, &qbittorrent.Preferences{
|
||||
SavePath: dataDir,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ur, err := iouring.New(8, iouring.WithAsync())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Daemon{
|
||||
qb: qb,
|
||||
proc: proc,
|
||||
dataDir: conf.DataFolder,
|
||||
ur: ur,
|
||||
sourceFiles: make(map[string]string),
|
||||
registeredTorrents: mapset.NewSet[string](),
|
||||
client: wrapClient(qb),
|
||||
log: rlog.Component("qbittorrent"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Daemon) Close(ctx context.Context) error {
|
||||
err := d.proc.Signal(os.Interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = d.proc.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func torrentDataPath(dataDir string, ih string) (string, error) {
|
||||
return filepath.Abs(path.Join(dataDir, ih))
|
||||
}
|
||||
|
||||
func (fs *Daemon) GetTorrentFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) {
|
||||
ctx, span := trace.Start(ctx, "GetTorrentFS")
|
||||
defer span.End()
|
||||
|
||||
log := fs.log.With(slog.String("file", file.Name()))
|
||||
|
||||
ih, err := readInfoHash(ctx, file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log = log.With(slog.String("infohash", ih.HexString()))
|
||||
|
||||
torrentPath, err := torrentDataPath(fs.dataDir, ih.HexString())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting torrent path: %w", err)
|
||||
}
|
||||
log = log.With(slog.String("torrentPath", torrentPath))
|
||||
|
||||
log.Debug(ctx, "creating fs for torrent")
|
||||
|
||||
err = fs.syncTorrentState(ctx, file, ih, torrentPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error syncing torrent state: %w", err)
|
||||
}
|
||||
|
||||
fs.sourceFilesMu.Lock()
|
||||
fs.sourceFiles[sourcePath] = ih.HexString()
|
||||
fs.sourceFilesMu.Unlock()
|
||||
|
||||
return newTorrentFS(ctx, fs.ur, fs.client, file.Name(), ih.HexString(), torrentPath)
|
||||
}
|
||||
|
||||
func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainfo.Hash, torrentPath string) error {
|
||||
ctx, span := trace.Start(ctx, "syncTorrentState")
|
||||
defer span.End()
|
||||
log := d.log.With(slog.String("file", file.Name()), slog.String("infohash", ih.HexString()))
|
||||
|
||||
info, err := d.client.getInfo(ctx, ih.HexString())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log = log.With(slog.String("torrentPath", torrentPath))
|
||||
|
||||
if info == nil {
|
||||
_, err := file.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := ctxio.ReadAll(ctx, file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = d.qb.Torrent().AddNewTorrent(ctx, &qbittorrent.TorrentAddOption{
|
||||
Torrents: []*qbittorrent.TorrentAddFileMetadata{
|
||||
{
|
||||
Data: data,
|
||||
},
|
||||
},
|
||||
SavePath: torrentPath,
|
||||
// SequentialDownload: "true",
|
||||
FirstLastPiecePrio: "true",
|
||||
})
|
||||
if err != nil {
|
||||
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var props *qbittorrent.TorrentProperties
|
||||
for {
|
||||
props, err = d.client.getProperties(ctx, ih.HexString())
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
log.Error(ctx, "waiting for torrent to be added", rlog.Error(err))
|
||||
time.Sleep(time.Millisecond * 15)
|
||||
}
|
||||
|
||||
log.Info(ctx, "added torrent", slog.String("infohash", ih.HexString()))
|
||||
|
||||
d.registeredTorrents.Add(props.Hash)
|
||||
|
||||
return nil
|
||||
} else {
|
||||
// info := existing[0]
|
||||
props, err := d.client.getProperties(ctx, ih.HexString())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.registeredTorrents.Add(props.Hash)
|
||||
|
||||
if props.SavePath != torrentPath {
|
||||
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
|
||||
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TODO caching
|
||||
func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) {
|
||||
mi, err := metainfo.Load(ctxio.IoReader(ctx, file))
|
||||
if err != nil {
|
||||
return infohash.T{}, err
|
||||
}
|
||||
info, err := mi.UnmarshalInfo()
|
||||
if err != nil {
|
||||
return infohash.T{}, err
|
||||
}
|
||||
|
||||
if info.HasV2() {
|
||||
ih := infohash_v2.HashBytes(mi.InfoBytes)
|
||||
return *(&ih).ToShort(), nil
|
||||
}
|
||||
|
||||
return infohash.HashBytes(mi.InfoBytes), nil
|
||||
}
|
409
daemons/qbittorrent/fs.go
Normal file
409
daemons/qbittorrent/fs.go
Normal file
|
@ -0,0 +1,409 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/uring"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/iceber/iouring-go"
|
||||
)
|
||||
|
||||
type FS struct {
|
||||
mu sync.Mutex
|
||||
client *cacheClient
|
||||
name string
|
||||
hash string
|
||||
dataDir string // directory where torrent files are stored
|
||||
|
||||
ur *iouring.IOURing
|
||||
|
||||
entries map[string]fileEntry
|
||||
|
||||
log *rlog.Logger
|
||||
|
||||
vfs.FilesystemPrototype
|
||||
}
|
||||
|
||||
type fileEntry struct {
|
||||
fs.FileInfo
|
||||
Content *qbittorrent.TorrentContent
|
||||
}
|
||||
|
||||
var _ vfs.Filesystem = (*FS)(nil)
|
||||
|
||||
func newTorrentFS(ctx context.Context, ur *iouring.IOURing, client *cacheClient, name string, hash string, dataDir string) (*FS, error) {
|
||||
ctx, span := trace.Start(ctx, "newTorrentFS")
|
||||
defer span.End()
|
||||
|
||||
cnts, err := client.listContent(ctx, hash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list content for hash %s: %w", hash, err)
|
||||
}
|
||||
|
||||
entries := make(map[string]fileEntry, len(cnts))
|
||||
for _, cnt := range cnts {
|
||||
if cnt.Priority == qbittorrent.PriorityDoNotDownload {
|
||||
continue
|
||||
}
|
||||
|
||||
entries[vfs.AbsPath(cnt.Name)] = fileEntry{
|
||||
Content: cnt,
|
||||
FileInfo: vfs.NewFileInfo(cnt.Name, cnt.Size),
|
||||
}
|
||||
}
|
||||
|
||||
return &FS{
|
||||
client: client,
|
||||
name: name,
|
||||
hash: hash,
|
||||
|
||||
dataDir: dataDir,
|
||||
|
||||
entries: entries,
|
||||
|
||||
ur: ur,
|
||||
|
||||
log: rlog.Component("qbittorrent", "fs"),
|
||||
|
||||
FilesystemPrototype: vfs.FilesystemPrototype(name),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Open implements vfs.Filesystem.
|
||||
func (f *FS) Open(ctx context.Context, name string) (vfs.File, error) {
|
||||
if name == vfs.Separator {
|
||||
return vfs.NewDirFile(name), nil
|
||||
}
|
||||
|
||||
if entry, ok := f.entries[name]; ok {
|
||||
return openFile(ctx, f.ur, f.client, f.dataDir, f.hash, entry.Content)
|
||||
}
|
||||
|
||||
for p := range f.entries {
|
||||
if strings.HasPrefix(p, name) {
|
||||
return vfs.NewDirFile(name), nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfs.ErrNotExist
|
||||
}
|
||||
|
||||
// ReadDir implements vfs.Filesystem.
|
||||
func (f *FS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
||||
infos := make(map[string]fs.FileInfo, len(f.entries))
|
||||
for k, v := range f.entries {
|
||||
infos[k] = v.FileInfo
|
||||
}
|
||||
|
||||
return vfs.ListDirFromInfo(infos, name)
|
||||
}
|
||||
|
||||
// Stat implements vfs.Filesystem.
|
||||
func (f *FS) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
||||
name = vfs.AbsPath(path.Clean(name))
|
||||
|
||||
if vfs.IsRoot(name) {
|
||||
return vfs.NewDirInfo(f.name), nil
|
||||
}
|
||||
|
||||
if entry, ok := f.entries[name]; ok {
|
||||
return entry.FileInfo, nil
|
||||
}
|
||||
|
||||
for p := range f.entries {
|
||||
if strings.HasPrefix(p, name) {
|
||||
return vfs.NewDirInfo(name), nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfs.ErrNotExist
|
||||
}
|
||||
|
||||
// Unlink implements vfs.Filesystem.
|
||||
func (f *FS) Unlink(ctx context.Context, filename string) error {
|
||||
filename = vfs.AbsPath(path.Clean(filename))
|
||||
|
||||
// we cannot delete a torrent itself, cause it will be added on next source scan and all delited files will be restored
|
||||
|
||||
if entry, ok := f.entries[filename]; ok {
|
||||
return f.removeFile(ctx, f.hash, entry.Content)
|
||||
}
|
||||
|
||||
for p, entry := range f.entries {
|
||||
if strings.HasPrefix(p, filename) {
|
||||
return f.removeFile(ctx, f.hash, entry.Content)
|
||||
}
|
||||
}
|
||||
|
||||
return vfs.ErrNotExist
|
||||
}
|
||||
|
||||
func (f *FS) Rename(ctx context.Context, oldpath string, newpath string) error {
|
||||
oldpath = vfs.AbsPath(path.Clean(oldpath))
|
||||
newpath = vfs.AbsPath(path.Clean(newpath))
|
||||
|
||||
if _, ok := f.entries[oldpath]; ok {
|
||||
err := f.client.qb.Torrent().RenameFile(ctx, f.hash, vfs.RelPath(oldpath), vfs.RelPath(newpath))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to rename file %s to %s: %w", oldpath, newpath, err)
|
||||
}
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
f.entries[newpath] = f.entries[oldpath]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return vfs.ErrNotExist
|
||||
}
|
||||
|
||||
func (f *FS) removeFile(ctx context.Context, hash string, content *qbittorrent.TorrentContent) error {
|
||||
log := f.log.With(slog.String("hash", hash), slog.String("file", content.Name))
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
fpath := vfs.AbsPath(content.Name)
|
||||
|
||||
if _, ok := f.entries[fpath]; !ok {
|
||||
return fmt.Errorf("file %s is does not found", fpath)
|
||||
}
|
||||
delete(f.entries, fpath)
|
||||
|
||||
err := f.client.qb.Torrent().SetFilePriority(ctx, f.hash, content.Index, qbittorrent.PriorityDoNotDownload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set priority for torrent %s for file %s: %w", hash, content.Name, err)
|
||||
}
|
||||
|
||||
err = os.Remove(path.Join(f.dataDir, vfs.RelPath(content.Name)))
|
||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
log.Warn(ctx, "failed to remove file", rlog.Error(err))
|
||||
return fmt.Errorf("failed to remove file %s: %w", content.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func openFile(ctx context.Context, ur *iouring.IOURing, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) {
|
||||
props, err := client.getProperties(ctx, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// FIXME error when file not started downloading
|
||||
file, err := os.OpenFile(path.Join(torrentDir, content.Name), os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &File{
|
||||
client: client,
|
||||
hash: hash,
|
||||
torrentDir: torrentDir,
|
||||
|
||||
filePath: content.Name,
|
||||
contentIndex: content.Index,
|
||||
pieceSize: props.PieceSize,
|
||||
fileSize: content.Size,
|
||||
|
||||
file: uring.NewFile(ur, file),
|
||||
|
||||
offset: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type File struct {
|
||||
client *cacheClient
|
||||
hash string
|
||||
torrentDir string
|
||||
filePath string // path inside a torrent directory
|
||||
contentIndex int
|
||||
pieceSize int
|
||||
fileSize int64
|
||||
|
||||
mu sync.Mutex
|
||||
file *uring.File
|
||||
offset int64
|
||||
}
|
||||
|
||||
var _ vfs.File = (*File)(nil)
|
||||
|
||||
// Info implements vfs.File.
|
||||
func (f *File) Info() (fs.FileInfo, error) {
|
||||
return &fileInfo{name: path.Base(f.filePath), size: f.fileSize}, nil
|
||||
}
|
||||
|
||||
// IsDir implements vfs.File.
|
||||
func (f *File) IsDir() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Seek implements vfs.File.
|
||||
func (f *File) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
f.offset = offset
|
||||
case io.SeekCurrent:
|
||||
f.offset += offset
|
||||
case io.SeekEnd:
|
||||
f.offset = f.fileSize + offset
|
||||
}
|
||||
return f.offset, nil
|
||||
}
|
||||
|
||||
// Name implements vfs.File.
|
||||
func (f *File) Name() string {
|
||||
return path.Base(f.filePath)
|
||||
}
|
||||
|
||||
func (f *File) canExpectSoon(ctx context.Context) (bool, error) {
|
||||
info, err := f.client.getInfo(ctx, f.hash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil
|
||||
}
|
||||
|
||||
func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (bool, error) {
|
||||
startPieceIndex := int(offset / int64(f.pieceSize))
|
||||
pieceCount := (size + f.pieceSize - 1) / f.pieceSize // rouding up
|
||||
|
||||
for i := range pieceCount {
|
||||
ok, err := f.client.isPieceComplete(ctx, f.hash, startPieceIndex+i)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error {
|
||||
complete, err := f.isRangeComplete(ctx, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if complete {
|
||||
return nil
|
||||
}
|
||||
|
||||
canExpectSoon, err := f.canExpectSoon(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !canExpectSoon {
|
||||
return fmt.Errorf("torrent is not downloading")
|
||||
}
|
||||
|
||||
const checkingInterval = 1 * time.Second
|
||||
|
||||
ticker := time.NewTicker(checkingInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
complete, err := f.isRangeComplete(ctx, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if complete {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements vfs.File.
|
||||
func (f *File) Read(ctx context.Context, p []byte) (int, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err := f.file.ReadAt(ctx, p, f.offset)
|
||||
f.offset += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ReadAt implements vfs.File.
|
||||
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
|
||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return f.file.ReadAt(ctx, p, off)
|
||||
}
|
||||
|
||||
// Size implements vfs.File.
|
||||
func (f *File) Size() int64 {
|
||||
return f.fileSize
|
||||
}
|
||||
|
||||
// Type implements vfs.File.
|
||||
func (f *File) Type() fs.FileMode {
|
||||
return fs.ModeDir
|
||||
}
|
||||
|
||||
// Close implements vfs.File.
|
||||
func (f *File) Close(ctx context.Context) error {
|
||||
return f.file.Close(ctx)
|
||||
}
|
||||
|
||||
type fileInfo struct {
|
||||
name string
|
||||
size int64
|
||||
}
|
||||
|
||||
var _ fs.FileInfo = (*fileInfo)(nil)
|
||||
|
||||
// IsDir implements fs.FileInfo.
|
||||
func (f *fileInfo) IsDir() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// ModTime implements fs.FileInfo.
|
||||
func (f *fileInfo) ModTime() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
// Mode implements fs.FileInfo.
|
||||
func (f *fileInfo) Mode() fs.FileMode {
|
||||
return vfs.ModeFileRO
|
||||
}
|
||||
|
||||
// Name implements fs.FileInfo.
|
||||
func (f *fileInfo) Name() string {
|
||||
return f.name
|
||||
}
|
||||
|
||||
// Size implements fs.FileInfo.
|
||||
func (f *fileInfo) Size() int64 {
|
||||
return f.size
|
||||
}
|
||||
|
||||
// Sys implements fs.FileInfo.
|
||||
func (f *fileInfo) Sys() any {
|
||||
return nil
|
||||
}
|
150
daemons/qbittorrent/install.go
Normal file
150
daemons/qbittorrent/install.go
Normal file
|
@ -0,0 +1,150 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/v63/github"
|
||||
"golang.org/x/sys/cpu"
|
||||
)
|
||||
|
||||
const (
|
||||
repoOwner = "userdocs"
|
||||
repoName = "qbittorrent-nox-static"
|
||||
)
|
||||
|
||||
func runQBittorrent(binPath string, profileDir string, port int, stdout, stderr io.Writer) (*os.Process, error) {
|
||||
err := os.Chmod(binPath, 0755)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cmd := exec.Command(binPath, fmt.Sprintf("--profile=%s", profileDir), fmt.Sprintf("--webui-port=%d", port))
|
||||
cmd.Stdin = bytes.NewReader([]byte("y\n"))
|
||||
cmd.Stdout = stdout
|
||||
cmd.Stderr = stderr
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cmd.Process, nil
|
||||
}
|
||||
|
||||
func downloadLatestQbitRelease(ctx context.Context, binPath string) error {
|
||||
client := github.NewClient(nil)
|
||||
rel, _, err := client.Repositories.GetLatestRelease(ctx, repoOwner, repoName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
arch := ""
|
||||
switch runtime.GOARCH {
|
||||
case "amd64":
|
||||
arch = "x86_64"
|
||||
case "arm":
|
||||
arch = "armhf" // this is a safe version, go does not distinguish between armv6 and armv7
|
||||
if cpu.ARM.HasNEON {
|
||||
arch = "armv7"
|
||||
}
|
||||
case "arm64":
|
||||
arch = "aarch64"
|
||||
}
|
||||
|
||||
if arch == "" {
|
||||
return errors.New("unsupported architecture")
|
||||
}
|
||||
|
||||
binName := arch + "-qbittorrent-nox"
|
||||
|
||||
var targetRelease *github.ReleaseAsset
|
||||
for _, v := range rel.Assets {
|
||||
if v.GetName() == binName {
|
||||
targetRelease = v
|
||||
break
|
||||
}
|
||||
}
|
||||
if targetRelease == nil {
|
||||
return fmt.Errorf("target asset %s not found", binName)
|
||||
}
|
||||
|
||||
downloadUrl := targetRelease.GetBrowserDownloadURL()
|
||||
if downloadUrl == "" {
|
||||
return errors.New("download url is empty")
|
||||
}
|
||||
|
||||
err = os.MkdirAll(path.Dir(binPath), 0755)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.InfoContext(ctx, "downloading latest qbittorrent-nox release", slog.String("url", downloadUrl))
|
||||
|
||||
return downloadFile(ctx, binPath, downloadUrl)
|
||||
}
|
||||
|
||||
func downloadFile(ctx context.Context, filepath string, webUrl string) error {
|
||||
if stat, err := os.Stat(filepath); err == nil {
|
||||
resp, err := http.Head(webUrl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var lastModified time.Time
|
||||
|
||||
lastModifiedHeader := resp.Header.Get("Last-Modified")
|
||||
if lastModifiedHeader != "" {
|
||||
lastModified, err = time.Parse(http.TimeFormat, lastModifiedHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if resp.ContentLength == stat.Size() && lastModified.Before(stat.ModTime()) {
|
||||
slog.InfoContext(ctx, "there is already newest version of the file", slog.String("filepath", filepath))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Create the file
|
||||
out, err := os.Create(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, webUrl, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the data
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check server response
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("bad status: %s", resp.Status)
|
||||
}
|
||||
|
||||
// Writer the body to file
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
18
daemons/qbittorrent/install_test.go
Normal file
18
daemons/qbittorrent/install_test.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDownloadQBittorent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tempDir := t.TempDir()
|
||||
require := require.New(t)
|
||||
err := downloadLatestQbitRelease(ctx, tempDir)
|
||||
require.NoError(err)
|
||||
err = downloadLatestQbitRelease(ctx, tempDir)
|
||||
require.NoError(err)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue