207 lines
4.4 KiB
Go
207 lines
4.4 KiB
Go
package atorrent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"path"
|
|
"slices"
|
|
"time"
|
|
|
|
"git.kmsign.ru/royalcat/tstor/src/logwrap"
|
|
"github.com/anacrolix/torrent/types/infohash"
|
|
"github.com/dgraph-io/badger/v4"
|
|
)
|
|
|
|
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
|
db, err := badger.OpenManaged(
|
|
badger.
|
|
DefaultOptions(path.Join(metaDir, "stats")).
|
|
WithNumVersionsToKeep(int(^uint(0) >> 1)).
|
|
WithLogger(logwrap.BadgerLogger("stats")), // Infinity
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go func() {
|
|
for n := range time.NewTimer(lifetime / 2).C {
|
|
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
|
}
|
|
}()
|
|
return &statsStore{
|
|
db: db,
|
|
}, nil
|
|
}
|
|
|
|
type statsStore struct {
|
|
db *badger.DB
|
|
}
|
|
|
|
// TorrentStats is one sample of transfer and peer counters.
type TorrentStats struct {
	// Timestamp identifies the sample; Same deliberately ignores it.
	Timestamp time.Time
	// Byte counters.
	DownloadedBytes, UploadedBytes uint64
	// Peer counters.
	TotalPeers, ActivePeers, ConnectedSeeders uint16
}

// Same reports whether s and o carry identical counter values.
// The Timestamp field does not take part in the comparison.
func (s TorrentStats) Same(o TorrentStats) bool {
	if s.DownloadedBytes != o.DownloadedBytes || s.UploadedBytes != o.UploadedBytes {
		return false
	}

	peersMatch := s.TotalPeers == o.TotalPeers &&
		s.ActivePeers == o.ActivePeers &&
		s.ConnectedSeeders == o.ConnectedSeeders
	return peersMatch
}
|
|
|
|
func (r *statsStore) addStats(key []byte, stat TorrentStats) error {
|
|
ts := uint64(stat.Timestamp.Unix())
|
|
|
|
txn := r.db.NewTransactionAt(ts, true)
|
|
defer txn.Discard()
|
|
|
|
item, err := txn.Get(key)
|
|
if err != nil && err != badger.ErrKeyNotFound {
|
|
return err
|
|
}
|
|
|
|
if err != badger.ErrKeyNotFound {
|
|
var prevStats TorrentStats
|
|
err = item.Value(func(val []byte) error {
|
|
return json.Unmarshal(val, &prevStats)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if prevStats.Same(stat) {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
data, err := json.Marshal(stat)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = txn.Set(key, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return txn.CommitAt(ts, nil)
|
|
}
|
|
|
|
func (r *statsStore) AddTorrentStats(ih infohash.T, stat TorrentStats) error {
|
|
return r.addStats(ih.Bytes(), stat)
|
|
}
|
|
|
|
const totalKey = "total"
|
|
|
|
func (r *statsStore) AddTotalStats(stat TorrentStats) error {
|
|
return r.addStats([]byte(totalKey), stat)
|
|
}
|
|
|
|
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
|
stats := []TorrentStats{}
|
|
|
|
err := r.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.AllVersions = true
|
|
opts.SinceTs = uint64(since.Unix())
|
|
|
|
it := txn.NewKeyIterator([]byte(totalKey), opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var stat TorrentStats
|
|
err := item.Value(func(v []byte) error {
|
|
return json.Unmarshal(v, &stat)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stats = append(stats, stat)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
|
return a.Timestamp.Compare(b.Timestamp)
|
|
})
|
|
stats = slices.Compact(stats)
|
|
return stats, nil
|
|
}
|
|
|
|
func (r *statsStore) ReadTorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
|
stats := []TorrentStats{}
|
|
|
|
err := r.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.AllVersions = true
|
|
opts.SinceTs = uint64(since.Unix())
|
|
|
|
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var stat TorrentStats
|
|
err := item.Value(func(v []byte) error {
|
|
return json.Unmarshal(v, &stat)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stats = append(stats, stat)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
|
return a.Timestamp.Compare(b.Timestamp)
|
|
})
|
|
stats = slices.Compact(stats)
|
|
return stats, nil
|
|
}
|
|
|
|
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
|
stats := []TorrentStats{}
|
|
|
|
err := r.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.AllVersions = true
|
|
opts.SinceTs = uint64(since.Unix())
|
|
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var stat TorrentStats
|
|
err := item.Value(func(v []byte) error {
|
|
return json.Unmarshal(v, &stat)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stats = append(stats, stat)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
|
return a.Timestamp.Compare(b.Timestamp)
|
|
})
|
|
stats = slices.Compact(stats)
|
|
return stats, nil
|
|
}
|