torrent stats

This commit is contained in:
royalcat 2024-05-21 10:28:53 +03:00
parent ef6680b854
commit d056ac1167
2 changed files with 85 additions and 46 deletions

View file

@ -44,7 +44,7 @@ func (a byName) Len() int { return len(a) }
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name } func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type GlobalTorrentStats struct { type TotalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"` DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"` UploadedBytes int64 `json:"uploadedBytes"`
TimePassed float64 `json:"timePassed"` TimePassed float64 `json:"timePassed"`
@ -117,7 +117,7 @@ func (s *Stats) Stats(path string) (*TorrentStats, error) {
return s.stats(now, t, true), nil return s.stats(now, t, true), nil
} }
func (s *Stats) GlobalStats() *GlobalTorrentStats { func (s *Stats) GlobalStats() *TotalTorrentStats {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() defer s.mut.Unlock()
@ -134,7 +134,7 @@ func (s *Stats) GlobalStats() *GlobalTorrentStats {
timePassed := now.Sub(s.gTime) timePassed := now.Sub(s.gTime)
s.gTime = now s.gTime = now
return &GlobalTorrentStats{ return &TotalTorrentStats{
DownloadedBytes: totalDownload, DownloadedBytes: totalDownload,
UploadedBytes: totalUpload, UploadedBytes: totalUpload,
TimePassed: timePassed.Seconds(), TimePassed: timePassed.Seconds(),

View file

@ -8,21 +8,9 @@ import (
"github.com/anacrolix/torrent/types/infohash" "github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/ristretto/z" "golang.org/x/exp/maps"
) )
type TorrentStat struct {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
}
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) { func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged( db, err := badger.OpenManaged(
badger. badger.
@ -38,49 +26,100 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error)
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix())) db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
} }
}() }()
r := &statsStore{ return &statsStore{
db: db, db: db,
} }, nil
return r, nil
} }
type statsStore struct { type statsStore struct {
db *badger.DB db *badger.DB
} }
func (r *statsStore) AddStat(ih infohash.T, stat TorrentStat) error { func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
data, err := json.Marshal(stat) data, err := json.Marshal(stat)
if err != nil { if err != nil {
return err return err
} }
ts := uint64(t.Unix())
return r.db.Update(func(txn *badger.Txn) error { txn := r.db.NewTransactionAt(ts, false)
return txn.Set(ih.Bytes(), data) defer txn.Discard()
})
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { err = txn.Set(ih.Bytes(), data)
var stats GlobalTorrentStats
stream := r.db.NewStream()
stream.SinceTs = uint64(since.Unix())
var tstat TorrentStat
stream.Send = func(buf *z.Buffer) error {
err := json.Unmarshal(buf.Bytes(), &tstat)
if err != nil {
return err
}
stats.DownloadedBytes += tstat.DownloadedBytes
stats.UploadedBytes += tstat.UploadedBytes
return nil
}
err := stream.Orchestrate(ctx)
if err != nil { if err != nil {
return stats, err return err
} }
return stats, nil
return txn.CommitAt(ts, nil)
}
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
stats := map[time.Time]TotalTorrentStats{}
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
// k := item.Key()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
if stat, ok := stats[t]; !ok {
stats[t] = TotalTorrentStats{
DownloadedBytes: tstat.DownloadedBytes,
UploadedBytes: stat.DownloadedBytes,
}
} else {
stat.DownloadedBytes += tstat.DownloadedBytes
stat.UploadedBytes += tstat.UploadedBytes
stats[t] = stat
}
}
return nil
})
return maps.Values(stats), err
}
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
var stats map[time.Time]TorrentStats
err := r.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.SinceTs = uint64(since.Unix())
it := txn.NewKeyIterator(ih.Bytes(), opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var tstat TorrentStats
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &tstat)
})
if err != nil {
return err
}
t := time.Unix(int64(item.Version()), 0)
stats[t] = tstat
}
return nil
})
return maps.Values(stats), err
} }