torrent stats

This commit is contained in:
royalcat 2024-07-16 23:58:06 +03:00
parent d5aa78cb39
commit f9311284fc
16 changed files with 1541 additions and 987 deletions

View file

@ -21,6 +21,9 @@ models:
model: github.com/99designs/gqlgen/graphql.Time model: github.com/99designs/gqlgen/graphql.Time
Int: Int:
model: github.com/99designs/gqlgen/graphql.Int64 model: github.com/99designs/gqlgen/graphql.Int64
UInt:
model:
- github.com/99designs/gqlgen/graphql.Uint
Torrent: Torrent:
extraFields: extraFields:
T: T:

View file

@ -5,6 +5,7 @@ directive @stream on FIELD_DEFINITION
scalar DateTime scalar DateTime
scalar Upload scalar Upload
scalar UInt
type Schema { type Schema {
query: Query query: Query

View file

@ -1,6 +1,7 @@
type TorrentDaemonQuery { type TorrentDaemonQuery {
torrents(filter: TorrentsFilter): [Torrent!]! @resolver torrents(filter: TorrentsFilter): [Torrent!]! @resolver
stats: TorrentStats! @resolver clientStats: TorrentClientStats! @resolver
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
} }
input TorrentsFilter { input TorrentsFilter {

View file

@ -33,7 +33,7 @@ enum TorrentPriority {
NOW NOW
} }
type TorrentStats { type TorrentClientStats {
bytesWritten: Int! bytesWritten: Int!
bytesWrittenData: Int! bytesWrittenData: Int!
bytesRead: Int! bytesRead: Int!
@ -51,3 +51,12 @@ type TorrentStats {
piecesDirtiedGood: Int! piecesDirtiedGood: Int!
piecesDirtiedBad: Int! piecesDirtiedBad: Int!
} }
type TorrentStats {
timestamp: DateTime!
downloadedBytes: UInt!
uploadedBytes: UInt!
totalPeers: UInt!
activePeers: UInt!
connectedSeeders: UInt!
}

View file

@ -1,126 +0,0 @@
package delivery
import (
"bytes"
"io"
"math"
"net/http"
"os"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
)
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) {
stat := gin.H{
"torrentStats": ss.GlobalStats(),
}
if fc != nil {
stat["cacheItems"] = fc.Info().NumItems
stat["cacheFilled"] = fc.Info().Filled / 1024 / 1024
stat["cacheCapacity"] = fc.Info().Capacity / 1024 / 1024
}
// TODO move to a struct
ctx.JSON(http.StatusOK, stat)
}
}
// var apiServersHandler = func(ss []*service.Server) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// var infos []*torrent.ServerInfo
// for _, s := range ss {
// infos = append(infos, s.Info())
// }
// ctx.JSON(http.StatusOK, infos)
// }
// }
// var apiRoutesHandler = func(ss *service.Stats) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// s := ss.RoutesStats()
// sort.Sort(torrent.ByName(s))
// ctx.JSON(http.StatusOK, s)
// }
// }
// var apiAddTorrentHandler = func(s *service.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
// var json RouteAdd
// if err := ctx.ShouldBindJSON(&json); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// if err := s.AddMagnet(route, json.Magnet); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// ctx.JSON(http.StatusOK, nil)
// }
// }
// var apiDelTorrentHandler = func(s *service.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
// hash := ctx.Param("torrent_hash")
// if err := s.RemoveFromHash(route, hash); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// ctx.JSON(http.StatusOK, nil)
// }
// }
var apiLogHandler = func(path string) gin.HandlerFunc {
return func(ctx *gin.Context) {
f, err := os.Open(path)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fi, err := f.Stat()
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
max := math.Max(float64(-fi.Size()), -1024*8*8)
_, err = f.Seek(int64(max), io.SeekEnd)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
var b bytes.Buffer
ctx.Stream(func(w io.Writer) bool {
_, err := b.ReadFrom(f)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
_, err = b.WriteTo(w)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
return true
})
if err := f.Close(); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -7,6 +7,14 @@ import (
atorrent "github.com/anacrolix/torrent" atorrent "github.com/anacrolix/torrent"
) )
func Apply[I any, O any](in []I, f func(I) O) []O {
out := make([]O, len(in))
for i, v := range in {
out[i] = f(v)
}
return out
}
func MapPeerSource(source atorrent.PeerSource) string { func MapPeerSource(source atorrent.PeerSource) string {
switch source { switch source {
case atorrent.PeerSourceDirect: case atorrent.PeerSourceDirect:
@ -43,3 +51,14 @@ func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
T: t, T: t,
}, nil }, nil
} }
func MapTorrentStats(s torrent.TorrentStats) *TorrentStats {
return &TorrentStats{
Timestamp: s.Timestamp,
DownloadedBytes: uint(s.DownloadedBytes),
UploadedBytes: uint(s.UploadedBytes),
TotalPeers: uint(s.TotalPeers),
ActivePeers: uint(s.ActivePeers),
ConnectedSeeders: uint(s.ConnectedSeeders),
}
}

View file

@ -184,6 +184,22 @@ type Torrent struct {
T *torrent.Controller `json:"-"` T *torrent.Controller `json:"-"`
} }
type TorrentClientStats struct {
BytesWritten int64 `json:"bytesWritten"`
BytesWrittenData int64 `json:"bytesWrittenData"`
BytesRead int64 `json:"bytesRead"`
BytesReadData int64 `json:"bytesReadData"`
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
ChunksWritten int64 `json:"chunksWritten"`
ChunksRead int64 `json:"chunksRead"`
ChunksReadUseful int64 `json:"chunksReadUseful"`
ChunksReadWasted int64 `json:"chunksReadWasted"`
MetadataChunksRead int64 `json:"metadataChunksRead"`
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
}
type TorrentDaemonMutation struct { type TorrentDaemonMutation struct {
ValidateTorrent bool `json:"validateTorrent"` ValidateTorrent bool `json:"validateTorrent"`
SetTorrentPriority bool `json:"setTorrentPriority"` SetTorrentPriority bool `json:"setTorrentPriority"`
@ -192,7 +208,8 @@ type TorrentDaemonMutation struct {
type TorrentDaemonQuery struct { type TorrentDaemonQuery struct {
Torrents []*Torrent `json:"torrents"` Torrents []*Torrent `json:"torrents"`
Stats *TorrentStats `json:"stats"` ClientStats *TorrentClientStats `json:"clientStats"`
StatsHistory []*TorrentStats `json:"statsHistory"`
} }
type TorrentFs struct { type TorrentFs struct {
@ -271,19 +288,12 @@ func (this TorrentProgress) GetCurrent() int64 { return this.Current }
func (this TorrentProgress) GetTotal() int64 { return this.Total } func (this TorrentProgress) GetTotal() int64 { return this.Total }
type TorrentStats struct { type TorrentStats struct {
BytesWritten int64 `json:"bytesWritten"` Timestamp time.Time `json:"timestamp"`
BytesWrittenData int64 `json:"bytesWrittenData"` DownloadedBytes uint `json:"downloadedBytes"`
BytesRead int64 `json:"bytesRead"` UploadedBytes uint `json:"uploadedBytes"`
BytesReadData int64 `json:"bytesReadData"` TotalPeers uint `json:"totalPeers"`
BytesReadUsefulData int64 `json:"bytesReadUsefulData"` ActivePeers uint `json:"activePeers"`
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"` ConnectedSeeders uint `json:"connectedSeeders"`
ChunksWritten int64 `json:"chunksWritten"`
ChunksRead int64 `json:"chunksRead"`
ChunksReadUseful int64 `json:"chunksReadUseful"`
ChunksReadWasted int64 `json:"chunksReadWasted"`
MetadataChunksRead int64 `json:"metadataChunksRead"`
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
} }
type TorrentsFilter struct { type TorrentsFilter struct {

View file

@ -8,10 +8,13 @@ import (
"context" "context"
"slices" "slices"
"strings" "strings"
"time"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent" "git.kmsign.ru/royalcat/tstor/src/sources/torrent"
tinfohash "github.com/anacrolix/torrent/types/infohash"
) )
// Torrents is the resolver for the torrents field. // Torrents is the resolver for the torrents field.
@ -86,10 +89,10 @@ func (r *torrentDaemonQueryResolver) Torrents(ctx context.Context, obj *model.To
return tr, nil return tr, nil
} }
// Stats is the resolver for the stats field. // ClientStats is the resolver for the clientStats field.
func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentStats, error) { func (r *torrentDaemonQueryResolver) ClientStats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentClientStats, error) {
stats := r.Service.Stats() stats := r.Service.Stats()
return &model.TorrentStats{ return &model.TorrentClientStats{
BytesWritten: stats.BytesWritten.Int64(), BytesWritten: stats.BytesWritten.Int64(),
BytesRead: stats.BytesRead.Int64(), BytesRead: stats.BytesRead.Int64(),
BytesWrittenData: stats.BytesWrittenData.Int64(), BytesWrittenData: stats.BytesWrittenData.Int64(),
@ -106,6 +109,33 @@ func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.Torre
}, nil }, nil
} }
// StatsHistory is the resolver for the statsHistory field.
func (r *torrentDaemonQueryResolver) StatsHistory(ctx context.Context, obj *model.TorrentDaemonQuery, since time.Time, infohash *string) ([]*model.TorrentStats, error) {
var stats []torrent.TorrentStats
if infohash == nil {
stats, err := r.Service.StatsHistory(ctx, since)
if err != nil {
return nil, err
}
return model.Apply(stats, model.MapTorrentStats), nil
} else if *infohash == "total" {
var err error
stats, err = r.Service.TotalStatsHistory(ctx, since)
if err != nil {
return nil, err
}
} else {
ih := tinfohash.FromHexString(*infohash)
var err error
stats, err = r.Service.TorrentStatsHistory(ctx, since, ih)
if err != nil {
return nil, err
}
}
return model.Apply(stats, model.MapTorrentStats), nil
}
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation. // TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver { func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
return &torrentDaemonQueryResolver{r} return &torrentDaemonQueryResolver{r}

View file

@ -2,7 +2,6 @@ package torrent
import ( import (
"log/slog" "log/slog"
"time"
"github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44" "github.com/anacrolix/dht/v2/bep44"
@ -59,9 +58,14 @@ func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) { torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis cfg.Store = fis
cfg.Exp = 2 * time.Hour cfg.Exp = dhtTTL
cfg.NoSecurity = false cfg.NoSecurity = false
} }
return torrent.NewClient(torrentCfg) t, err := torrent.NewClient(torrentCfg)
if err != nil {
return nil, err
}
return t, nil
} }

View file

@ -48,6 +48,7 @@ type Daemon struct {
fis *dhtFileItemStore fis *dhtFileItemStore
dirsAquire kv.Store[string, DirAquire] dirsAquire kv.Store[string, DirAquire]
fileProperties kv.Store[string, FileProperties] fileProperties kv.Store[string, FileProperties]
statsStore *statsStore
loadMutex sync.Mutex loadMutex sync.Mutex
torrentLoaded chan struct{} torrentLoaded chan struct{}
@ -57,6 +58,8 @@ type Daemon struct {
log *rlog.Logger log *rlog.Logger
} }
const dhtTTL = 24 * time.Hour
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) { func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
s := &Daemon{ s := &Daemon{
log: rlog.Component("torrent-service"), log: rlog.Component("torrent-service"),
@ -70,7 +73,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, fmt.Errorf("error creating metadata folder: %w", err) return nil, fmt.Errorf("error creating metadata folder: %w", err)
} }
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), 3*time.Hour) s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), dhtTTL)
if err != nil { if err != nil {
return nil, fmt.Errorf("error starting item store: %w", err) return nil, fmt.Errorf("error starting item store: %w", err)
} }
@ -86,7 +89,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
} }
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder) s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -96,6 +98,11 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, fmt.Errorf("error creating node ID: %w", err) return nil, fmt.Errorf("error creating node ID: %w", err)
} }
s.statsStore, err = newStatsStore(conf.MetadataFolder, time.Hour*24*30)
if err != nil {
return nil, err
}
s.client, err = newClient(s.Storage, s.fis, &conf, id) s.client, err = newClient(s.Storage, s.fis, &conf, id)
if err != nil { if err != nil {
return nil, fmt.Errorf("error starting torrent client: %w", err) return nil, fmt.Errorf("error starting torrent client: %w", err)
@ -116,9 +123,77 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
close(s.torrentLoaded) close(s.torrentLoaded)
}() }()
go func() {
const period = time.Second * 10
<-s.torrentLoaded
timer := time.NewTicker(period)
for {
select {
case <-s.client.Closed():
return
case <-timer.C:
s.updateStats(context.Background())
}
}
}()
return s, nil return s, nil
} }
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
totalPeers := 0
activePeers := 0
connectedSeeders := 0
for _, v := range s.client.Torrents() {
stats := v.Stats()
err := s.statsStore.AddTorrentStats(v.InfoHash(), TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(stats.BytesRead.Int64()),
UploadedBytes: uint64(stats.BytesWritten.Int64()),
TotalPeers: uint16(stats.TotalPeers),
ActivePeers: uint16(stats.ActivePeers),
ConnectedSeeders: uint16(stats.ConnectedSeeders),
})
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
totalPeers += stats.TotalPeers
activePeers += stats.ActivePeers
connectedSeeders += stats.ConnectedSeeders
}
totalStats := s.client.Stats()
err := s.statsStore.AddTotalStats(TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
TotalPeers: uint16(totalPeers),
ActivePeers: uint16(activePeers),
ConnectedSeeders: uint16(connectedSeeders),
})
if err != nil {
log.Error(ctx, "error saving total stats", rlog.Error(err))
}
}
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadTotalStatsHistory(ctx, since)
}
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
}
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadStatsHistory(ctx, since)
}
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
func (s *Daemon) Close(ctx context.Context) error { func (s *Daemon) Close(ctx context.Context) error {
@ -237,7 +312,7 @@ func isValidInfoHashBytes(d []byte) bool {
} }
func (s *Daemon) Stats() torrent.ConnStats { func (s *Daemon) Stats() torrent.ConnStats {
return s.client.ConnStats() return s.client.Stats().ConnStats
} }
const loadWorkers = 5 const loadWorkers = 5

View file

@ -85,7 +85,25 @@ func (fis *dhtFileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
} }
func (fis *dhtFileItemStore) Del(t bep44.Target) error { func (fis *dhtFileItemStore) Del(t bep44.Target) error {
// ignore this tx := fis.db.NewTransaction(true)
defer tx.Discard()
err := tx.Delete(t[:])
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
err = tx.Commit()
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
return nil return nil
} }

View file

@ -155,7 +155,6 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
return fs.filesCache, nil return fs.filesCache, nil
} }
} }
DEFAULT_DIR: DEFAULT_DIR:

View file

@ -1,218 +1,207 @@
package torrent package torrent
import ( import (
"errors" "context"
"sync" "encoding/json"
"path"
"slices"
"time" "time"
"github.com/anacrolix/torrent" "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
) )
var ErrTorrentNotFound = errors.New("torrent not found") func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged(
type PieceStatus string badger.
DefaultOptions(path.Join(metaDir, "stats")).
const ( WithNumVersionsToKeep(int(^uint(0) >> 1)).
Checking PieceStatus = "H" WithLogger(log.BadgerLogger("stats")), // Infinity
Partial PieceStatus = "P"
Complete PieceStatus = "C"
Waiting PieceStatus = "W"
Error PieceStatus = "?"
) )
if err != nil {
return nil, err
}
type PieceChunk struct { go func() {
Status PieceStatus `json:"status"` for n := range time.NewTimer(lifetime / 2).C {
NumPieces int `json:"numPieces"` db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
}
}()
return &statsStore{
db: db,
}, nil
}
type statsStore struct {
db *badger.DB
} }
type TorrentStats struct { type TorrentStats struct {
Name string `json:"name"` Timestamp time.Time
Hash string `json:"hash"` DownloadedBytes uint64
DownloadedBytes int64 `json:"downloadedBytes"` UploadedBytes uint64
UploadedBytes int64 `json:"uploadedBytes"` TotalPeers uint16
Peers int `json:"peers"` ActivePeers uint16
Seeders int `json:"seeders"` ConnectedSeeders uint16
TimePassed float64 `json:"timePassed"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
} }
type byName []*TorrentStats func (s TorrentStats) Same(o TorrentStats) bool {
return s.DownloadedBytes == o.DownloadedBytes &&
func (a byName) Len() int { return len(a) } s.UploadedBytes == o.UploadedBytes &&
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } s.TotalPeers == o.TotalPeers &&
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name } s.ActivePeers == o.ActivePeers &&
s.ConnectedSeeders == o.ConnectedSeeders
type TotalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
TimePassed float64 `json:"timePassed"`
} }
type RouteStats struct { func (r *statsStore) addStats(key []byte, stat TorrentStats) error {
Name string `json:"name"` ts := uint64(stat.Timestamp.Unix())
TorrentStats []*TorrentStats `json:"torrentStats"`
txn := r.db.NewTransactionAt(ts, true)
defer txn.Discard()
item, err := txn.Get(key)
if err != nil && err != badger.ErrKeyNotFound {
return err
} }
type ByName []*RouteStats if err != badger.ErrKeyNotFound {
var prevStats TorrentStats
func (a ByName) Len() int { return len(a) } err = item.Value(func(val []byte) error {
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } return json.Unmarshal(val, &prevStats)
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type stat struct {
totalDownloadBytes int64
downloadBytes int64
totalUploadBytes int64
uploadBytes int64
peers int
seeders int
time time.Time
}
type Stats struct {
mut sync.Mutex
torrents map[string]*torrent.Torrent
previousStats map[string]*stat
gTime time.Time
}
func NewStats() *Stats {
return &Stats{
gTime: time.Now(),
torrents: make(map[string]*torrent.Torrent),
// torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
// previousStats: make(map[string]*stat),
}
}
func (s *Stats) Add(path string, t *torrent.Torrent) {
s.mut.Lock()
defer s.mut.Unlock()
s.torrents[path] = t
s.previousStats[path] = &stat{}
}
func (s *Stats) Del(path string) {
s.mut.Lock()
defer s.mut.Unlock()
delete(s.torrents, path)
delete(s.previousStats, path)
}
func (s *Stats) Stats(path string) (*TorrentStats, error) {
s.mut.Lock()
defer s.mut.Unlock()
t, ok := s.torrents[path]
if !(ok) {
return nil, ErrTorrentNotFound
}
now := time.Now()
return s.stats(now, t, true), nil
}
func (s *Stats) GlobalStats() *TotalTorrentStats {
s.mut.Lock()
defer s.mut.Unlock()
now := time.Now()
var totalDownload int64
var totalUpload int64
for _, torrent := range s.torrents {
tStats := s.stats(now, torrent, false)
totalDownload += tStats.DownloadedBytes
totalUpload += tStats.UploadedBytes
}
timePassed := now.Sub(s.gTime)
s.gTime = now
return &TotalTorrentStats{
DownloadedBytes: totalDownload,
UploadedBytes: totalUpload,
TimePassed: timePassed.Seconds(),
}
}
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
ts := &TorrentStats{}
prev, ok := s.previousStats[t.InfoHash().String()]
if !ok {
return &TorrentStats{}
}
if s.returnPreviousMeasurements(now) {
ts.DownloadedBytes = prev.downloadBytes
ts.UploadedBytes = prev.uploadBytes
} else {
st := t.Stats()
rd := st.BytesReadData.Int64()
wd := st.BytesWrittenData.Int64()
ist := &stat{
downloadBytes: rd - prev.totalDownloadBytes,
uploadBytes: wd - prev.totalUploadBytes,
totalDownloadBytes: rd,
totalUploadBytes: wd,
time: now,
peers: st.TotalPeers,
seeders: st.ConnectedSeeders,
}
ts.DownloadedBytes = ist.downloadBytes
ts.UploadedBytes = ist.uploadBytes
ts.Peers = ist.peers
ts.Seeders = ist.seeders
s.previousStats[t.InfoHash().String()] = ist
}
ts.TimePassed = now.Sub(prev.time).Seconds()
var totalPieces int
if chunks {
var pch []*PieceChunk
for _, psr := range t.PieceStateRuns() {
var s PieceStatus
switch {
case psr.Checking:
s = Checking
case psr.Partial:
s = Partial
case psr.Complete:
s = Complete
case !psr.Ok:
s = Error
default:
s = Waiting
}
pch = append(pch, &PieceChunk{
Status: s,
NumPieces: psr.Length,
}) })
totalPieces += psr.Length if err != nil {
} return err
ts.PieceChunks = pch
} }
ts.Hash = t.InfoHash().String() if prevStats.Same(stat) {
ts.Name = t.Name() return nil
ts.TotalPieces = totalPieces }
if t.Info() != nil {
ts.PieceSize = t.Info().PieceLength
} }
return ts data, err := json.Marshal(stat)
if err != nil {
return err
}
err = txn.Set(key, data)
if err != nil {
return err
} }
const gap time.Duration = 2 * time.Second return txn.CommitAt(ts, nil)
}
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
return now.Sub(s.gTime) < gap func (r *statsStore) AddTorrentStats(ih infohash.T, stat TorrentStats) error {
return r.addStats(ih.Bytes(), stat)
}
const totalKey = "total"
func (r *statsStore) AddTotalStats(stat TorrentStats) error {
return r.addStats([]byte(totalKey), stat)
}
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator([]byte(totalKey), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
}
func (r *statsStore) ReadTorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator(ih.Bytes(), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
} }

View file

@ -1,127 +0,0 @@
package torrent
import (
"context"
"encoding/json"
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
"golang.org/x/exp/maps"
)
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats-history")).
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err
}
go func() {
for n := range time.NewTimer(lifetime / 2).C {
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
}
}()
return &statsStore{
db: db,
}, nil
}
type statsStore struct {
db *badger.DB
}
func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
data, err := json.Marshal(stat)
if err != nil {
return err
}
ts := uint64(t.Unix())
txn := r.db.NewTransactionAt(ts, false)
defer txn.Discard()
err = txn.Set(ih.Bytes(), data)
if err != nil {
return err
}
return txn.CommitAt(ts, nil)
}
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
stats := map[time.Time]TotalTorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
// k := item.Key()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
if stat, ok := stats[t]; !ok {
stats[t] = TotalTorrentStats{
DownloadedBytes: tstat.DownloadedBytes,
UploadedBytes: stat.DownloadedBytes,
}
} else {
stat.DownloadedBytes += tstat.DownloadedBytes
stat.UploadedBytes += tstat.UploadedBytes
stats[t] = stat
}
}
return nil
})
return maps.Values(stats), err
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
var stats map[time.Time]TorrentStats
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator(ih.Bytes(), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
stats[t] = tstat
}
return nil
})
return maps.Values(stats), err
}

View file

@ -99,6 +99,21 @@ type Torrent {
excludedFiles: [TorrentFile!]! @resolver excludedFiles: [TorrentFile!]! @resolver
peers: [TorrentPeer!]! @resolver peers: [TorrentPeer!]! @resolver
} }
type TorrentClientStats {
bytesWritten: Int!
bytesWrittenData: Int!
bytesRead: Int!
bytesReadData: Int!
bytesReadUsefulData: Int!
bytesReadUsefulIntendedData: Int!
chunksWritten: Int!
chunksRead: Int!
chunksReadUseful: Int!
chunksReadWasted: Int!
metadataChunksRead: Int!
piecesDirtiedGood: Int!
piecesDirtiedBad: Int!
}
type TorrentDaemonMutation { type TorrentDaemonMutation {
validateTorrent(filter: TorrentFilter!): Boolean! @resolver validateTorrent(filter: TorrentFilter!): Boolean! @resolver
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
@ -106,7 +121,8 @@ type TorrentDaemonMutation {
} }
type TorrentDaemonQuery { type TorrentDaemonQuery {
torrents(filter: TorrentsFilter): [Torrent!]! @resolver torrents(filter: TorrentsFilter): [Torrent!]! @resolver
stats: TorrentStats! @resolver clientStats: TorrentClientStats! @resolver
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
} }
type TorrentFS implements Dir & FsEntry { type TorrentFS implements Dir & FsEntry {
name: String! name: String!
@ -156,19 +172,12 @@ type TorrentProgress implements Progress {
total: Int! total: Int!
} }
type TorrentStats { type TorrentStats {
bytesWritten: Int! timestamp: DateTime!
bytesWrittenData: Int! downloadedBytes: UInt!
bytesRead: Int! uploadedBytes: UInt!
bytesReadData: Int! totalPeers: UInt!
bytesReadUsefulData: Int! activePeers: UInt!
bytesReadUsefulIntendedData: Int! connectedSeeders: UInt!
chunksWritten: Int!
chunksRead: Int!
chunksReadUseful: Int!
chunksReadWasted: Int!
metadataChunksRead: Int!
piecesDirtiedGood: Int!
piecesDirtiedBad: Int!
} }
input TorrentsFilter { input TorrentsFilter {
infohash: StringFilter infohash: StringFilter
@ -178,4 +187,5 @@ input TorrentsFilter {
peersCount: IntFilter peersCount: IntFilter
priority: TorrentPriorityFilter priority: TorrentPriorityFilter
} }
scalar UInt
scalar Upload scalar Upload