Compare commits

..

2 commits

Author SHA1 Message Date
633a1d6e25 reduce nfs file interface
All checks were successful
docker / build-docker (linux/arm64) (push) Successful in 2m32s
docker / build-docker (linux/amd64) (push) Successful in 2m35s
2024-07-16 23:58:22 +03:00
f9311284fc torrent stats 2024-07-16 23:58:06 +03:00
18 changed files with 1543 additions and 989 deletions

View file

@ -21,6 +21,9 @@ models:
model: github.com/99designs/gqlgen/graphql.Time
Int:
model: github.com/99designs/gqlgen/graphql.Int64
UInt:
model:
- github.com/99designs/gqlgen/graphql.Uint
Torrent:
extraFields:
T:

View file

@ -5,6 +5,7 @@ directive @stream on FIELD_DEFINITION
scalar DateTime
scalar Upload
scalar UInt
type Schema {
query: Query

View file

@ -1,6 +1,7 @@
type TorrentDaemonQuery {
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
stats: TorrentStats! @resolver
clientStats: TorrentClientStats! @resolver
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
}
input TorrentsFilter {

View file

@ -33,7 +33,7 @@ enum TorrentPriority {
NOW
}
type TorrentStats {
type TorrentClientStats {
bytesWritten: Int!
bytesWrittenData: Int!
bytesRead: Int!
@ -51,3 +51,12 @@ type TorrentStats {
piecesDirtiedGood: Int!
piecesDirtiedBad: Int!
}
type TorrentStats {
timestamp: DateTime!
downloadedBytes: UInt!
uploadedBytes: UInt!
totalPeers: UInt!
activePeers: UInt!
connectedSeeders: UInt!
}

View file

@ -70,7 +70,7 @@ type File interface {
// Name returns the name of the file as presented to Open.
Name() string
ctxio.Writer
ctxio.Reader
// ctxio.Reader
ctxio.ReaderAt
io.Seeker
ctxio.Closer

View file

@ -89,7 +89,7 @@ func TestNFS(t *testing.T) {
}
mf, _ := mem.OpenFile(ctx, "/helloworld.txt", os.O_RDONLY, 0)
buf := make([]byte, len(b))
if _, err = mf.Read(ctx, buf[:]); err != nil {
if _, err = mf.ReadAt(ctx, buf[:], 0); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, b) {

View file

@ -1,126 +0,0 @@
package delivery
import (
"bytes"
"io"
"math"
"net/http"
"os"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
)
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) {
stat := gin.H{
"torrentStats": ss.GlobalStats(),
}
if fc != nil {
stat["cacheItems"] = fc.Info().NumItems
stat["cacheFilled"] = fc.Info().Filled / 1024 / 1024
stat["cacheCapacity"] = fc.Info().Capacity / 1024 / 1024
}
// TODO move to a struct
ctx.JSON(http.StatusOK, stat)
}
}
// var apiServersHandler = func(ss []*service.Server) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// var infos []*torrent.ServerInfo
// for _, s := range ss {
// infos = append(infos, s.Info())
// }
// ctx.JSON(http.StatusOK, infos)
// }
// }
// var apiRoutesHandler = func(ss *service.Stats) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// s := ss.RoutesStats()
// sort.Sort(torrent.ByName(s))
// ctx.JSON(http.StatusOK, s)
// }
// }
// var apiAddTorrentHandler = func(s *service.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
// var json RouteAdd
// if err := ctx.ShouldBindJSON(&json); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// if err := s.AddMagnet(route, json.Magnet); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// ctx.JSON(http.StatusOK, nil)
// }
// }
// var apiDelTorrentHandler = func(s *service.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
// hash := ctx.Param("torrent_hash")
// if err := s.RemoveFromHash(route, hash); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
// ctx.JSON(http.StatusOK, nil)
// }
// }
var apiLogHandler = func(path string) gin.HandlerFunc {
return func(ctx *gin.Context) {
f, err := os.Open(path)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fi, err := f.Stat()
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
max := math.Max(float64(-fi.Size()), -1024*8*8)
_, err = f.Seek(int64(max), io.SeekEnd)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
var b bytes.Buffer
ctx.Stream(func(w io.Writer) bool {
_, err := b.ReadFrom(f)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
_, err = b.WriteTo(w)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
return true
})
if err := f.Close(); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -7,6 +7,14 @@ import (
atorrent "github.com/anacrolix/torrent"
)
func Apply[I any, O any](in []I, f func(I) O) []O {
out := make([]O, len(in))
for i, v := range in {
out[i] = f(v)
}
return out
}
func MapPeerSource(source atorrent.PeerSource) string {
switch source {
case atorrent.PeerSourceDirect:
@ -43,3 +51,14 @@ func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
T: t,
}, nil
}
func MapTorrentStats(s torrent.TorrentStats) *TorrentStats {
return &TorrentStats{
Timestamp: s.Timestamp,
DownloadedBytes: uint(s.DownloadedBytes),
UploadedBytes: uint(s.UploadedBytes),
TotalPeers: uint(s.TotalPeers),
ActivePeers: uint(s.ActivePeers),
ConnectedSeeders: uint(s.ConnectedSeeders),
}
}

View file

@ -184,6 +184,22 @@ type Torrent struct {
T *torrent.Controller `json:"-"`
}
type TorrentClientStats struct {
BytesWritten int64 `json:"bytesWritten"`
BytesWrittenData int64 `json:"bytesWrittenData"`
BytesRead int64 `json:"bytesRead"`
BytesReadData int64 `json:"bytesReadData"`
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
ChunksWritten int64 `json:"chunksWritten"`
ChunksRead int64 `json:"chunksRead"`
ChunksReadUseful int64 `json:"chunksReadUseful"`
ChunksReadWasted int64 `json:"chunksReadWasted"`
MetadataChunksRead int64 `json:"metadataChunksRead"`
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
}
type TorrentDaemonMutation struct {
ValidateTorrent bool `json:"validateTorrent"`
SetTorrentPriority bool `json:"setTorrentPriority"`
@ -192,7 +208,8 @@ type TorrentDaemonMutation struct {
type TorrentDaemonQuery struct {
Torrents []*Torrent `json:"torrents"`
Stats *TorrentStats `json:"stats"`
ClientStats *TorrentClientStats `json:"clientStats"`
StatsHistory []*TorrentStats `json:"statsHistory"`
}
type TorrentFs struct {
@ -271,19 +288,12 @@ func (this TorrentProgress) GetCurrent() int64 { return this.Current }
func (this TorrentProgress) GetTotal() int64 { return this.Total }
type TorrentStats struct {
BytesWritten int64 `json:"bytesWritten"`
BytesWrittenData int64 `json:"bytesWrittenData"`
BytesRead int64 `json:"bytesRead"`
BytesReadData int64 `json:"bytesReadData"`
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
ChunksWritten int64 `json:"chunksWritten"`
ChunksRead int64 `json:"chunksRead"`
ChunksReadUseful int64 `json:"chunksReadUseful"`
ChunksReadWasted int64 `json:"chunksReadWasted"`
MetadataChunksRead int64 `json:"metadataChunksRead"`
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
Timestamp time.Time `json:"timestamp"`
DownloadedBytes uint `json:"downloadedBytes"`
UploadedBytes uint `json:"uploadedBytes"`
TotalPeers uint `json:"totalPeers"`
ActivePeers uint `json:"activePeers"`
ConnectedSeeders uint `json:"connectedSeeders"`
}
type TorrentsFilter struct {

View file

@ -8,10 +8,13 @@ import (
"context"
"slices"
"strings"
"time"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
tinfohash "github.com/anacrolix/torrent/types/infohash"
)
// Torrents is the resolver for the torrents field.
@ -86,10 +89,10 @@ func (r *torrentDaemonQueryResolver) Torrents(ctx context.Context, obj *model.To
return tr, nil
}
// Stats is the resolver for the stats field.
func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentStats, error) {
// ClientStats is the resolver for the clientStats field.
func (r *torrentDaemonQueryResolver) ClientStats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentClientStats, error) {
stats := r.Service.Stats()
return &model.TorrentStats{
return &model.TorrentClientStats{
BytesWritten: stats.BytesWritten.Int64(),
BytesRead: stats.BytesRead.Int64(),
BytesWrittenData: stats.BytesWrittenData.Int64(),
@ -106,6 +109,33 @@ func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.Torre
}, nil
}
// StatsHistory is the resolver for the statsHistory field.
func (r *torrentDaemonQueryResolver) StatsHistory(ctx context.Context, obj *model.TorrentDaemonQuery, since time.Time, infohash *string) ([]*model.TorrentStats, error) {
var stats []torrent.TorrentStats
if infohash == nil {
stats, err := r.Service.StatsHistory(ctx, since)
if err != nil {
return nil, err
}
return model.Apply(stats, model.MapTorrentStats), nil
} else if *infohash == "total" {
var err error
stats, err = r.Service.TotalStatsHistory(ctx, since)
if err != nil {
return nil, err
}
} else {
ih := tinfohash.FromHexString(*infohash)
var err error
stats, err = r.Service.TorrentStatsHistory(ctx, since, ih)
if err != nil {
return nil, err
}
}
return model.Apply(stats, model.MapTorrentStats), nil
}
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
return &torrentDaemonQueryResolver{r}

View file

@ -2,7 +2,6 @@ package torrent
import (
"log/slog"
"time"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44"
@ -59,9 +58,14 @@ func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis
cfg.Exp = 2 * time.Hour
cfg.Exp = dhtTTL
cfg.NoSecurity = false
}
return torrent.NewClient(torrentCfg)
t, err := torrent.NewClient(torrentCfg)
if err != nil {
return nil, err
}
return t, nil
}

View file

@ -48,6 +48,7 @@ type Daemon struct {
fis *dhtFileItemStore
dirsAquire kv.Store[string, DirAquire]
fileProperties kv.Store[string, FileProperties]
statsStore *statsStore
loadMutex sync.Mutex
torrentLoaded chan struct{}
@ -57,6 +58,8 @@ type Daemon struct {
log *rlog.Logger
}
const dhtTTL = 24 * time.Hour
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
s := &Daemon{
log: rlog.Component("torrent-service"),
@ -70,7 +73,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, fmt.Errorf("error creating metadata folder: %w", err)
}
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), 3*time.Hour)
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), dhtTTL)
if err != nil {
return nil, fmt.Errorf("error starting item store: %w", err)
}
@ -86,7 +89,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
}
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
if err != nil {
return nil, err
}
@ -96,6 +98,11 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, fmt.Errorf("error creating node ID: %w", err)
}
s.statsStore, err = newStatsStore(conf.MetadataFolder, time.Hour*24*30)
if err != nil {
return nil, err
}
s.client, err = newClient(s.Storage, s.fis, &conf, id)
if err != nil {
return nil, fmt.Errorf("error starting torrent client: %w", err)
@ -116,9 +123,77 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
close(s.torrentLoaded)
}()
go func() {
const period = time.Second * 10
<-s.torrentLoaded
timer := time.NewTicker(period)
for {
select {
case <-s.client.Closed():
return
case <-timer.C:
s.updateStats(context.Background())
}
}
}()
return s, nil
}
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
totalPeers := 0
activePeers := 0
connectedSeeders := 0
for _, v := range s.client.Torrents() {
stats := v.Stats()
err := s.statsStore.AddTorrentStats(v.InfoHash(), TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(stats.BytesRead.Int64()),
UploadedBytes: uint64(stats.BytesWritten.Int64()),
TotalPeers: uint16(stats.TotalPeers),
ActivePeers: uint16(stats.ActivePeers),
ConnectedSeeders: uint16(stats.ConnectedSeeders),
})
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
totalPeers += stats.TotalPeers
activePeers += stats.ActivePeers
connectedSeeders += stats.ConnectedSeeders
}
totalStats := s.client.Stats()
err := s.statsStore.AddTotalStats(TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
TotalPeers: uint16(totalPeers),
ActivePeers: uint16(activePeers),
ConnectedSeeders: uint16(connectedSeeders),
})
if err != nil {
log.Error(ctx, "error saving total stats", rlog.Error(err))
}
}
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadTotalStatsHistory(ctx, since)
}
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
}
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
return s.statsStore.ReadStatsHistory(ctx, since)
}
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
func (s *Daemon) Close(ctx context.Context) error {
@ -237,7 +312,7 @@ func isValidInfoHashBytes(d []byte) bool {
}
func (s *Daemon) Stats() torrent.ConnStats {
return s.client.ConnStats()
return s.client.Stats().ConnStats
}
const loadWorkers = 5

View file

@ -85,7 +85,25 @@ func (fis *dhtFileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
}
func (fis *dhtFileItemStore) Del(t bep44.Target) error {
// ignore this
tx := fis.db.NewTransaction(true)
defer tx.Discard()
err := tx.Delete(t[:])
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
err = tx.Commit()
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
return nil
}

View file

@ -155,7 +155,6 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
return fs.filesCache, nil
}
}
DEFAULT_DIR:

View file

@ -1,218 +1,207 @@
package torrent
import (
"errors"
"sync"
"context"
"encoding/json"
"path"
"slices"
"time"
"github.com/anacrolix/torrent"
"git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
)
var ErrTorrentNotFound = errors.New("torrent not found")
type PieceStatus string
const (
Checking PieceStatus = "H"
Partial PieceStatus = "P"
Complete PieceStatus = "C"
Waiting PieceStatus = "W"
Error PieceStatus = "?"
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats")).
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err
}
type PieceChunk struct {
Status PieceStatus `json:"status"`
NumPieces int `json:"numPieces"`
go func() {
for n := range time.NewTimer(lifetime / 2).C {
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
}
}()
return &statsStore{
db: db,
}, nil
}
type statsStore struct {
db *badger.DB
}
type TorrentStats struct {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
TimePassed float64 `json:"timePassed"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
Timestamp time.Time
DownloadedBytes uint64
UploadedBytes uint64
TotalPeers uint16
ActivePeers uint16
ConnectedSeeders uint16
}
type byName []*TorrentStats
func (a byName) Len() int { return len(a) }
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type TotalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
TimePassed float64 `json:"timePassed"`
func (s TorrentStats) Same(o TorrentStats) bool {
return s.DownloadedBytes == o.DownloadedBytes &&
s.UploadedBytes == o.UploadedBytes &&
s.TotalPeers == o.TotalPeers &&
s.ActivePeers == o.ActivePeers &&
s.ConnectedSeeders == o.ConnectedSeeders
}
type RouteStats struct {
Name string `json:"name"`
TorrentStats []*TorrentStats `json:"torrentStats"`
func (r *statsStore) addStats(key []byte, stat TorrentStats) error {
ts := uint64(stat.Timestamp.Unix())
txn := r.db.NewTransactionAt(ts, true)
defer txn.Discard()
item, err := txn.Get(key)
if err != nil && err != badger.ErrKeyNotFound {
return err
}
type ByName []*RouteStats
func (a ByName) Len() int { return len(a) }
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type stat struct {
totalDownloadBytes int64
downloadBytes int64
totalUploadBytes int64
uploadBytes int64
peers int
seeders int
time time.Time
}
type Stats struct {
mut sync.Mutex
torrents map[string]*torrent.Torrent
previousStats map[string]*stat
gTime time.Time
}
func NewStats() *Stats {
return &Stats{
gTime: time.Now(),
torrents: make(map[string]*torrent.Torrent),
// torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
// previousStats: make(map[string]*stat),
}
}
func (s *Stats) Add(path string, t *torrent.Torrent) {
s.mut.Lock()
defer s.mut.Unlock()
s.torrents[path] = t
s.previousStats[path] = &stat{}
}
func (s *Stats) Del(path string) {
s.mut.Lock()
defer s.mut.Unlock()
delete(s.torrents, path)
delete(s.previousStats, path)
}
func (s *Stats) Stats(path string) (*TorrentStats, error) {
s.mut.Lock()
defer s.mut.Unlock()
t, ok := s.torrents[path]
if !(ok) {
return nil, ErrTorrentNotFound
}
now := time.Now()
return s.stats(now, t, true), nil
}
func (s *Stats) GlobalStats() *TotalTorrentStats {
s.mut.Lock()
defer s.mut.Unlock()
now := time.Now()
var totalDownload int64
var totalUpload int64
for _, torrent := range s.torrents {
tStats := s.stats(now, torrent, false)
totalDownload += tStats.DownloadedBytes
totalUpload += tStats.UploadedBytes
}
timePassed := now.Sub(s.gTime)
s.gTime = now
return &TotalTorrentStats{
DownloadedBytes: totalDownload,
UploadedBytes: totalUpload,
TimePassed: timePassed.Seconds(),
}
}
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
ts := &TorrentStats{}
prev, ok := s.previousStats[t.InfoHash().String()]
if !ok {
return &TorrentStats{}
}
if s.returnPreviousMeasurements(now) {
ts.DownloadedBytes = prev.downloadBytes
ts.UploadedBytes = prev.uploadBytes
} else {
st := t.Stats()
rd := st.BytesReadData.Int64()
wd := st.BytesWrittenData.Int64()
ist := &stat{
downloadBytes: rd - prev.totalDownloadBytes,
uploadBytes: wd - prev.totalUploadBytes,
totalDownloadBytes: rd,
totalUploadBytes: wd,
time: now,
peers: st.TotalPeers,
seeders: st.ConnectedSeeders,
}
ts.DownloadedBytes = ist.downloadBytes
ts.UploadedBytes = ist.uploadBytes
ts.Peers = ist.peers
ts.Seeders = ist.seeders
s.previousStats[t.InfoHash().String()] = ist
}
ts.TimePassed = now.Sub(prev.time).Seconds()
var totalPieces int
if chunks {
var pch []*PieceChunk
for _, psr := range t.PieceStateRuns() {
var s PieceStatus
switch {
case psr.Checking:
s = Checking
case psr.Partial:
s = Partial
case psr.Complete:
s = Complete
case !psr.Ok:
s = Error
default:
s = Waiting
}
pch = append(pch, &PieceChunk{
Status: s,
NumPieces: psr.Length,
if err != badger.ErrKeyNotFound {
var prevStats TorrentStats
err = item.Value(func(val []byte) error {
return json.Unmarshal(val, &prevStats)
})
totalPieces += psr.Length
}
ts.PieceChunks = pch
if err != nil {
return err
}
ts.Hash = t.InfoHash().String()
ts.Name = t.Name()
ts.TotalPieces = totalPieces
if t.Info() != nil {
ts.PieceSize = t.Info().PieceLength
if prevStats.Same(stat) {
return nil
}
}
return ts
data, err := json.Marshal(stat)
if err != nil {
return err
}
err = txn.Set(key, data)
if err != nil {
return err
}
const gap time.Duration = 2 * time.Second
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
return now.Sub(s.gTime) < gap
return txn.CommitAt(ts, nil)
}
func (r *statsStore) AddTorrentStats(ih infohash.T, stat TorrentStats) error {
return r.addStats(ih.Bytes(), stat)
}
const totalKey = "total"
func (r *statsStore) AddTotalStats(stat TorrentStats) error {
return r.addStats([]byte(totalKey), stat)
}
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator([]byte(totalKey), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
}
func (r *statsStore) ReadTorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator(ih.Bytes(), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
stats := []TorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var stat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &stat)
})
if err != nil {
return err
}
stats = append(stats, stat)
}
return nil
})
if err != nil {
return nil, err
}
slices.SortFunc(stats, func(a, b TorrentStats) int {
return a.Timestamp.Compare(b.Timestamp)
})
stats = slices.Compact(stats)
return stats, nil
}

View file

@ -1,127 +0,0 @@
package torrent
import (
"context"
"encoding/json"
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
"golang.org/x/exp/maps"
)
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats-history")).
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err
}
go func() {
for n := range time.NewTimer(lifetime / 2).C {
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
}
}()
return &statsStore{
db: db,
}, nil
}
type statsStore struct {
db *badger.DB
}
func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
data, err := json.Marshal(stat)
if err != nil {
return err
}
ts := uint64(t.Unix())
txn := r.db.NewTransactionAt(ts, false)
defer txn.Discard()
err = txn.Set(ih.Bytes(), data)
if err != nil {
return err
}
return txn.CommitAt(ts, nil)
}
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
stats := map[time.Time]TotalTorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
// k := item.Key()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
if stat, ok := stats[t]; !ok {
stats[t] = TotalTorrentStats{
DownloadedBytes: tstat.DownloadedBytes,
UploadedBytes: stat.DownloadedBytes,
}
} else {
stat.DownloadedBytes += tstat.DownloadedBytes
stat.UploadedBytes += tstat.UploadedBytes
stats[t] = stat
}
}
return nil
})
return maps.Values(stats), err
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
var stats map[time.Time]TorrentStats
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator(ih.Bytes(), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
stats[t] = tstat
}
return nil
})
return maps.Values(stats), err
}

View file

@ -99,6 +99,21 @@ type Torrent {
excludedFiles: [TorrentFile!]! @resolver
peers: [TorrentPeer!]! @resolver
}
type TorrentClientStats {
bytesWritten: Int!
bytesWrittenData: Int!
bytesRead: Int!
bytesReadData: Int!
bytesReadUsefulData: Int!
bytesReadUsefulIntendedData: Int!
chunksWritten: Int!
chunksRead: Int!
chunksReadUseful: Int!
chunksReadWasted: Int!
metadataChunksRead: Int!
piecesDirtiedGood: Int!
piecesDirtiedBad: Int!
}
type TorrentDaemonMutation {
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
@ -106,7 +121,8 @@ type TorrentDaemonMutation {
}
type TorrentDaemonQuery {
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
stats: TorrentStats! @resolver
clientStats: TorrentClientStats! @resolver
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
}
type TorrentFS implements Dir & FsEntry {
name: String!
@ -156,19 +172,12 @@ type TorrentProgress implements Progress {
total: Int!
}
type TorrentStats {
bytesWritten: Int!
bytesWrittenData: Int!
bytesRead: Int!
bytesReadData: Int!
bytesReadUsefulData: Int!
bytesReadUsefulIntendedData: Int!
chunksWritten: Int!
chunksRead: Int!
chunksReadUseful: Int!
chunksReadWasted: Int!
metadataChunksRead: Int!
piecesDirtiedGood: Int!
piecesDirtiedBad: Int!
timestamp: DateTime!
downloadedBytes: UInt!
uploadedBytes: UInt!
totalPeers: UInt!
activePeers: UInt!
connectedSeeders: UInt!
}
input TorrentsFilter {
infohash: StringFilter
@ -178,4 +187,5 @@ input TorrentsFilter {
peersCount: IntFilter
priority: TorrentPriorityFilter
}
scalar UInt
scalar Upload