diff --git a/src/host/torrent/stats.go b/src/host/torrent/stats.go index ffc2739..d03e961 100644 --- a/src/host/torrent/stats.go +++ b/src/host/torrent/stats.go @@ -44,7 +44,7 @@ func (a byName) Len() int { return len(a) } func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name } -type GlobalTorrentStats struct { +type TotalTorrentStats struct { DownloadedBytes int64 `json:"downloadedBytes"` UploadedBytes int64 `json:"uploadedBytes"` TimePassed float64 `json:"timePassed"` @@ -117,7 +117,7 @@ func (s *Stats) Stats(path string) (*TorrentStats, error) { return s.stats(now, t, true), nil } -func (s *Stats) GlobalStats() *GlobalTorrentStats { +func (s *Stats) GlobalStats() *TotalTorrentStats { s.mut.Lock() defer s.mut.Unlock() @@ -134,7 +134,7 @@ func (s *Stats) GlobalStats() *GlobalTorrentStats { timePassed := now.Sub(s.gTime) s.gTime = now - return &GlobalTorrentStats{ + return &TotalTorrentStats{ DownloadedBytes: totalDownload, UploadedBytes: totalUpload, TimePassed: timePassed.Seconds(), diff --git a/src/host/torrent/stats_store.go b/src/host/torrent/stats_store.go index 417df1a..eb3c029 100644 --- a/src/host/torrent/stats_store.go +++ b/src/host/torrent/stats_store.go @@ -8,21 +8,9 @@ import ( "github.com/anacrolix/torrent/types/infohash" "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/ristretto/z" + "golang.org/x/exp/maps" ) -type TorrentStat struct { - Name string `json:"name"` - Hash string `json:"hash"` - DownloadedBytes int64 `json:"downloadedBytes"` - UploadedBytes int64 `json:"uploadedBytes"` - Peers int `json:"peers"` - Seeders int `json:"seeders"` - PieceChunks []*PieceChunk `json:"pieceChunks"` - TotalPieces int `json:"totalPieces"` - PieceSize int64 `json:"pieceSize"` -} - func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) { db, err := badger.OpenManaged( badger. @@ -38,49 +26,100 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) db.SetDiscardTs(uint64(n.Add(-lifetime).Unix())) } }() - r := &statsStore{ + return &statsStore{ db: db, - } - - return r, nil + }, nil } type statsStore struct { db *badger.DB } -func (r *statsStore) AddStat(ih infohash.T, stat TorrentStat) error { +func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error { data, err := json.Marshal(stat) if err != nil { return err } + ts := uint64(t.Unix()) - return r.db.Update(func(txn *badger.Txn) error { - return txn.Set(ih.Bytes(), data) - }) -} + txn := r.db.NewTransactionAt(ts, false) + defer txn.Discard() -func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { - var stats GlobalTorrentStats - stream := r.db.NewStream() - stream.SinceTs = uint64(since.Unix()) - - var tstat TorrentStat - stream.Send = func(buf *z.Buffer) error { - err := json.Unmarshal(buf.Bytes(), &tstat) - if err != nil { - return err - } - - stats.DownloadedBytes += tstat.DownloadedBytes - stats.UploadedBytes += tstat.UploadedBytes - - return nil - } - - err := stream.Orchestrate(ctx) + err = txn.Set(ih.Bytes(), data) if err != nil { - return stats, err + return err } - return stats, nil + + return txn.CommitAt(ts, nil) +} + +func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) { + stats := map[time.Time]TotalTorrentStats{} + + err := r.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.AllVersions = true + opts.SinceTs = uint64(since.Unix()) + + it := txn.NewIterator(opts) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + // k := item.Key() + var tstat TorrentStats + err := item.Value(func(v []byte) error { + return json.Unmarshal(v, &tstat) + }) + if err != nil { + return err + } + + t := time.Unix(int64(item.Version()), 0) + + if stat, ok := stats[t]; !ok { + stats[t] = TotalTorrentStats{ + DownloadedBytes: tstat.DownloadedBytes, + UploadedBytes: stat.DownloadedBytes, + } + } else { + stat.DownloadedBytes += tstat.DownloadedBytes + stat.UploadedBytes += tstat.UploadedBytes + stats[t] = stat + } + + } + return nil + }) + + return maps.Values(stats), err +} + +func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) { + var stats map[time.Time]TorrentStats + + err := r.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.AllVersions = true + opts.SinceTs = uint64(since.Unix()) + + it := txn.NewKeyIterator(ih.Bytes(), opts) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + var tstat TorrentStats + err := item.Value(func(v []byte) error { + return json.Unmarshal(v, &tstat) + }) + if err != nil { + return err + } + + t := time.Unix(int64(item.Version()), 0) + + stats[t] = tstat + } + return nil + }) + + return maps.Values(stats), err }