seek, load only with priority, qbittorrent

This commit is contained in:
royalcat 2024-08-23 01:16:16 +03:00
parent e517332a65
commit ae4501ae21
26 changed files with 1357 additions and 623 deletions

View file

@ -0,0 +1,85 @@
package qbittorrent
import (
"context"
"fmt"
"slices"
"time"
"github.com/xuthus5/qbittorrent-client-go/qbittorrent"
)
type client struct {
qb qbittorrent.Client
}
func wrapClient(qb qbittorrent.Client) *client {
return &client{qb: qb}
}
func (f *client) getFileContent(ctx context.Context, hash string, contextIndex int) (*qbittorrent.TorrentContent, error) {
contents, err := f.qb.Torrent().GetContents(hash)
if err != nil {
return nil, err
}
contentIndex := slices.IndexFunc(contents, func(c *qbittorrent.TorrentContent) bool {
return c.Index == contextIndex
})
if contentIndex == -1 {
return nil, fmt.Errorf("content not found")
}
return contents[contentIndex], nil
}
func (f *client) isPieceComplete(ctx context.Context, hash string, pieceIndex int) (bool, error) {
completion, err := f.qb.Torrent().GetPiecesStates(hash)
if err != nil {
return false, err
}
if completion[pieceIndex] == 2 {
return true, nil
}
return false, nil
}
func (f *client) waitPieceToComplete(ctx context.Context, hash string, pieceIndex int) error {
const checkingInterval = 1 * time.Second
ok, err := f.isPieceComplete(ctx, hash, pieceIndex)
if err != nil {
return err
}
if ok {
return nil
}
if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval {
return context.DeadlineExceeded
}
ticker := time.NewTicker(checkingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
ok, err := f.isPieceComplete(ctx, hash, pieceIndex)
if err != nil {
return err
}
if ok {
return nil
}
if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval {
return context.DeadlineExceeded
}
}
}
}

View file

@ -0,0 +1,101 @@
package qbittorrent
import (
"bytes"
"context"
"path"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/ctxio"
"github.com/xuthus5/qbittorrent-client-go/qbittorrent"
)
type Daemon struct {
qb qbittorrent.Client
client *client
dataDir string
}
func NewDaemon(dir string) (*Daemon, error) {
dataDir := dir + "/data"
qb, err := qbittorrent.NewClient(&qbittorrent.Config{
Address: "localhost:8080",
})
if err != nil {
return nil, err
}
return &Daemon{
qb: qb,
dataDir: dataDir,
client: wrapClient(qb),
}, nil
}
func (fs *Daemon) torrentPath(ih infohash.T) string {
return path.Join(fs.dataDir, ih.HexString())
}
func (fs *Daemon) addTorrent(ctx context.Context, f vfs.File) error {
file, err := ctxio.ReadAll(ctx, f)
if err != nil {
return err
}
mi, err := metainfo.Load(bytes.NewBuffer(file))
if err != nil {
return err
}
ih := mi.HashInfoBytes()
err = fs.qb.Torrent().AddNewTorrent(&qbittorrent.TorrentAddOption{
Torrents: []*qbittorrent.TorrentAddFileMetadata{
{
Data: file,
},
},
SavePath: fs.torrentPath(ih),
// SequentialDownload: "true",
// FirstLastPiecePrio: "true",
})
if err != nil {
return err
}
return nil
}
func (fs *Daemon) TorrentFS(ctx context.Context, file vfs.File) (*FS, error) {
ih, err := readInfoHash(ctx, file)
if err != nil {
return nil, err
}
existing, err := fs.qb.Torrent().GetTorrents(&qbittorrent.TorrentOption{
Hashes: []string{ih.HexString()},
})
if err != nil {
return nil, err
}
if len(existing) == 0 {
err := fs.addTorrent(ctx, file)
if err != nil {
return nil, err
}
}
return newTorrentFS(fs.client, file.Name(), ih.HexString(), fs.torrentPath(ih))
}
// TODO caching
func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) {
mi, err := metainfo.Load(ctxio.IoReader(ctx, file))
if err != nil {
return infohash.T{}, err
}
return mi.HashInfoBytes(), nil
}

View file

@ -0,0 +1,228 @@
package qbittorrent
import (
"context"
"io"
"io/fs"
"os"
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
type FS struct {
client *client
name string
hash string
dataDir string
}
var _ vfs.Filesystem = (*FS)(nil)
func newTorrentFS(client *client, name string, hash string, dataDir string) (*FS, error) {
return &FS{
client: client,
name: name,
hash: hash,
dataDir: dataDir,
}, nil
}
// Info implements vfs.Filesystem.
func (f *FS) Info() (fs.FileInfo, error) {
return vfs.NewDirInfo(f.name), nil
}
// IsDir implements vfs.Filesystem.
func (f *FS) IsDir() bool {
return true
}
// Name implements vfs.Filesystem.
func (f *FS) Name() string {
return path.Base(f.dataDir)
}
// Open implements vfs.Filesystem.
func (f *FS) Open(ctx context.Context, filename string) (vfs.File, error) {
panic("unimplemented")
}
// ReadDir implements vfs.Filesystem.
func (f *FS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
panic("unimplemented")
}
// Stat implements vfs.Filesystem.
func (f *FS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return vfs.NewDirInfo(f.name), nil
}
// Type implements vfs.Filesystem.
func (f *FS) Type() fs.FileMode {
return vfs.ROMode
}
// Unlink implements vfs.Filesystem.
func (f *FS) Unlink(ctx context.Context, filename string) error {
panic("unimplemented")
}
func openFile(ctx context.Context, client client, hash, filePath string) *File {
client.getFileContent(ctx, hash, 0)
return &File{
client: client,
hash: hash,
filePath: filePath,
}
}
type File struct {
client client
hash string
dataDir string
filePath string // path inside a torrent directory
contentIndex int
pieceSize int
fileSize int64
offset int64
osfile *os.File
}
var _ vfs.File = (*File)(nil)
// Close implements vfs.File.
func (f *File) Close(ctx context.Context) error {
if f.osfile != nil {
err := f.osfile.Close()
f.osfile = nil
return err
}
return nil
}
// Info implements vfs.File.
func (f *File) Info() (fs.FileInfo, error) {
return &fileInfo{name: path.Base(f.filePath), size: f.fileSize}, nil
}
// IsDir implements vfs.File.
func (f *File) IsDir() bool {
return false
}
// Seek implements vfs.File.
func (f *File) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
f.offset = offset
case io.SeekCurrent:
f.offset += offset
case io.SeekEnd:
f.offset = f.fileSize + offset
}
return f.offset, nil
}
// Name implements vfs.File.
func (f *File) Name() string {
return path.Base(f.filePath)
}
// Read implements vfs.File.
func (f *File) Read(ctx context.Context, p []byte) (n int, err error) {
pieceIndex := int(f.offset / int64(f.pieceSize))
err = f.client.waitPieceToComplete(ctx, f.hash, pieceIndex)
if err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
n, err = descriptor.ReadAt(p, f.offset)
f.offset += int64(n)
return n, err
}
// ReadAt implements vfs.File.
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
pieceIndex := int(off / int64(f.pieceSize))
err = f.client.waitPieceToComplete(ctx, f.hash, pieceIndex)
if err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
return descriptor.ReadAt(p, off)
}
// Size implements vfs.File.
func (f *File) Size() int64 {
return f.fileSize
}
// Type implements vfs.File.
func (f *File) Type() fs.FileMode {
return vfs.ROMode
}
func (f *File) descriptor() (*os.File, error) {
if f.osfile != nil {
return f.osfile, nil
}
osfile, err := os.Open(path.Join(f.dataDir, f.filePath))
if err != nil {
return nil, err
}
f.osfile = osfile
return f.osfile, nil
}
type fileInfo struct {
name string
size int64
}
var _ fs.FileInfo = (*fileInfo)(nil)
// IsDir implements fs.FileInfo.
func (f *fileInfo) IsDir() bool {
return false
}
// ModTime implements fs.FileInfo.
func (f *fileInfo) ModTime() time.Time {
return time.Time{}
}
// Mode implements fs.FileInfo.
func (f *fileInfo) Mode() fs.FileMode {
return vfs.ROMode
}
// Name implements fs.FileInfo.
func (f *fileInfo) Name() string {
return f.name
}
// Size implements fs.FileInfo.
func (f *fileInfo) Size() int64 {
return f.size
}
// Sys implements fs.FileInfo.
func (f *fileInfo) Sys() any {
return nil
}

View file

@ -0,0 +1,139 @@
package qbittorrent
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
"runtime"
"time"
"github.com/google/go-github/v63/github"
"golang.org/x/sys/cpu"
)
const (
repoOwner = "userdocs"
repoName = "qbittorrent-nox-static"
binName = "qbittorrent-nox"
)
func runQBittorrent(binDir string, profileDir string, stdout, stderr io.Writer) (*os.Process, error) {
cmd := exec.Command(
path.Join(binDir, binName),
fmt.Sprintf("--profile=%s", profileDir),
)
cmd.Stdin = bytes.NewReader([]byte("y\n"))
cmd.Stdout = stdout
cmd.Stderr = stderr
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd.Process, nil
}
func downloadLatestRelease(ctx context.Context, binPath string) error {
client := github.NewClient(nil)
rel, _, err := client.Repositories.GetLatestRelease(ctx, repoOwner, repoName)
if err != nil {
return err
}
arch := ""
switch runtime.GOARCH {
case "amd64":
arch = "x86_64"
case "arm":
arch = "armhf" // this is a safe version, go does not distinguish between armv6 and armv7
if cpu.ARM.HasNEON {
arch = "armv7"
}
case "arm64":
arch = "aarch64"
}
if arch == "" {
return errors.New("unsupported architecture")
}
binName := arch + "-qbittorrent-nox"
var targetRelease *github.ReleaseAsset
for _, v := range rel.Assets {
if v.GetName() == binName {
targetRelease = v
break
}
}
if targetRelease == nil {
return fmt.Errorf("target asset %s not found", binName)
}
downloadUrl := targetRelease.GetBrowserDownloadURL()
if downloadUrl == "" {
return errors.New("download url is empty")
}
err = os.MkdirAll(path.Dir(binPath), 0755)
if err != nil {
return err
}
return downloadFile(binPath, downloadUrl)
}
func downloadFile(filepath string, webUrl string) error {
if stat, err := os.Stat(filepath); err == nil {
resp, err := http.Head(webUrl)
if err != nil {
return err
}
defer resp.Body.Close()
var lastModified time.Time
lastModifiedHeader := resp.Header.Get("Last-Modified")
if lastModifiedHeader != "" {
lastModified, err = time.Parse(http.TimeFormat, lastModifiedHeader)
if err != nil {
return err
}
}
if resp.ContentLength == stat.Size() && lastModified.Before(stat.ModTime()) {
return nil
}
}
// Create the file
out, err := os.Create(filepath)
if err != nil {
return err
}
defer out.Close()
// Get the data
resp, err := http.Get(webUrl)
if err != nil {
return err
}
defer resp.Body.Close()
// Check server response
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status: %s", resp.Status)
}
// Writer the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,18 @@
package qbittorrent
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestDownloadQBittorent(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
require := require.New(t)
err := downloadLatestRelease(ctx, tempDir)
require.NoError(err)
err = downloadLatestRelease(ctx, tempDir)
require.NoError(err)
}

View file

@ -7,7 +7,6 @@ import (
"git.kmsign.ru/royalcat/tstor/src/config"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44"
tlog "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
@ -23,12 +22,19 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent
torrentCfg.PeerID = string(id[:])
torrentCfg.DefaultStorage = st
// torrentCfg.AlwaysWantConns = true
// torrentCfg.DropMutuallyCompletePeers = true
torrentCfg.DropMutuallyCompletePeers = true
// torrentCfg.TorrentPeersLowWater = 100
// torrentCfg.TorrentPeersHighWater = 1000
// torrentCfg.AcceptPeerConnections = true
torrentCfg.Seed = true
// torrentCfg.DisableAggressiveUpload = false
torrentCfg.DisableAggressiveUpload = false
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
// torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
// cfg.Store = fis
// cfg.Exp = dhtTTL
// cfg.PeerStore = fis
// }
tl := tlog.NewLogger("torrent-client")
tl.SetHandlers(&dlog.Torrent{L: l})
@ -37,12 +43,12 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) {
l.With(peerAttrs(p)...).Debug("new peer")
})
torrentCfg.Callbacks.PeerClosed = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) {
l.With(peerAttrs(p)...).Debug("peer closed")
})
torrentCfg.Callbacks.CompletedHandshake = func(pc *torrent.PeerConn, ih infohash.T) {
l.With(peerAttrs(&pc.Peer)...).Debug("completed handshake")
attrs := append(peerAttrs(&pc.Peer), slog.String("infohash", ih.HexString()))
l.With(attrs...).Debug("completed handshake")
}
torrentCfg.Callbacks.PeerConnAdded = append(torrentCfg.Callbacks.PeerConnAdded, func(pc *torrent.PeerConn) {
l.With(peerAttrs(&pc.Peer)...).Debug("peer conn added")
@ -50,12 +56,16 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent
torrentCfg.Callbacks.PeerConnClosed = func(pc *torrent.PeerConn) {
l.With(peerAttrs(&pc.Peer)...).Debug("peer conn closed")
}
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis
cfg.Exp = dhtTTL
torrentCfg.Callbacks.CompletedHandshake = func(pc *torrent.PeerConn, ih infohash.T) {
attrs := append(peerAttrs(&pc.Peer), slog.String("infohash", ih.HexString()))
l.With(attrs...).Debug("completed handshake")
}
torrentCfg.Callbacks.ReceivedRequested = append(torrentCfg.Callbacks.ReceivedRequested, func(pme torrent.PeerMessageEvent) {
l.With(peerAttrs(pme.Peer)...).Debug("received requested")
})
torrentCfg.Callbacks.ReceivedUsefulData = append(torrentCfg.Callbacks.ReceivedUsefulData, func(pme torrent.PeerMessageEvent) {
l.With(peerAttrs(pme.Peer)...).Debug("received useful data")
})
return torrentCfg
}

View file

@ -1,23 +1,18 @@
package torrent
import (
"bufio"
"context"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/tkv"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
@ -29,7 +24,6 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
"github.com/go-git/go-billy/v5"
"github.com/go-git/go-billy/v5/util"
"github.com/royalcat/kv"
)
@ -54,8 +48,7 @@ type Daemon struct {
fileProperties kv.Store[string, FileProperties]
statsStore *statsStore
loadMutex sync.Mutex
torrentLoaded chan struct{}
loadMutex sync.Mutex
sourceFs billy.Filesystem
@ -64,12 +57,11 @@ type Daemon struct {
const dhtTTL = 180 * 24 * time.Hour
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
func NewDaemon(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
s := &Daemon{
log: rlog.Component("torrent-service"),
sourceFs: sourceFs,
torrentLoaded: make(chan struct{}),
loadMutex: sync.Mutex{},
log: rlog.Component("torrent-service"),
sourceFs: sourceFs,
loadMutex: sync.Mutex{},
}
err := os.MkdirAll(conf.MetadataFolder, 0744)
@ -131,19 +123,16 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
go func() {
ctx := context.Background()
err := s.loadTorrentFiles(ctx)
err := s.backgroudFileLoad(ctx)
if err != nil {
s.log.Error(ctx, "initial torrent load failed", rlog.Error(err))
}
close(s.torrentLoaded)
}()
go func() {
ctx := context.Background()
const period = time.Second * 10
<-s.torrentLoaded
err := registerTorrentMetrics(s.client)
if err != nil {
s.log.Error(ctx, "error registering torrent metrics", rlog.Error(err))
@ -167,70 +156,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return s, nil
}
func (s *Daemon) allStats(ctx context.Context) (map[infohash.T]TorrentStats, TorrentStats) {
totalPeers := 0
activePeers := 0
connectedSeeders := 0
perTorrentStats := map[infohash.T]TorrentStats{}
for _, v := range s.client.Torrents() {
stats := v.Stats()
perTorrentStats[v.InfoHash()] = TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(stats.BytesRead.Int64()),
UploadedBytes: uint64(stats.BytesWritten.Int64()),
TotalPeers: uint16(stats.TotalPeers),
ActivePeers: uint16(stats.ActivePeers),
ConnectedSeeders: uint16(stats.ConnectedSeeders),
}
totalPeers += stats.TotalPeers
activePeers += stats.ActivePeers
connectedSeeders += stats.ConnectedSeeders
}
totalStats := s.client.Stats()
return perTorrentStats, TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
TotalPeers: uint16(totalPeers),
ActivePeers: uint16(activePeers),
ConnectedSeeders: uint16(connectedSeeders),
}
}
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
perTorrentStats, totalStats := s.allStats(ctx)
for ih, v := range perTorrentStats {
err := s.statsStore.AddTorrentStats(ih, v)
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
}
err := s.statsStore.AddTotalStats(totalStats)
if err != nil {
log.Error(ctx, "error saving total stats", rlog.Error(err))
}
}
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadTotalStatsHistory(ctx, since)
}
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
}
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadStatsHistory(ctx, since)
}
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
func (s *Daemon) Close(ctx context.Context) error {
@ -244,104 +169,6 @@ func (s *Daemon) Close(ctx context.Context) error {
)...)
}
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "loadTorrent")
defer span.End()
log := s.log
stat, err := f.Info()
if err != nil {
return nil, fmt.Errorf("call stat failed: %w", err)
}
span.SetAttributes(attribute.String("filename", stat.Name()))
mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f)))
if err != nil {
return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
}
var ctl *Controller
t, ok := s.client.Torrent(mi.HashInfoBytes())
if ok {
ctl = s.newController(t)
} else {
span.AddEvent("torrent not found, loading from file")
log.Info(ctx, "torrent not found, loading from file")
spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
if err != nil {
return nil, fmt.Errorf("parse spec from metadata: %w", err)
}
infoBytes := spec.InfoBytes
if !isValidInfoHashBytes(infoBytes) {
log.Warn(ctx, "info loaded from spec not valid")
infoBytes = nil
}
if len(infoBytes) == 0 {
log.Info(ctx, "no info loaded from file, try to load from cache")
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
if err != nil && err != errNotFound {
return nil, fmt.Errorf("get info bytes from database: %w", err)
}
}
t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: spec.InfoHash,
InfoHashV2: spec.InfoHashV2,
Storage: s.Storage,
InfoBytes: infoBytes,
ChunkSize: spec.ChunkSize,
})
t.AllowDataDownload()
t.AllowDataUpload()
span.AddEvent("torrent added to client")
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-t.GotInfo():
err := s.infoBytes.Set(t.InfoHash(), t.Metainfo())
if err != nil {
log.Error(ctx, "error setting info bytes for torrent",
slog.String("torrent-name", t.Name()),
rlog.Error(err),
)
}
}
span.AddEvent("got info")
ctl = s.newController(t)
err = ctl.initializeTorrentPriories(ctx)
if err != nil {
return nil, fmt.Errorf("initialize torrent priorities: %w", err)
}
// info := t.Info()
// if info == nil {
// return nil, fmt.Errorf("info is nil")
// }
// compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info)
// if err != nil {
// return nil, err
// }
// if !compatable {
// return nil, fmt.Errorf(
// "torrent with name '%s' not compatable existing infohash: %s, new: %s",
// t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
// )
// }
}
return ctl, nil
}
func isValidInfoHashBytes(d []byte) bool {
var info metainfo.Info
err := bencode.Unmarshal(d, &info)
@ -352,74 +179,6 @@ func (s *Daemon) Stats() torrent.ConnStats {
return s.client.Stats().ConnStats
}
const loadWorkers = 5
func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes(
attribute.Int("workers", loadWorkers),
))
defer span.End()
log := s.log
loaderPaths := make(chan string, loadWorkers*5)
wg := sync.WaitGroup{}
defer func() {
close(loaderPaths)
wg.Wait()
}()
loaderWorker := func() {
for path := range loaderPaths {
info, err := s.sourceFs.Stat(path)
if err != nil {
log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
file, err := s.sourceFs.Open(path)
if err != nil {
log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
defer file.Close()
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
_, err = s.loadTorrent(ctx, vfile)
if err != nil {
log.Error(ctx, "failed adding torrent", rlog.Error(err))
}
}
wg.Done()
}
wg.Add(loadWorkers)
for range loadWorkers {
go loaderWorker()
}
return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("fs walk error: %w", err)
}
if ctx.Err() != nil {
return ctx.Err()
}
if info.IsDir() {
return nil
}
if strings.HasSuffix(path, ".torrent") {
loaderPaths <- path
}
return nil
})
}
func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] {
return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/"))
}
@ -433,8 +192,6 @@ func (s *Daemon) newController(t *torrent.Torrent) *Controller {
}
func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) {
<-s.torrentLoaded
out := []*Controller{}
for _, v := range s.client.Torrents() {
out = append(out, s.newController(v))
@ -443,8 +200,6 @@ func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) {
}
func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) {
<-s.torrentLoaded
t, ok := s.client.Torrent(infohash.FromHexString(infohashHex))
if !ok {
return nil, nil

View file

@ -0,0 +1,246 @@
package torrent
import (
"bufio"
"context"
"fmt"
"io"
"log/slog"
"os"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/go-git/go-billy/v5/util"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const activityTimeout = time.Minute * 15
func readInfoHash(ctx context.Context, f vfs.File) (metainfo.Hash, error) {
ctx, span := tracer.Start(ctx, "readInfoHash")
defer span.End()
mi, err := metainfo.Load(ctxio.IoReader(ctx, f))
if err != nil {
return metainfo.Hash{}, fmt.Errorf("loading metainfo: %w", err)
}
return mi.HashInfoBytes(), nil
}
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "loadTorrent")
defer span.End()
log := s.log
stat, err := f.Info()
if err != nil {
return nil, fmt.Errorf("call stat failed: %w", err)
}
span.SetAttributes(attribute.String("filename", stat.Name()))
mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f)))
if err != nil {
return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
}
log = log.With(slog.String("info-hash", mi.HashInfoBytes().HexString()))
var ctl *Controller
t, ok := s.client.Torrent(mi.HashInfoBytes())
if ok {
log = log.With(slog.String("torrent-name", t.Name()))
ctl = s.newController(t)
} else {
span.AddEvent("torrent not found, loading from file")
log.Info(ctx, "torrent not found, loading from file")
spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
if err != nil {
return nil, fmt.Errorf("parse spec from metadata: %w", err)
}
infoBytes := spec.InfoBytes
if !isValidInfoHashBytes(infoBytes) {
log.Warn(ctx, "info loaded from spec not valid")
infoBytes = nil
}
if len(infoBytes) == 0 {
log.Info(ctx, "no info loaded from file, try to load from cache")
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
if err != nil && err != errNotFound {
return nil, fmt.Errorf("get info bytes from database: %w", err)
}
}
t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: spec.InfoHash,
InfoHashV2: spec.InfoHashV2,
Storage: s.Storage,
InfoBytes: infoBytes,
ChunkSize: spec.ChunkSize,
})
log = log.With(slog.String("torrent-name", t.Name()))
t.AllowDataDownload()
t.AllowDataUpload()
span.AddEvent("torrent added to client")
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-t.GotInfo():
err := s.infoBytes.Set(t.InfoHash(), t.Metainfo())
if err != nil {
log.Error(ctx, "error setting info bytes for torrent",
slog.String("torrent-name", t.Name()),
rlog.Error(err),
)
}
}
span.AddEvent("got info")
ctl = s.newController(t)
err = ctl.initializeTorrentPriories(ctx)
if err != nil {
return nil, fmt.Errorf("initialize torrent priorities: %w", err)
}
// go func() {
// subscr := ctl.t.SubscribePieceStateChanges()
// defer subscr.Close()
// dropTimer := time.NewTimer(activityTimeout)
// defer dropTimer.Stop()
// for {
// select {
// case <-subscr.Values:
// dropTimer.Reset(activityTimeout)
// case <-dropTimer.C:
// log.Info(ctx, "torrent dropped by activity timeout")
// select {
// case <-ctl.t.Closed():
// return
// case <-time.After(time.Second):
// ctl.t.Drop()
// }
// case <-ctl.t.Closed():
// return
// }
// }
// }()
}
return ctl, nil
}
const loadWorkers = 5
func (s *Daemon) backgroudFileLoad(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes(
attribute.Int("workers", loadWorkers),
))
defer span.End()
log := s.log
loaderPaths := make(chan string, loadWorkers*5)
wg := sync.WaitGroup{}
defer func() {
close(loaderPaths)
wg.Wait()
}()
loaderWorker := func() {
for path := range loaderPaths {
info, err := s.sourceFs.Stat(path)
if err != nil {
log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
file, err := s.sourceFs.Open(path)
if err != nil {
log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
defer file.Close()
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
ih, err := readInfoHash(ctx, vfile)
if err != nil {
log.Error(ctx, "error reading info hash", slog.String("filename", path), rlog.Error(err))
continue
}
props := storeByTorrent(s.fileProperties, ih)
_, err = vfile.Seek(0, io.SeekStart)
if err != nil {
log.Error(ctx, "error seeking file", slog.String("filename", path), rlog.Error(err))
continue
}
isPrioritized := false
err = props.Range(ctx, func(k string, v FileProperties) error {
if v.Priority > 0 {
isPrioritized = true
return io.EOF
}
return nil
})
if err != nil && err != io.EOF {
log.Error(ctx, "error checking file priority", slog.String("filename", path), rlog.Error(err))
continue
}
if !isPrioritized {
log.Debug(ctx, "file not prioritized, skipping", slog.String("filename", path))
continue
}
_, err = s.loadTorrent(ctx, vfile)
if err != nil {
log.Error(ctx, "failed adding torrent", rlog.Error(err))
}
}
wg.Done()
}
wg.Add(loadWorkers)
for range loadWorkers {
go loaderWorker()
}
return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("fs walk error: %w", err)
}
if ctx.Err() != nil {
return ctx.Err()
}
if info.IsDir() {
return nil
}
if strings.HasSuffix(path, ".torrent") {
loaderPaths <- path
}
return nil
})
}

View file

@ -0,0 +1,73 @@
package torrent
import (
"context"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent/types/infohash"
)
func (s *Daemon) allStats(ctx context.Context) (map[infohash.T]TorrentStats, TorrentStats) {
totalPeers := 0
activePeers := 0
connectedSeeders := 0
perTorrentStats := map[infohash.T]TorrentStats{}
for _, v := range s.client.Torrents() {
stats := v.Stats()
perTorrentStats[v.InfoHash()] = TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(stats.BytesRead.Int64()),
UploadedBytes: uint64(stats.BytesWritten.Int64()),
TotalPeers: uint16(stats.TotalPeers),
ActivePeers: uint16(stats.ActivePeers),
ConnectedSeeders: uint16(stats.ConnectedSeeders),
}
totalPeers += stats.TotalPeers
activePeers += stats.ActivePeers
connectedSeeders += stats.ConnectedSeeders
}
totalStats := s.client.Stats()
return perTorrentStats, TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
TotalPeers: uint16(totalPeers),
ActivePeers: uint16(activePeers),
ConnectedSeeders: uint16(connectedSeeders),
}
}
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
perTorrentStats, totalStats := s.allStats(ctx)
for ih, v := range perTorrentStats {
err := s.statsStore.AddTorrentStats(ih, v)
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
}
err := s.statsStore.AddTotalStats(totalStats)
if err != nil {
log.Error(ctx, "error saving total stats", rlog.Error(err))
}
}
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadTotalStatsHistory(ctx, since)
}
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
}
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadStatsHistory(ctx, since)
}

View file

@ -432,6 +432,14 @@ func (tf *torrentFile) Name() string {
return tf.name
}
// Seek implements vfs.File.
func (tf *torrentFile) Seek(offset int64, whence int) (int64, error) {
tf.mu.Lock()
defer tf.mu.Unlock()
return tf.tr.Seek(offset, whence)
}
// Type implements File.
func (tf *torrentFile) Type() fs.FileMode {
return vfs.ROMode | fs.ModeDir
@ -482,8 +490,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
span.End()
}()
tf.mu.RLock()
defer tf.mu.RUnlock()
tf.mu.Lock()
defer tf.mu.Unlock()
ctx, cancel := tf.readTimeout(ctx)
defer cancel()

View file

@ -16,8 +16,11 @@ func registerTorrentMetrics(client *torrent.Client) error {
meterSeeders, _ := meter.Int64ObservableGauge("torrent.seeders")
meterDownloaded, _ := meter.Int64ObservableGauge("torrent.downloaded", metric.WithUnit("By"))
meterIO, _ := meter.Int64ObservableGauge("torrent.io", metric.WithUnit("By"))
meterLoaded, _ := meter.Int64ObservableGauge("torrent.loaded")
_, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
o.ObserveInt64(meterLoaded, int64(len(client.Torrents())))
for _, v := range client.Torrents() {
as := attribute.NewSet(
attribute.String("infohash", v.InfoHash().HexString()),
@ -34,7 +37,7 @@ func registerTorrentMetrics(client *torrent.Client) error {
}
return nil
}, meterTotalPeers, meterActivePeers, meterSeeders, meterIO, meterDownloaded)
}, meterTotalPeers, meterActivePeers, meterSeeders, meterIO, meterDownloaded, meterLoaded)
if err != nil {
return err
}

View file

@ -0,0 +1,24 @@
package torrent
import (
"github.com/anacrolix/dht/v2/krpc"
peer_store "github.com/anacrolix/dht/v2/peer-store"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
)
type peerStore struct {
store kv.Store[infohash.T, []krpc.NodeAddr]
}
var _ peer_store.Interface = (*peerStore)(nil)
// AddPeer implements peer_store.Interface.
func (p *peerStore) AddPeer(ih infohash.T, node krpc.NodeAddr) {
panic("unimplemented")
}
// GetPeers implements peer_store.Interface.
func (p *peerStore) GetPeers(ih infohash.T) []krpc.NodeAddr {
panic("unimplemented")
}

View file

@ -19,13 +19,14 @@ import (
)
// OpenTorrent implements storage.ClientImplCloser.
func (me *fileStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
ctx := context.Background()
func (me *fileStorage) OpenTorrent(ctx context.Context, info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
ctx, span := tracer.Start(ctx, "OpenTorrent")
defer span.End()
log := me.log.With(slog.String("infohash", infoHash.HexString()), slog.String("name", info.BestName()))
log.Debug(ctx, "opening torrent")
impl, err := me.client.OpenTorrent(info, infoHash)
impl, err := me.client.OpenTorrent(ctx, info, infoHash)
if err != nil {
log.Error(ctx, "error opening torrent", rlog.Error(err))
}

View file

@ -12,6 +12,7 @@ import (
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp"
logsdk "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
"github.com/google/uuid"
otelpyroscope "github.com/grafana/otel-profiling-go"
"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel"
@ -71,6 +72,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
semconv.SchemaURL,
semconv.ServiceName(appName),
semconv.HostName(hostName),
semconv.ServiceInstanceID(uuid.NewString()),
),
)
if err != nil {

View file

@ -193,6 +193,20 @@ type archiveFile struct {
buffer *filebuffer.Buffer
}
// Seek implements File.
func (d *archiveFile) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
d.offset = offset
case io.SeekCurrent:
d.offset += offset
case io.SeekEnd:
d.offset = d.size + offset
}
return d.offset, nil
}
// Name implements File.
func (d *archiveFile) Name() string {
return d.name

View file

@ -110,6 +110,11 @@ type CtxBillyFile struct {
file ctxbilly.File
}
// Seek implements File.
func (c *CtxBillyFile) Seek(offset int64, whence int) (int64, error) {
return c.file.Seek(offset, whence)
}
// Close implements File.
func (c *CtxBillyFile) Close(ctx context.Context) error {
return c.file.Close(ctx)

View file

@ -33,6 +33,11 @@ func (d *dirFile) IsDir() bool {
return true
}
// Seek implements File.
func (d *dirFile) Seek(offset int64, whence int) (int64, error) {
return 0, fs.ErrInvalid
}
// Name implements File.
func (d *dirFile) Name() string {
return d.name

View file

@ -79,6 +79,11 @@ type DummyFile struct {
name string
}
// Seek implements File.
func (d *DummyFile) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}
// Name implements File.
func (d *DummyFile) Name() string {
panic("unimplemented")

View file

@ -19,6 +19,7 @@ type File interface {
ctxio.Reader
ctxio.ReaderAt
ctxio.Closer
ctxio.Seeker
}
var ErrNotImplemented = errors.New("not implemented")

View file

@ -207,6 +207,11 @@ type LogFile struct {
timeout time.Duration
}
// Seek implements File.
func (f *LogFile) Seek(offset int64, whence int) (int64, error) {
return f.f.Seek(offset, whence)
}
// Name implements File.
func (f *LogFile) Name() string {
return f.f.Name()

View file

@ -108,6 +108,11 @@ func (d *MemoryFile) Name() string {
return d.name
}
// Seek implements File.
func (d *MemoryFile) Seek(offset int64, whence int) (int64, error) {
return d.data.Seek(offset, whence)
}
// Type implements File.
func (d *MemoryFile) Type() fs.FileMode {
return ROMode

View file

@ -122,6 +122,11 @@ func (f *LazyOsFile) Type() fs.FileMode {
return f.info.Mode()
}
// Seek implements File.
func (f *LazyOsFile) Seek(offset int64, whence int) (int64, error) {
return f.file.Seek(offset, whence)
}
// Close implements File.
func (f *LazyOsFile) Close(ctx context.Context) error {
if f.file == nil {