Compare commits
2 commits
d5aa78cb39
...
633a1d6e25
Author | SHA1 | Date | |
---|---|---|---|
633a1d6e25 | |||
f9311284fc |
18 changed files with 1543 additions and 989 deletions
|
@ -21,6 +21,9 @@ models:
|
||||||
model: github.com/99designs/gqlgen/graphql.Time
|
model: github.com/99designs/gqlgen/graphql.Time
|
||||||
Int:
|
Int:
|
||||||
model: github.com/99designs/gqlgen/graphql.Int64
|
model: github.com/99designs/gqlgen/graphql.Int64
|
||||||
|
UInt:
|
||||||
|
model:
|
||||||
|
- github.com/99designs/gqlgen/graphql.Uint
|
||||||
Torrent:
|
Torrent:
|
||||||
extraFields:
|
extraFields:
|
||||||
T:
|
T:
|
||||||
|
|
|
@ -5,6 +5,7 @@ directive @stream on FIELD_DEFINITION
|
||||||
|
|
||||||
scalar DateTime
|
scalar DateTime
|
||||||
scalar Upload
|
scalar Upload
|
||||||
|
scalar UInt
|
||||||
|
|
||||||
type Schema {
|
type Schema {
|
||||||
query: Query
|
query: Query
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
type TorrentDaemonQuery {
|
type TorrentDaemonQuery {
|
||||||
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
||||||
stats: TorrentStats! @resolver
|
clientStats: TorrentClientStats! @resolver
|
||||||
|
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
|
||||||
}
|
}
|
||||||
|
|
||||||
input TorrentsFilter {
|
input TorrentsFilter {
|
||||||
|
|
|
@ -33,7 +33,7 @@ enum TorrentPriority {
|
||||||
NOW
|
NOW
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentStats {
|
type TorrentClientStats {
|
||||||
bytesWritten: Int!
|
bytesWritten: Int!
|
||||||
bytesWrittenData: Int!
|
bytesWrittenData: Int!
|
||||||
bytesRead: Int!
|
bytesRead: Int!
|
||||||
|
@ -51,3 +51,12 @@ type TorrentStats {
|
||||||
piecesDirtiedGood: Int!
|
piecesDirtiedGood: Int!
|
||||||
piecesDirtiedBad: Int!
|
piecesDirtiedBad: Int!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TorrentStats {
|
||||||
|
timestamp: DateTime!
|
||||||
|
downloadedBytes: UInt!
|
||||||
|
uploadedBytes: UInt!
|
||||||
|
totalPeers: UInt!
|
||||||
|
activePeers: UInt!
|
||||||
|
connectedSeeders: UInt!
|
||||||
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ type File interface {
|
||||||
// Name returns the name of the file as presented to Open.
|
// Name returns the name of the file as presented to Open.
|
||||||
Name() string
|
Name() string
|
||||||
ctxio.Writer
|
ctxio.Writer
|
||||||
ctxio.Reader
|
// ctxio.Reader
|
||||||
ctxio.ReaderAt
|
ctxio.ReaderAt
|
||||||
io.Seeker
|
io.Seeker
|
||||||
ctxio.Closer
|
ctxio.Closer
|
||||||
|
|
|
@ -89,7 +89,7 @@ func TestNFS(t *testing.T) {
|
||||||
}
|
}
|
||||||
mf, _ := mem.OpenFile(ctx, "/helloworld.txt", os.O_RDONLY, 0)
|
mf, _ := mem.OpenFile(ctx, "/helloworld.txt", os.O_RDONLY, 0)
|
||||||
buf := make([]byte, len(b))
|
buf := make([]byte, len(b))
|
||||||
if _, err = mf.Read(ctx, buf[:]); err != nil {
|
if _, err = mf.ReadAt(ctx, buf[:], 0); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(buf, b) {
|
if !bytes.Equal(buf, b) {
|
||||||
|
|
|
@ -1,126 +0,0 @@
|
||||||
package delivery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
|
||||||
"github.com/anacrolix/missinggo/v2/filecache"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
|
|
||||||
return func(ctx *gin.Context) {
|
|
||||||
stat := gin.H{
|
|
||||||
"torrentStats": ss.GlobalStats(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if fc != nil {
|
|
||||||
stat["cacheItems"] = fc.Info().NumItems
|
|
||||||
stat["cacheFilled"] = fc.Info().Filled / 1024 / 1024
|
|
||||||
stat["cacheCapacity"] = fc.Info().Capacity / 1024 / 1024
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO move to a struct
|
|
||||||
ctx.JSON(http.StatusOK, stat)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// var apiServersHandler = func(ss []*service.Server) gin.HandlerFunc {
|
|
||||||
// return func(ctx *gin.Context) {
|
|
||||||
// var infos []*torrent.ServerInfo
|
|
||||||
// for _, s := range ss {
|
|
||||||
// infos = append(infos, s.Info())
|
|
||||||
// }
|
|
||||||
// ctx.JSON(http.StatusOK, infos)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// var apiRoutesHandler = func(ss *service.Stats) gin.HandlerFunc {
|
|
||||||
// return func(ctx *gin.Context) {
|
|
||||||
// s := ss.RoutesStats()
|
|
||||||
// sort.Sort(torrent.ByName(s))
|
|
||||||
// ctx.JSON(http.StatusOK, s)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// var apiAddTorrentHandler = func(s *service.Service) gin.HandlerFunc {
|
|
||||||
// return func(ctx *gin.Context) {
|
|
||||||
// route := ctx.Param("route")
|
|
||||||
|
|
||||||
// var json RouteAdd
|
|
||||||
// if err := ctx.ShouldBindJSON(&json); err != nil {
|
|
||||||
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if err := s.AddMagnet(route, json.Magnet); err != nil {
|
|
||||||
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// ctx.JSON(http.StatusOK, nil)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// var apiDelTorrentHandler = func(s *service.Service) gin.HandlerFunc {
|
|
||||||
// return func(ctx *gin.Context) {
|
|
||||||
// route := ctx.Param("route")
|
|
||||||
// hash := ctx.Param("torrent_hash")
|
|
||||||
|
|
||||||
// if err := s.RemoveFromHash(route, hash); err != nil {
|
|
||||||
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// ctx.JSON(http.StatusOK, nil)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
var apiLogHandler = func(path string) gin.HandlerFunc {
|
|
||||||
return func(ctx *gin.Context) {
|
|
||||||
f, err := os.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fi, err := f.Stat()
|
|
||||||
if err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
max := math.Max(float64(-fi.Size()), -1024*8*8)
|
|
||||||
_, err = f.Seek(int64(max), io.SeekEnd)
|
|
||||||
if err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var b bytes.Buffer
|
|
||||||
ctx.Stream(func(w io.Writer) bool {
|
|
||||||
_, err := b.ReadFrom(f)
|
|
||||||
if err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = b.WriteTo(w)
|
|
||||||
if err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := f.Close(); err != nil {
|
|
||||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load diff
|
@ -7,6 +7,14 @@ import (
|
||||||
atorrent "github.com/anacrolix/torrent"
|
atorrent "github.com/anacrolix/torrent"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func Apply[I any, O any](in []I, f func(I) O) []O {
|
||||||
|
out := make([]O, len(in))
|
||||||
|
for i, v := range in {
|
||||||
|
out[i] = f(v)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func MapPeerSource(source atorrent.PeerSource) string {
|
func MapPeerSource(source atorrent.PeerSource) string {
|
||||||
switch source {
|
switch source {
|
||||||
case atorrent.PeerSourceDirect:
|
case atorrent.PeerSourceDirect:
|
||||||
|
@ -43,3 +51,14 @@ func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
|
||||||
T: t,
|
T: t,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func MapTorrentStats(s torrent.TorrentStats) *TorrentStats {
|
||||||
|
return &TorrentStats{
|
||||||
|
Timestamp: s.Timestamp,
|
||||||
|
DownloadedBytes: uint(s.DownloadedBytes),
|
||||||
|
UploadedBytes: uint(s.UploadedBytes),
|
||||||
|
TotalPeers: uint(s.TotalPeers),
|
||||||
|
ActivePeers: uint(s.ActivePeers),
|
||||||
|
ConnectedSeeders: uint(s.ConnectedSeeders),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -184,6 +184,22 @@ type Torrent struct {
|
||||||
T *torrent.Controller `json:"-"`
|
T *torrent.Controller `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TorrentClientStats struct {
|
||||||
|
BytesWritten int64 `json:"bytesWritten"`
|
||||||
|
BytesWrittenData int64 `json:"bytesWrittenData"`
|
||||||
|
BytesRead int64 `json:"bytesRead"`
|
||||||
|
BytesReadData int64 `json:"bytesReadData"`
|
||||||
|
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
|
||||||
|
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
|
||||||
|
ChunksWritten int64 `json:"chunksWritten"`
|
||||||
|
ChunksRead int64 `json:"chunksRead"`
|
||||||
|
ChunksReadUseful int64 `json:"chunksReadUseful"`
|
||||||
|
ChunksReadWasted int64 `json:"chunksReadWasted"`
|
||||||
|
MetadataChunksRead int64 `json:"metadataChunksRead"`
|
||||||
|
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
|
||||||
|
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
|
||||||
|
}
|
||||||
|
|
||||||
type TorrentDaemonMutation struct {
|
type TorrentDaemonMutation struct {
|
||||||
ValidateTorrent bool `json:"validateTorrent"`
|
ValidateTorrent bool `json:"validateTorrent"`
|
||||||
SetTorrentPriority bool `json:"setTorrentPriority"`
|
SetTorrentPriority bool `json:"setTorrentPriority"`
|
||||||
|
@ -192,7 +208,8 @@ type TorrentDaemonMutation struct {
|
||||||
|
|
||||||
type TorrentDaemonQuery struct {
|
type TorrentDaemonQuery struct {
|
||||||
Torrents []*Torrent `json:"torrents"`
|
Torrents []*Torrent `json:"torrents"`
|
||||||
Stats *TorrentStats `json:"stats"`
|
ClientStats *TorrentClientStats `json:"clientStats"`
|
||||||
|
StatsHistory []*TorrentStats `json:"statsHistory"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentFs struct {
|
type TorrentFs struct {
|
||||||
|
@ -271,19 +288,12 @@ func (this TorrentProgress) GetCurrent() int64 { return this.Current }
|
||||||
func (this TorrentProgress) GetTotal() int64 { return this.Total }
|
func (this TorrentProgress) GetTotal() int64 { return this.Total }
|
||||||
|
|
||||||
type TorrentStats struct {
|
type TorrentStats struct {
|
||||||
BytesWritten int64 `json:"bytesWritten"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
BytesWrittenData int64 `json:"bytesWrittenData"`
|
DownloadedBytes uint `json:"downloadedBytes"`
|
||||||
BytesRead int64 `json:"bytesRead"`
|
UploadedBytes uint `json:"uploadedBytes"`
|
||||||
BytesReadData int64 `json:"bytesReadData"`
|
TotalPeers uint `json:"totalPeers"`
|
||||||
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
|
ActivePeers uint `json:"activePeers"`
|
||||||
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
|
ConnectedSeeders uint `json:"connectedSeeders"`
|
||||||
ChunksWritten int64 `json:"chunksWritten"`
|
|
||||||
ChunksRead int64 `json:"chunksRead"`
|
|
||||||
ChunksReadUseful int64 `json:"chunksReadUseful"`
|
|
||||||
ChunksReadWasted int64 `json:"chunksReadWasted"`
|
|
||||||
MetadataChunksRead int64 `json:"metadataChunksRead"`
|
|
||||||
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
|
|
||||||
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentsFilter struct {
|
type TorrentsFilter struct {
|
||||||
|
|
|
@ -8,10 +8,13 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||||
|
|
||||||
|
tinfohash "github.com/anacrolix/torrent/types/infohash"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Torrents is the resolver for the torrents field.
|
// Torrents is the resolver for the torrents field.
|
||||||
|
@ -86,10 +89,10 @@ func (r *torrentDaemonQueryResolver) Torrents(ctx context.Context, obj *model.To
|
||||||
return tr, nil
|
return tr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stats is the resolver for the stats field.
|
// ClientStats is the resolver for the clientStats field.
|
||||||
func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentStats, error) {
|
func (r *torrentDaemonQueryResolver) ClientStats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentClientStats, error) {
|
||||||
stats := r.Service.Stats()
|
stats := r.Service.Stats()
|
||||||
return &model.TorrentStats{
|
return &model.TorrentClientStats{
|
||||||
BytesWritten: stats.BytesWritten.Int64(),
|
BytesWritten: stats.BytesWritten.Int64(),
|
||||||
BytesRead: stats.BytesRead.Int64(),
|
BytesRead: stats.BytesRead.Int64(),
|
||||||
BytesWrittenData: stats.BytesWrittenData.Int64(),
|
BytesWrittenData: stats.BytesWrittenData.Int64(),
|
||||||
|
@ -106,6 +109,33 @@ func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.Torre
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatsHistory is the resolver for the statsHistory field.
|
||||||
|
func (r *torrentDaemonQueryResolver) StatsHistory(ctx context.Context, obj *model.TorrentDaemonQuery, since time.Time, infohash *string) ([]*model.TorrentStats, error) {
|
||||||
|
var stats []torrent.TorrentStats
|
||||||
|
if infohash == nil {
|
||||||
|
stats, err := r.Service.StatsHistory(ctx, since)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return model.Apply(stats, model.MapTorrentStats), nil
|
||||||
|
} else if *infohash == "total" {
|
||||||
|
var err error
|
||||||
|
stats, err = r.Service.TotalStatsHistory(ctx, since)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ih := tinfohash.FromHexString(*infohash)
|
||||||
|
var err error
|
||||||
|
stats, err = r.Service.TorrentStatsHistory(ctx, since, ih)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return model.Apply(stats, model.MapTorrentStats), nil
|
||||||
|
}
|
||||||
|
|
||||||
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
|
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
|
||||||
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
|
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
|
||||||
return &torrentDaemonQueryResolver{r}
|
return &torrentDaemonQueryResolver{r}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/anacrolix/dht/v2"
|
"github.com/anacrolix/dht/v2"
|
||||||
"github.com/anacrolix/dht/v2/bep44"
|
"github.com/anacrolix/dht/v2/bep44"
|
||||||
|
@ -59,9 +58,14 @@ func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
|
||||||
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
|
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
|
||||||
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
||||||
cfg.Store = fis
|
cfg.Store = fis
|
||||||
cfg.Exp = 2 * time.Hour
|
cfg.Exp = dhtTTL
|
||||||
cfg.NoSecurity = false
|
cfg.NoSecurity = false
|
||||||
}
|
}
|
||||||
|
|
||||||
return torrent.NewClient(torrentCfg)
|
t, err := torrent.NewClient(torrentCfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ type Daemon struct {
|
||||||
fis *dhtFileItemStore
|
fis *dhtFileItemStore
|
||||||
dirsAquire kv.Store[string, DirAquire]
|
dirsAquire kv.Store[string, DirAquire]
|
||||||
fileProperties kv.Store[string, FileProperties]
|
fileProperties kv.Store[string, FileProperties]
|
||||||
|
statsStore *statsStore
|
||||||
|
|
||||||
loadMutex sync.Mutex
|
loadMutex sync.Mutex
|
||||||
torrentLoaded chan struct{}
|
torrentLoaded chan struct{}
|
||||||
|
@ -57,6 +58,8 @@ type Daemon struct {
|
||||||
log *rlog.Logger
|
log *rlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const dhtTTL = 24 * time.Hour
|
||||||
|
|
||||||
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
|
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
|
||||||
s := &Daemon{
|
s := &Daemon{
|
||||||
log: rlog.Component("torrent-service"),
|
log: rlog.Component("torrent-service"),
|
||||||
|
@ -70,7 +73,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
return nil, fmt.Errorf("error creating metadata folder: %w", err)
|
return nil, fmt.Errorf("error creating metadata folder: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), 3*time.Hour)
|
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), dhtTTL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error starting item store: %w", err)
|
return nil, fmt.Errorf("error starting item store: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -86,7 +89,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
|
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -96,6 +98,11 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
return nil, fmt.Errorf("error creating node ID: %w", err)
|
return nil, fmt.Errorf("error creating node ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.statsStore, err = newStatsStore(conf.MetadataFolder, time.Hour*24*30)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
s.client, err = newClient(s.Storage, s.fis, &conf, id)
|
s.client, err = newClient(s.Storage, s.fis, &conf, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error starting torrent client: %w", err)
|
return nil, fmt.Errorf("error starting torrent client: %w", err)
|
||||||
|
@ -116,9 +123,77 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
close(s.torrentLoaded)
|
close(s.torrentLoaded)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
const period = time.Second * 10
|
||||||
|
|
||||||
|
<-s.torrentLoaded
|
||||||
|
|
||||||
|
timer := time.NewTicker(period)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.client.Closed():
|
||||||
|
return
|
||||||
|
case <-timer.C:
|
||||||
|
s.updateStats(context.Background())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Daemon) updateStats(ctx context.Context) {
|
||||||
|
log := s.log
|
||||||
|
|
||||||
|
totalPeers := 0
|
||||||
|
activePeers := 0
|
||||||
|
connectedSeeders := 0
|
||||||
|
|
||||||
|
for _, v := range s.client.Torrents() {
|
||||||
|
stats := v.Stats()
|
||||||
|
err := s.statsStore.AddTorrentStats(v.InfoHash(), TorrentStats{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
DownloadedBytes: uint64(stats.BytesRead.Int64()),
|
||||||
|
UploadedBytes: uint64(stats.BytesWritten.Int64()),
|
||||||
|
TotalPeers: uint16(stats.TotalPeers),
|
||||||
|
ActivePeers: uint16(stats.ActivePeers),
|
||||||
|
ConnectedSeeders: uint16(stats.ConnectedSeeders),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
totalPeers += stats.TotalPeers
|
||||||
|
activePeers += stats.ActivePeers
|
||||||
|
connectedSeeders += stats.ConnectedSeeders
|
||||||
|
}
|
||||||
|
|
||||||
|
totalStats := s.client.Stats()
|
||||||
|
err := s.statsStore.AddTotalStats(TorrentStats{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
|
||||||
|
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
|
||||||
|
TotalPeers: uint16(totalPeers),
|
||||||
|
ActivePeers: uint16(activePeers),
|
||||||
|
ConnectedSeeders: uint16(connectedSeeders),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx, "error saving total stats", rlog.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
||||||
|
return s.statsStore.ReadTotalStatsHistory(ctx, since)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
||||||
|
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
||||||
|
return s.statsStore.ReadStatsHistory(ctx, since)
|
||||||
|
}
|
||||||
|
|
||||||
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
|
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
|
||||||
|
|
||||||
func (s *Daemon) Close(ctx context.Context) error {
|
func (s *Daemon) Close(ctx context.Context) error {
|
||||||
|
@ -237,7 +312,7 @@ func isValidInfoHashBytes(d []byte) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Daemon) Stats() torrent.ConnStats {
|
func (s *Daemon) Stats() torrent.ConnStats {
|
||||||
return s.client.ConnStats()
|
return s.client.Stats().ConnStats
|
||||||
}
|
}
|
||||||
|
|
||||||
const loadWorkers = 5
|
const loadWorkers = 5
|
||||||
|
|
|
@ -85,7 +85,25 @@ func (fis *dhtFileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fis *dhtFileItemStore) Del(t bep44.Target) error {
|
func (fis *dhtFileItemStore) Del(t bep44.Target) error {
|
||||||
// ignore this
|
tx := fis.db.NewTransaction(true)
|
||||||
|
defer tx.Discard()
|
||||||
|
|
||||||
|
err := tx.Delete(t[:])
|
||||||
|
if err == badger.ErrKeyNotFound {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err == badger.ErrKeyNotFound {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,7 +155,6 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
|
||||||
|
|
||||||
return fs.filesCache, nil
|
return fs.filesCache, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DEFAULT_DIR:
|
DEFAULT_DIR:
|
||||||
|
|
|
@ -1,218 +1,207 @@
|
||||||
package torrent
|
package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"context"
|
||||||
"sync"
|
"encoding/json"
|
||||||
|
"path"
|
||||||
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent"
|
"git.kmsign.ru/royalcat/tstor/src/log"
|
||||||
|
"github.com/anacrolix/torrent/types/infohash"
|
||||||
|
"github.com/dgraph-io/badger/v4"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrTorrentNotFound = errors.New("torrent not found")
|
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
||||||
|
db, err := badger.OpenManaged(
|
||||||
type PieceStatus string
|
badger.
|
||||||
|
DefaultOptions(path.Join(metaDir, "stats")).
|
||||||
const (
|
WithNumVersionsToKeep(int(^uint(0) >> 1)).
|
||||||
Checking PieceStatus = "H"
|
WithLogger(log.BadgerLogger("stats")), // Infinity
|
||||||
Partial PieceStatus = "P"
|
|
||||||
Complete PieceStatus = "C"
|
|
||||||
Waiting PieceStatus = "W"
|
|
||||||
Error PieceStatus = "?"
|
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
type PieceChunk struct {
|
go func() {
|
||||||
Status PieceStatus `json:"status"`
|
for n := range time.NewTimer(lifetime / 2).C {
|
||||||
NumPieces int `json:"numPieces"`
|
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return &statsStore{
|
||||||
|
db: db,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsStore struct {
|
||||||
|
db *badger.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentStats struct {
|
type TorrentStats struct {
|
||||||
Name string `json:"name"`
|
Timestamp time.Time
|
||||||
Hash string `json:"hash"`
|
DownloadedBytes uint64
|
||||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
UploadedBytes uint64
|
||||||
UploadedBytes int64 `json:"uploadedBytes"`
|
TotalPeers uint16
|
||||||
Peers int `json:"peers"`
|
ActivePeers uint16
|
||||||
Seeders int `json:"seeders"`
|
ConnectedSeeders uint16
|
||||||
TimePassed float64 `json:"timePassed"`
|
|
||||||
PieceChunks []*PieceChunk `json:"pieceChunks"`
|
|
||||||
TotalPieces int `json:"totalPieces"`
|
|
||||||
PieceSize int64 `json:"pieceSize"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type byName []*TorrentStats
|
func (s TorrentStats) Same(o TorrentStats) bool {
|
||||||
|
return s.DownloadedBytes == o.DownloadedBytes &&
|
||||||
func (a byName) Len() int { return len(a) }
|
s.UploadedBytes == o.UploadedBytes &&
|
||||||
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
s.TotalPeers == o.TotalPeers &&
|
||||||
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
s.ActivePeers == o.ActivePeers &&
|
||||||
|
s.ConnectedSeeders == o.ConnectedSeeders
|
||||||
type TotalTorrentStats struct {
|
|
||||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
|
||||||
UploadedBytes int64 `json:"uploadedBytes"`
|
|
||||||
TimePassed float64 `json:"timePassed"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RouteStats struct {
|
func (r *statsStore) addStats(key []byte, stat TorrentStats) error {
|
||||||
Name string `json:"name"`
|
ts := uint64(stat.Timestamp.Unix())
|
||||||
TorrentStats []*TorrentStats `json:"torrentStats"`
|
|
||||||
|
txn := r.db.NewTransactionAt(ts, true)
|
||||||
|
defer txn.Discard()
|
||||||
|
|
||||||
|
item, err := txn.Get(key)
|
||||||
|
if err != nil && err != badger.ErrKeyNotFound {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type ByName []*RouteStats
|
if err != badger.ErrKeyNotFound {
|
||||||
|
var prevStats TorrentStats
|
||||||
func (a ByName) Len() int { return len(a) }
|
err = item.Value(func(val []byte) error {
|
||||||
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
return json.Unmarshal(val, &prevStats)
|
||||||
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
|
||||||
|
|
||||||
type stat struct {
|
|
||||||
totalDownloadBytes int64
|
|
||||||
downloadBytes int64
|
|
||||||
totalUploadBytes int64
|
|
||||||
uploadBytes int64
|
|
||||||
peers int
|
|
||||||
seeders int
|
|
||||||
time time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type Stats struct {
|
|
||||||
mut sync.Mutex
|
|
||||||
torrents map[string]*torrent.Torrent
|
|
||||||
previousStats map[string]*stat
|
|
||||||
|
|
||||||
gTime time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStats() *Stats {
|
|
||||||
return &Stats{
|
|
||||||
gTime: time.Now(),
|
|
||||||
torrents: make(map[string]*torrent.Torrent),
|
|
||||||
// torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
|
|
||||||
// previousStats: make(map[string]*stat),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stats) Add(path string, t *torrent.Torrent) {
|
|
||||||
s.mut.Lock()
|
|
||||||
defer s.mut.Unlock()
|
|
||||||
|
|
||||||
s.torrents[path] = t
|
|
||||||
s.previousStats[path] = &stat{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stats) Del(path string) {
|
|
||||||
s.mut.Lock()
|
|
||||||
defer s.mut.Unlock()
|
|
||||||
delete(s.torrents, path)
|
|
||||||
delete(s.previousStats, path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stats) Stats(path string) (*TorrentStats, error) {
|
|
||||||
s.mut.Lock()
|
|
||||||
defer s.mut.Unlock()
|
|
||||||
|
|
||||||
t, ok := s.torrents[path]
|
|
||||||
if !(ok) {
|
|
||||||
return nil, ErrTorrentNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
return s.stats(now, t, true), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stats) GlobalStats() *TotalTorrentStats {
|
|
||||||
s.mut.Lock()
|
|
||||||
defer s.mut.Unlock()
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
var totalDownload int64
|
|
||||||
var totalUpload int64
|
|
||||||
for _, torrent := range s.torrents {
|
|
||||||
tStats := s.stats(now, torrent, false)
|
|
||||||
totalDownload += tStats.DownloadedBytes
|
|
||||||
totalUpload += tStats.UploadedBytes
|
|
||||||
}
|
|
||||||
|
|
||||||
timePassed := now.Sub(s.gTime)
|
|
||||||
s.gTime = now
|
|
||||||
|
|
||||||
return &TotalTorrentStats{
|
|
||||||
DownloadedBytes: totalDownload,
|
|
||||||
UploadedBytes: totalUpload,
|
|
||||||
TimePassed: timePassed.Seconds(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
|
|
||||||
ts := &TorrentStats{}
|
|
||||||
prev, ok := s.previousStats[t.InfoHash().String()]
|
|
||||||
if !ok {
|
|
||||||
return &TorrentStats{}
|
|
||||||
}
|
|
||||||
if s.returnPreviousMeasurements(now) {
|
|
||||||
ts.DownloadedBytes = prev.downloadBytes
|
|
||||||
ts.UploadedBytes = prev.uploadBytes
|
|
||||||
} else {
|
|
||||||
st := t.Stats()
|
|
||||||
rd := st.BytesReadData.Int64()
|
|
||||||
wd := st.BytesWrittenData.Int64()
|
|
||||||
ist := &stat{
|
|
||||||
downloadBytes: rd - prev.totalDownloadBytes,
|
|
||||||
uploadBytes: wd - prev.totalUploadBytes,
|
|
||||||
totalDownloadBytes: rd,
|
|
||||||
totalUploadBytes: wd,
|
|
||||||
time: now,
|
|
||||||
peers: st.TotalPeers,
|
|
||||||
seeders: st.ConnectedSeeders,
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.DownloadedBytes = ist.downloadBytes
|
|
||||||
ts.UploadedBytes = ist.uploadBytes
|
|
||||||
ts.Peers = ist.peers
|
|
||||||
ts.Seeders = ist.seeders
|
|
||||||
|
|
||||||
s.previousStats[t.InfoHash().String()] = ist
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.TimePassed = now.Sub(prev.time).Seconds()
|
|
||||||
var totalPieces int
|
|
||||||
if chunks {
|
|
||||||
var pch []*PieceChunk
|
|
||||||
for _, psr := range t.PieceStateRuns() {
|
|
||||||
var s PieceStatus
|
|
||||||
switch {
|
|
||||||
case psr.Checking:
|
|
||||||
s = Checking
|
|
||||||
case psr.Partial:
|
|
||||||
s = Partial
|
|
||||||
case psr.Complete:
|
|
||||||
s = Complete
|
|
||||||
case !psr.Ok:
|
|
||||||
s = Error
|
|
||||||
default:
|
|
||||||
s = Waiting
|
|
||||||
}
|
|
||||||
|
|
||||||
pch = append(pch, &PieceChunk{
|
|
||||||
Status: s,
|
|
||||||
NumPieces: psr.Length,
|
|
||||||
})
|
})
|
||||||
totalPieces += psr.Length
|
if err != nil {
|
||||||
}
|
return err
|
||||||
ts.PieceChunks = pch
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ts.Hash = t.InfoHash().String()
|
if prevStats.Same(stat) {
|
||||||
ts.Name = t.Name()
|
return nil
|
||||||
ts.TotalPieces = totalPieces
|
}
|
||||||
|
|
||||||
if t.Info() != nil {
|
|
||||||
ts.PieceSize = t.Info().PieceLength
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return ts
|
data, err := json.Marshal(stat)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = txn.Set(key, data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
const gap time.Duration = 2 * time.Second
|
return txn.CommitAt(ts, nil)
|
||||||
|
}
|
||||||
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
|
|
||||||
return now.Sub(s.gTime) < gap
|
func (r *statsStore) AddTorrentStats(ih infohash.T, stat TorrentStats) error {
|
||||||
|
return r.addStats(ih.Bytes(), stat)
|
||||||
|
}
|
||||||
|
|
||||||
|
const totalKey = "total"
|
||||||
|
|
||||||
|
func (r *statsStore) AddTotalStats(stat TorrentStats) error {
|
||||||
|
return r.addStats([]byte(totalKey), stat)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
||||||
|
stats := []TorrentStats{}
|
||||||
|
|
||||||
|
err := r.db.View(func(txn *badger.Txn) error {
|
||||||
|
opts := badger.DefaultIteratorOptions
|
||||||
|
opts.AllVersions = true
|
||||||
|
opts.SinceTs = uint64(since.Unix())
|
||||||
|
|
||||||
|
it := txn.NewKeyIterator([]byte(totalKey), opts)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
var stat TorrentStats
|
||||||
|
err := item.Value(func(v []byte) error {
|
||||||
|
return json.Unmarshal(v, &stat)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stats = append(stats, stat)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
||||||
|
return a.Timestamp.Compare(b.Timestamp)
|
||||||
|
})
|
||||||
|
stats = slices.Compact(stats)
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) ReadTorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
||||||
|
stats := []TorrentStats{}
|
||||||
|
|
||||||
|
err := r.db.View(func(txn *badger.Txn) error {
|
||||||
|
opts := badger.DefaultIteratorOptions
|
||||||
|
opts.AllVersions = true
|
||||||
|
opts.SinceTs = uint64(since.Unix())
|
||||||
|
|
||||||
|
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
var stat TorrentStats
|
||||||
|
err := item.Value(func(v []byte) error {
|
||||||
|
return json.Unmarshal(v, &stat)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stats = append(stats, stat)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
||||||
|
return a.Timestamp.Compare(b.Timestamp)
|
||||||
|
})
|
||||||
|
stats = slices.Compact(stats)
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
||||||
|
stats := []TorrentStats{}
|
||||||
|
|
||||||
|
err := r.db.View(func(txn *badger.Txn) error {
|
||||||
|
opts := badger.DefaultIteratorOptions
|
||||||
|
opts.AllVersions = true
|
||||||
|
opts.SinceTs = uint64(since.Unix())
|
||||||
|
|
||||||
|
it := txn.NewIterator(opts)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
var stat TorrentStats
|
||||||
|
err := item.Value(func(v []byte) error {
|
||||||
|
return json.Unmarshal(v, &stat)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stats = append(stats, stat)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
||||||
|
return a.Timestamp.Compare(b.Timestamp)
|
||||||
|
})
|
||||||
|
stats = slices.Compact(stats)
|
||||||
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,127 +0,0 @@
|
||||||
package torrent
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"path"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/src/log"
|
|
||||||
"github.com/anacrolix/torrent/types/infohash"
|
|
||||||
"github.com/dgraph-io/badger/v4"
|
|
||||||
"golang.org/x/exp/maps"
|
|
||||||
)
|
|
||||||
|
|
||||||
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
|
||||||
db, err := badger.OpenManaged(
|
|
||||||
badger.
|
|
||||||
DefaultOptions(path.Join(metaDir, "stats-history")).
|
|
||||||
WithNumVersionsToKeep(int(^uint(0) >> 1)).
|
|
||||||
WithLogger(log.BadgerLogger("stats")), // Infinity
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for n := range time.NewTimer(lifetime / 2).C {
|
|
||||||
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return &statsStore{
|
|
||||||
db: db,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type statsStore struct {
|
|
||||||
db *badger.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
|
|
||||||
data, err := json.Marshal(stat)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ts := uint64(t.Unix())
|
|
||||||
|
|
||||||
txn := r.db.NewTransactionAt(ts, false)
|
|
||||||
defer txn.Discard()
|
|
||||||
|
|
||||||
err = txn.Set(ih.Bytes(), data)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return txn.CommitAt(ts, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
|
|
||||||
stats := map[time.Time]TotalTorrentStats{}
|
|
||||||
|
|
||||||
err := r.db.View(func(txn *badger.Txn) error {
|
|
||||||
opts := badger.DefaultIteratorOptions
|
|
||||||
opts.AllVersions = true
|
|
||||||
opts.SinceTs = uint64(since.Unix())
|
|
||||||
|
|
||||||
it := txn.NewIterator(opts)
|
|
||||||
defer it.Close()
|
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
|
||||||
item := it.Item()
|
|
||||||
// k := item.Key()
|
|
||||||
var tstat TorrentStats
|
|
||||||
err := item.Value(func(v []byte) error {
|
|
||||||
return json.Unmarshal(v, &tstat)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
t := time.Unix(int64(item.Version()), 0)
|
|
||||||
|
|
||||||
if stat, ok := stats[t]; !ok {
|
|
||||||
stats[t] = TotalTorrentStats{
|
|
||||||
DownloadedBytes: tstat.DownloadedBytes,
|
|
||||||
UploadedBytes: stat.DownloadedBytes,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
stat.DownloadedBytes += tstat.DownloadedBytes
|
|
||||||
stat.UploadedBytes += tstat.UploadedBytes
|
|
||||||
stats[t] = stat
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return maps.Values(stats), err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
|
||||||
var stats map[time.Time]TorrentStats
|
|
||||||
|
|
||||||
err := r.db.View(func(txn *badger.Txn) error {
|
|
||||||
opts := badger.DefaultIteratorOptions
|
|
||||||
opts.AllVersions = true
|
|
||||||
opts.SinceTs = uint64(since.Unix())
|
|
||||||
|
|
||||||
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
|
||||||
defer it.Close()
|
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
|
||||||
item := it.Item()
|
|
||||||
var tstat TorrentStats
|
|
||||||
err := item.Value(func(v []byte) error {
|
|
||||||
return json.Unmarshal(v, &tstat)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
t := time.Unix(int64(item.Version()), 0)
|
|
||||||
|
|
||||||
stats[t] = tstat
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return maps.Values(stats), err
|
|
||||||
}
|
|
|
@ -99,6 +99,21 @@ type Torrent {
|
||||||
excludedFiles: [TorrentFile!]! @resolver
|
excludedFiles: [TorrentFile!]! @resolver
|
||||||
peers: [TorrentPeer!]! @resolver
|
peers: [TorrentPeer!]! @resolver
|
||||||
}
|
}
|
||||||
|
type TorrentClientStats {
|
||||||
|
bytesWritten: Int!
|
||||||
|
bytesWrittenData: Int!
|
||||||
|
bytesRead: Int!
|
||||||
|
bytesReadData: Int!
|
||||||
|
bytesReadUsefulData: Int!
|
||||||
|
bytesReadUsefulIntendedData: Int!
|
||||||
|
chunksWritten: Int!
|
||||||
|
chunksRead: Int!
|
||||||
|
chunksReadUseful: Int!
|
||||||
|
chunksReadWasted: Int!
|
||||||
|
metadataChunksRead: Int!
|
||||||
|
piecesDirtiedGood: Int!
|
||||||
|
piecesDirtiedBad: Int!
|
||||||
|
}
|
||||||
type TorrentDaemonMutation {
|
type TorrentDaemonMutation {
|
||||||
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
|
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
|
||||||
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
|
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
|
||||||
|
@ -106,7 +121,8 @@ type TorrentDaemonMutation {
|
||||||
}
|
}
|
||||||
type TorrentDaemonQuery {
|
type TorrentDaemonQuery {
|
||||||
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
||||||
stats: TorrentStats! @resolver
|
clientStats: TorrentClientStats! @resolver
|
||||||
|
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
|
||||||
}
|
}
|
||||||
type TorrentFS implements Dir & FsEntry {
|
type TorrentFS implements Dir & FsEntry {
|
||||||
name: String!
|
name: String!
|
||||||
|
@ -156,19 +172,12 @@ type TorrentProgress implements Progress {
|
||||||
total: Int!
|
total: Int!
|
||||||
}
|
}
|
||||||
type TorrentStats {
|
type TorrentStats {
|
||||||
bytesWritten: Int!
|
timestamp: DateTime!
|
||||||
bytesWrittenData: Int!
|
downloadedBytes: UInt!
|
||||||
bytesRead: Int!
|
uploadedBytes: UInt!
|
||||||
bytesReadData: Int!
|
totalPeers: UInt!
|
||||||
bytesReadUsefulData: Int!
|
activePeers: UInt!
|
||||||
bytesReadUsefulIntendedData: Int!
|
connectedSeeders: UInt!
|
||||||
chunksWritten: Int!
|
|
||||||
chunksRead: Int!
|
|
||||||
chunksReadUseful: Int!
|
|
||||||
chunksReadWasted: Int!
|
|
||||||
metadataChunksRead: Int!
|
|
||||||
piecesDirtiedGood: Int!
|
|
||||||
piecesDirtiedBad: Int!
|
|
||||||
}
|
}
|
||||||
input TorrentsFilter {
|
input TorrentsFilter {
|
||||||
infohash: StringFilter
|
infohash: StringFilter
|
||||||
|
@ -178,4 +187,5 @@ input TorrentsFilter {
|
||||||
peersCount: IntFilter
|
peersCount: IntFilter
|
||||||
priority: TorrentPriorityFilter
|
priority: TorrentPriorityFilter
|
||||||
}
|
}
|
||||||
|
scalar UInt
|
||||||
scalar Upload
|
scalar Upload
|
||||||
|
|
Loading…
Reference in a new issue