Compare commits
No commits in common. "633a1d6e2518fd12f61eb63d994fb3243362cefd" and "d5aa78cb39c4acc253a95288cf71c0238e757183" have entirely different histories.
633a1d6e25
...
d5aa78cb39
18 changed files with 991 additions and 1545 deletions
|
@ -21,9 +21,6 @@ models:
|
||||||
model: github.com/99designs/gqlgen/graphql.Time
|
model: github.com/99designs/gqlgen/graphql.Time
|
||||||
Int:
|
Int:
|
||||||
model: github.com/99designs/gqlgen/graphql.Int64
|
model: github.com/99designs/gqlgen/graphql.Int64
|
||||||
UInt:
|
|
||||||
model:
|
|
||||||
- github.com/99designs/gqlgen/graphql.Uint
|
|
||||||
Torrent:
|
Torrent:
|
||||||
extraFields:
|
extraFields:
|
||||||
T:
|
T:
|
||||||
|
|
|
@ -5,7 +5,6 @@ directive @stream on FIELD_DEFINITION
|
||||||
|
|
||||||
scalar DateTime
|
scalar DateTime
|
||||||
scalar Upload
|
scalar Upload
|
||||||
scalar UInt
|
|
||||||
|
|
||||||
type Schema {
|
type Schema {
|
||||||
query: Query
|
query: Query
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
type TorrentDaemonQuery {
|
type TorrentDaemonQuery {
|
||||||
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
||||||
clientStats: TorrentClientStats! @resolver
|
stats: TorrentStats! @resolver
|
||||||
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
|
|
||||||
}
|
}
|
||||||
|
|
||||||
input TorrentsFilter {
|
input TorrentsFilter {
|
||||||
|
|
|
@ -33,7 +33,7 @@ enum TorrentPriority {
|
||||||
NOW
|
NOW
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentClientStats {
|
type TorrentStats {
|
||||||
bytesWritten: Int!
|
bytesWritten: Int!
|
||||||
bytesWrittenData: Int!
|
bytesWrittenData: Int!
|
||||||
bytesRead: Int!
|
bytesRead: Int!
|
||||||
|
@ -51,12 +51,3 @@ type TorrentClientStats {
|
||||||
piecesDirtiedGood: Int!
|
piecesDirtiedGood: Int!
|
||||||
piecesDirtiedBad: Int!
|
piecesDirtiedBad: Int!
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentStats {
|
|
||||||
timestamp: DateTime!
|
|
||||||
downloadedBytes: UInt!
|
|
||||||
uploadedBytes: UInt!
|
|
||||||
totalPeers: UInt!
|
|
||||||
activePeers: UInt!
|
|
||||||
connectedSeeders: UInt!
|
|
||||||
}
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ type File interface {
|
||||||
// Name returns the name of the file as presented to Open.
|
// Name returns the name of the file as presented to Open.
|
||||||
Name() string
|
Name() string
|
||||||
ctxio.Writer
|
ctxio.Writer
|
||||||
// ctxio.Reader
|
ctxio.Reader
|
||||||
ctxio.ReaderAt
|
ctxio.ReaderAt
|
||||||
io.Seeker
|
io.Seeker
|
||||||
ctxio.Closer
|
ctxio.Closer
|
||||||
|
|
|
@ -89,7 +89,7 @@ func TestNFS(t *testing.T) {
|
||||||
}
|
}
|
||||||
mf, _ := mem.OpenFile(ctx, "/helloworld.txt", os.O_RDONLY, 0)
|
mf, _ := mem.OpenFile(ctx, "/helloworld.txt", os.O_RDONLY, 0)
|
||||||
buf := make([]byte, len(b))
|
buf := make([]byte, len(b))
|
||||||
if _, err = mf.ReadAt(ctx, buf[:], 0); err != nil {
|
if _, err = mf.Read(ctx, buf[:]); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(buf, b) {
|
if !bytes.Equal(buf, b) {
|
||||||
|
|
126
src/delivery/api.go
Normal file
126
src/delivery/api.go
Normal file
|
@ -0,0 +1,126 @@
|
||||||
|
package delivery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||||
|
"github.com/anacrolix/missinggo/v2/filecache"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
|
||||||
|
return func(ctx *gin.Context) {
|
||||||
|
stat := gin.H{
|
||||||
|
"torrentStats": ss.GlobalStats(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc != nil {
|
||||||
|
stat["cacheItems"] = fc.Info().NumItems
|
||||||
|
stat["cacheFilled"] = fc.Info().Filled / 1024 / 1024
|
||||||
|
stat["cacheCapacity"] = fc.Info().Capacity / 1024 / 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO move to a struct
|
||||||
|
ctx.JSON(http.StatusOK, stat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// var apiServersHandler = func(ss []*service.Server) gin.HandlerFunc {
|
||||||
|
// return func(ctx *gin.Context) {
|
||||||
|
// var infos []*torrent.ServerInfo
|
||||||
|
// for _, s := range ss {
|
||||||
|
// infos = append(infos, s.Info())
|
||||||
|
// }
|
||||||
|
// ctx.JSON(http.StatusOK, infos)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var apiRoutesHandler = func(ss *service.Stats) gin.HandlerFunc {
|
||||||
|
// return func(ctx *gin.Context) {
|
||||||
|
// s := ss.RoutesStats()
|
||||||
|
// sort.Sort(torrent.ByName(s))
|
||||||
|
// ctx.JSON(http.StatusOK, s)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var apiAddTorrentHandler = func(s *service.Service) gin.HandlerFunc {
|
||||||
|
// return func(ctx *gin.Context) {
|
||||||
|
// route := ctx.Param("route")
|
||||||
|
|
||||||
|
// var json RouteAdd
|
||||||
|
// if err := ctx.ShouldBindJSON(&json); err != nil {
|
||||||
|
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if err := s.AddMagnet(route, json.Magnet); err != nil {
|
||||||
|
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ctx.JSON(http.StatusOK, nil)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var apiDelTorrentHandler = func(s *service.Service) gin.HandlerFunc {
|
||||||
|
// return func(ctx *gin.Context) {
|
||||||
|
// route := ctx.Param("route")
|
||||||
|
// hash := ctx.Param("torrent_hash")
|
||||||
|
|
||||||
|
// if err := s.RemoveFromHash(route, hash); err != nil {
|
||||||
|
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ctx.JSON(http.StatusOK, nil)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
var apiLogHandler = func(path string) gin.HandlerFunc {
|
||||||
|
return func(ctx *gin.Context) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fi, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
max := math.Max(float64(-fi.Size()), -1024*8*8)
|
||||||
|
_, err = f.Seek(int64(max), io.SeekEnd)
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
ctx.Stream(func(w io.Writer) bool {
|
||||||
|
_, err := b.ReadFrom(f)
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = b.WriteTo(w)
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load diff
|
@ -7,14 +7,6 @@ import (
|
||||||
atorrent "github.com/anacrolix/torrent"
|
atorrent "github.com/anacrolix/torrent"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Apply[I any, O any](in []I, f func(I) O) []O {
|
|
||||||
out := make([]O, len(in))
|
|
||||||
for i, v := range in {
|
|
||||||
out[i] = f(v)
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func MapPeerSource(source atorrent.PeerSource) string {
|
func MapPeerSource(source atorrent.PeerSource) string {
|
||||||
switch source {
|
switch source {
|
||||||
case atorrent.PeerSourceDirect:
|
case atorrent.PeerSourceDirect:
|
||||||
|
@ -51,14 +43,3 @@ func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
|
||||||
T: t,
|
T: t,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MapTorrentStats(s torrent.TorrentStats) *TorrentStats {
|
|
||||||
return &TorrentStats{
|
|
||||||
Timestamp: s.Timestamp,
|
|
||||||
DownloadedBytes: uint(s.DownloadedBytes),
|
|
||||||
UploadedBytes: uint(s.UploadedBytes),
|
|
||||||
TotalPeers: uint(s.TotalPeers),
|
|
||||||
ActivePeers: uint(s.ActivePeers),
|
|
||||||
ConnectedSeeders: uint(s.ConnectedSeeders),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -184,22 +184,6 @@ type Torrent struct {
|
||||||
T *torrent.Controller `json:"-"`
|
T *torrent.Controller `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentClientStats struct {
|
|
||||||
BytesWritten int64 `json:"bytesWritten"`
|
|
||||||
BytesWrittenData int64 `json:"bytesWrittenData"`
|
|
||||||
BytesRead int64 `json:"bytesRead"`
|
|
||||||
BytesReadData int64 `json:"bytesReadData"`
|
|
||||||
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
|
|
||||||
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
|
|
||||||
ChunksWritten int64 `json:"chunksWritten"`
|
|
||||||
ChunksRead int64 `json:"chunksRead"`
|
|
||||||
ChunksReadUseful int64 `json:"chunksReadUseful"`
|
|
||||||
ChunksReadWasted int64 `json:"chunksReadWasted"`
|
|
||||||
MetadataChunksRead int64 `json:"metadataChunksRead"`
|
|
||||||
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
|
|
||||||
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TorrentDaemonMutation struct {
|
type TorrentDaemonMutation struct {
|
||||||
ValidateTorrent bool `json:"validateTorrent"`
|
ValidateTorrent bool `json:"validateTorrent"`
|
||||||
SetTorrentPriority bool `json:"setTorrentPriority"`
|
SetTorrentPriority bool `json:"setTorrentPriority"`
|
||||||
|
@ -208,8 +192,7 @@ type TorrentDaemonMutation struct {
|
||||||
|
|
||||||
type TorrentDaemonQuery struct {
|
type TorrentDaemonQuery struct {
|
||||||
Torrents []*Torrent `json:"torrents"`
|
Torrents []*Torrent `json:"torrents"`
|
||||||
ClientStats *TorrentClientStats `json:"clientStats"`
|
Stats *TorrentStats `json:"stats"`
|
||||||
StatsHistory []*TorrentStats `json:"statsHistory"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentFs struct {
|
type TorrentFs struct {
|
||||||
|
@ -288,12 +271,19 @@ func (this TorrentProgress) GetCurrent() int64 { return this.Current }
|
||||||
func (this TorrentProgress) GetTotal() int64 { return this.Total }
|
func (this TorrentProgress) GetTotal() int64 { return this.Total }
|
||||||
|
|
||||||
type TorrentStats struct {
|
type TorrentStats struct {
|
||||||
Timestamp time.Time `json:"timestamp"`
|
BytesWritten int64 `json:"bytesWritten"`
|
||||||
DownloadedBytes uint `json:"downloadedBytes"`
|
BytesWrittenData int64 `json:"bytesWrittenData"`
|
||||||
UploadedBytes uint `json:"uploadedBytes"`
|
BytesRead int64 `json:"bytesRead"`
|
||||||
TotalPeers uint `json:"totalPeers"`
|
BytesReadData int64 `json:"bytesReadData"`
|
||||||
ActivePeers uint `json:"activePeers"`
|
BytesReadUsefulData int64 `json:"bytesReadUsefulData"`
|
||||||
ConnectedSeeders uint `json:"connectedSeeders"`
|
BytesReadUsefulIntendedData int64 `json:"bytesReadUsefulIntendedData"`
|
||||||
|
ChunksWritten int64 `json:"chunksWritten"`
|
||||||
|
ChunksRead int64 `json:"chunksRead"`
|
||||||
|
ChunksReadUseful int64 `json:"chunksReadUseful"`
|
||||||
|
ChunksReadWasted int64 `json:"chunksReadWasted"`
|
||||||
|
MetadataChunksRead int64 `json:"metadataChunksRead"`
|
||||||
|
PiecesDirtiedGood int64 `json:"piecesDirtiedGood"`
|
||||||
|
PiecesDirtiedBad int64 `json:"piecesDirtiedBad"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentsFilter struct {
|
type TorrentsFilter struct {
|
||||||
|
|
|
@ -8,13 +8,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||||
|
|
||||||
tinfohash "github.com/anacrolix/torrent/types/infohash"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Torrents is the resolver for the torrents field.
|
// Torrents is the resolver for the torrents field.
|
||||||
|
@ -89,10 +86,10 @@ func (r *torrentDaemonQueryResolver) Torrents(ctx context.Context, obj *model.To
|
||||||
return tr, nil
|
return tr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientStats is the resolver for the clientStats field.
|
// Stats is the resolver for the stats field.
|
||||||
func (r *torrentDaemonQueryResolver) ClientStats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentClientStats, error) {
|
func (r *torrentDaemonQueryResolver) Stats(ctx context.Context, obj *model.TorrentDaemonQuery) (*model.TorrentStats, error) {
|
||||||
stats := r.Service.Stats()
|
stats := r.Service.Stats()
|
||||||
return &model.TorrentClientStats{
|
return &model.TorrentStats{
|
||||||
BytesWritten: stats.BytesWritten.Int64(),
|
BytesWritten: stats.BytesWritten.Int64(),
|
||||||
BytesRead: stats.BytesRead.Int64(),
|
BytesRead: stats.BytesRead.Int64(),
|
||||||
BytesWrittenData: stats.BytesWrittenData.Int64(),
|
BytesWrittenData: stats.BytesWrittenData.Int64(),
|
||||||
|
@ -109,33 +106,6 @@ func (r *torrentDaemonQueryResolver) ClientStats(ctx context.Context, obj *model
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StatsHistory is the resolver for the statsHistory field.
|
|
||||||
func (r *torrentDaemonQueryResolver) StatsHistory(ctx context.Context, obj *model.TorrentDaemonQuery, since time.Time, infohash *string) ([]*model.TorrentStats, error) {
|
|
||||||
var stats []torrent.TorrentStats
|
|
||||||
if infohash == nil {
|
|
||||||
stats, err := r.Service.StatsHistory(ctx, since)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return model.Apply(stats, model.MapTorrentStats), nil
|
|
||||||
} else if *infohash == "total" {
|
|
||||||
var err error
|
|
||||||
stats, err = r.Service.TotalStatsHistory(ctx, since)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ih := tinfohash.FromHexString(*infohash)
|
|
||||||
var err error
|
|
||||||
stats, err = r.Service.TorrentStatsHistory(ctx, since, ih)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return model.Apply(stats, model.MapTorrentStats), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
|
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
|
||||||
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
|
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
|
||||||
return &torrentDaemonQueryResolver{r}
|
return &torrentDaemonQueryResolver{r}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/anacrolix/dht/v2"
|
"github.com/anacrolix/dht/v2"
|
||||||
"github.com/anacrolix/dht/v2/bep44"
|
"github.com/anacrolix/dht/v2/bep44"
|
||||||
|
@ -58,14 +59,9 @@ func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
|
||||||
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
|
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
|
||||||
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
||||||
cfg.Store = fis
|
cfg.Store = fis
|
||||||
cfg.Exp = dhtTTL
|
cfg.Exp = 2 * time.Hour
|
||||||
cfg.NoSecurity = false
|
cfg.NoSecurity = false
|
||||||
}
|
}
|
||||||
|
|
||||||
t, err := torrent.NewClient(torrentCfg)
|
return torrent.NewClient(torrentCfg)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return t, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,6 @@ type Daemon struct {
|
||||||
fis *dhtFileItemStore
|
fis *dhtFileItemStore
|
||||||
dirsAquire kv.Store[string, DirAquire]
|
dirsAquire kv.Store[string, DirAquire]
|
||||||
fileProperties kv.Store[string, FileProperties]
|
fileProperties kv.Store[string, FileProperties]
|
||||||
statsStore *statsStore
|
|
||||||
|
|
||||||
loadMutex sync.Mutex
|
loadMutex sync.Mutex
|
||||||
torrentLoaded chan struct{}
|
torrentLoaded chan struct{}
|
||||||
|
@ -58,8 +57,6 @@ type Daemon struct {
|
||||||
log *rlog.Logger
|
log *rlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
const dhtTTL = 24 * time.Hour
|
|
||||||
|
|
||||||
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
|
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
|
||||||
s := &Daemon{
|
s := &Daemon{
|
||||||
log: rlog.Component("torrent-service"),
|
log: rlog.Component("torrent-service"),
|
||||||
|
@ -73,7 +70,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
return nil, fmt.Errorf("error creating metadata folder: %w", err)
|
return nil, fmt.Errorf("error creating metadata folder: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), dhtTTL)
|
s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), 3*time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error starting item store: %w", err)
|
return nil, fmt.Errorf("error starting item store: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -89,6 +86,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
|
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -98,11 +96,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
return nil, fmt.Errorf("error creating node ID: %w", err)
|
return nil, fmt.Errorf("error creating node ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.statsStore, err = newStatsStore(conf.MetadataFolder, time.Hour*24*30)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.client, err = newClient(s.Storage, s.fis, &conf, id)
|
s.client, err = newClient(s.Storage, s.fis, &conf, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error starting torrent client: %w", err)
|
return nil, fmt.Errorf("error starting torrent client: %w", err)
|
||||||
|
@ -123,77 +116,9 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
|
||||||
close(s.torrentLoaded)
|
close(s.torrentLoaded)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
|
||||||
const period = time.Second * 10
|
|
||||||
|
|
||||||
<-s.torrentLoaded
|
|
||||||
|
|
||||||
timer := time.NewTicker(period)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.client.Closed():
|
|
||||||
return
|
|
||||||
case <-timer.C:
|
|
||||||
s.updateStats(context.Background())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Daemon) updateStats(ctx context.Context) {
|
|
||||||
log := s.log
|
|
||||||
|
|
||||||
totalPeers := 0
|
|
||||||
activePeers := 0
|
|
||||||
connectedSeeders := 0
|
|
||||||
|
|
||||||
for _, v := range s.client.Torrents() {
|
|
||||||
stats := v.Stats()
|
|
||||||
err := s.statsStore.AddTorrentStats(v.InfoHash(), TorrentStats{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
DownloadedBytes: uint64(stats.BytesRead.Int64()),
|
|
||||||
UploadedBytes: uint64(stats.BytesWritten.Int64()),
|
|
||||||
TotalPeers: uint16(stats.TotalPeers),
|
|
||||||
ActivePeers: uint16(stats.ActivePeers),
|
|
||||||
ConnectedSeeders: uint16(stats.ConnectedSeeders),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
totalPeers += stats.TotalPeers
|
|
||||||
activePeers += stats.ActivePeers
|
|
||||||
connectedSeeders += stats.ConnectedSeeders
|
|
||||||
}
|
|
||||||
|
|
||||||
totalStats := s.client.Stats()
|
|
||||||
err := s.statsStore.AddTotalStats(TorrentStats{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
|
|
||||||
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
|
|
||||||
TotalPeers: uint16(totalPeers),
|
|
||||||
ActivePeers: uint16(activePeers),
|
|
||||||
ConnectedSeeders: uint16(connectedSeeders),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error(ctx, "error saving total stats", rlog.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
|
||||||
return s.statsStore.ReadTotalStatsHistory(ctx, since)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
|
||||||
return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
|
||||||
return s.statsStore.ReadStatsHistory(ctx, since)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
|
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
|
||||||
|
|
||||||
func (s *Daemon) Close(ctx context.Context) error {
|
func (s *Daemon) Close(ctx context.Context) error {
|
||||||
|
@ -312,7 +237,7 @@ func isValidInfoHashBytes(d []byte) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Daemon) Stats() torrent.ConnStats {
|
func (s *Daemon) Stats() torrent.ConnStats {
|
||||||
return s.client.Stats().ConnStats
|
return s.client.ConnStats()
|
||||||
}
|
}
|
||||||
|
|
||||||
const loadWorkers = 5
|
const loadWorkers = 5
|
||||||
|
|
|
@ -85,25 +85,7 @@ func (fis *dhtFileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fis *dhtFileItemStore) Del(t bep44.Target) error {
|
func (fis *dhtFileItemStore) Del(t bep44.Target) error {
|
||||||
tx := fis.db.NewTransaction(true)
|
// ignore this
|
||||||
defer tx.Discard()
|
|
||||||
|
|
||||||
err := tx.Delete(t[:])
|
|
||||||
if err == badger.ErrKeyNotFound {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = tx.Commit()
|
|
||||||
if err == badger.ErrKeyNotFound {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,7 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
|
||||||
|
|
||||||
return fs.filesCache, nil
|
return fs.filesCache, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DEFAULT_DIR:
|
DEFAULT_DIR:
|
||||||
|
|
|
@ -1,207 +1,218 @@
|
||||||
package torrent
|
package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"errors"
|
||||||
"encoding/json"
|
"sync"
|
||||||
"path"
|
|
||||||
"slices"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/src/log"
|
"github.com/anacrolix/torrent"
|
||||||
"github.com/anacrolix/torrent/types/infohash"
|
|
||||||
"github.com/dgraph-io/badger/v4"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
var ErrTorrentNotFound = errors.New("torrent not found")
|
||||||
db, err := badger.OpenManaged(
|
|
||||||
badger.
|
|
||||||
DefaultOptions(path.Join(metaDir, "stats")).
|
|
||||||
WithNumVersionsToKeep(int(^uint(0) >> 1)).
|
|
||||||
WithLogger(log.BadgerLogger("stats")), // Infinity
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
type PieceStatus string
|
||||||
for n := range time.NewTimer(lifetime / 2).C {
|
|
||||||
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return &statsStore{
|
|
||||||
db: db,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type statsStore struct {
|
const (
|
||||||
db *badger.DB
|
Checking PieceStatus = "H"
|
||||||
|
Partial PieceStatus = "P"
|
||||||
|
Complete PieceStatus = "C"
|
||||||
|
Waiting PieceStatus = "W"
|
||||||
|
Error PieceStatus = "?"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PieceChunk struct {
|
||||||
|
Status PieceStatus `json:"status"`
|
||||||
|
NumPieces int `json:"numPieces"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TorrentStats struct {
|
type TorrentStats struct {
|
||||||
Timestamp time.Time
|
Name string `json:"name"`
|
||||||
DownloadedBytes uint64
|
Hash string `json:"hash"`
|
||||||
UploadedBytes uint64
|
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||||
TotalPeers uint16
|
UploadedBytes int64 `json:"uploadedBytes"`
|
||||||
ActivePeers uint16
|
Peers int `json:"peers"`
|
||||||
ConnectedSeeders uint16
|
Seeders int `json:"seeders"`
|
||||||
|
TimePassed float64 `json:"timePassed"`
|
||||||
|
PieceChunks []*PieceChunk `json:"pieceChunks"`
|
||||||
|
TotalPieces int `json:"totalPieces"`
|
||||||
|
PieceSize int64 `json:"pieceSize"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s TorrentStats) Same(o TorrentStats) bool {
|
type byName []*TorrentStats
|
||||||
return s.DownloadedBytes == o.DownloadedBytes &&
|
|
||||||
s.UploadedBytes == o.UploadedBytes &&
|
func (a byName) Len() int { return len(a) }
|
||||||
s.TotalPeers == o.TotalPeers &&
|
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
s.ActivePeers == o.ActivePeers &&
|
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||||
s.ConnectedSeeders == o.ConnectedSeeders
|
|
||||||
|
type TotalTorrentStats struct {
|
||||||
|
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||||
|
UploadedBytes int64 `json:"uploadedBytes"`
|
||||||
|
TimePassed float64 `json:"timePassed"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *statsStore) addStats(key []byte, stat TorrentStats) error {
|
type RouteStats struct {
|
||||||
ts := uint64(stat.Timestamp.Unix())
|
Name string `json:"name"`
|
||||||
|
TorrentStats []*TorrentStats `json:"torrentStats"`
|
||||||
txn := r.db.NewTransactionAt(ts, true)
|
|
||||||
defer txn.Discard()
|
|
||||||
|
|
||||||
item, err := txn.Get(key)
|
|
||||||
if err != nil && err != badger.ErrKeyNotFound {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != badger.ErrKeyNotFound {
|
|
||||||
var prevStats TorrentStats
|
|
||||||
err = item.Value(func(val []byte) error {
|
|
||||||
return json.Unmarshal(val, &prevStats)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if prevStats.Same(stat) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(stat)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = txn.Set(key, data)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return txn.CommitAt(ts, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *statsStore) AddTorrentStats(ih infohash.T, stat TorrentStats) error {
|
type ByName []*RouteStats
|
||||||
return r.addStats(ih.Bytes(), stat)
|
|
||||||
|
func (a ByName) Len() int { return len(a) }
|
||||||
|
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||||
|
|
||||||
|
type stat struct {
|
||||||
|
totalDownloadBytes int64
|
||||||
|
downloadBytes int64
|
||||||
|
totalUploadBytes int64
|
||||||
|
uploadBytes int64
|
||||||
|
peers int
|
||||||
|
seeders int
|
||||||
|
time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
const totalKey = "total"
|
type Stats struct {
|
||||||
|
mut sync.Mutex
|
||||||
|
torrents map[string]*torrent.Torrent
|
||||||
|
previousStats map[string]*stat
|
||||||
|
|
||||||
func (r *statsStore) AddTotalStats(stat TorrentStats) error {
|
gTime time.Time
|
||||||
return r.addStats([]byte(totalKey), stat)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
func NewStats() *Stats {
|
||||||
stats := []TorrentStats{}
|
return &Stats{
|
||||||
|
gTime: time.Now(),
|
||||||
err := r.db.View(func(txn *badger.Txn) error {
|
torrents: make(map[string]*torrent.Torrent),
|
||||||
opts := badger.DefaultIteratorOptions
|
// torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
|
||||||
opts.AllVersions = true
|
// previousStats: make(map[string]*stat),
|
||||||
opts.SinceTs = uint64(since.Unix())
|
|
||||||
|
|
||||||
it := txn.NewKeyIterator([]byte(totalKey), opts)
|
|
||||||
defer it.Close()
|
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
|
||||||
item := it.Item()
|
|
||||||
var stat TorrentStats
|
|
||||||
err := item.Value(func(v []byte) error {
|
|
||||||
return json.Unmarshal(v, &stat)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stats = append(stats, stat)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
|
||||||
return a.Timestamp.Compare(b.Timestamp)
|
|
||||||
})
|
|
||||||
stats = slices.Compact(stats)
|
|
||||||
return stats, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *statsStore) ReadTorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
func (s *Stats) Add(path string, t *torrent.Torrent) {
|
||||||
stats := []TorrentStats{}
|
s.mut.Lock()
|
||||||
|
defer s.mut.Unlock()
|
||||||
|
|
||||||
err := r.db.View(func(txn *badger.Txn) error {
|
s.torrents[path] = t
|
||||||
opts := badger.DefaultIteratorOptions
|
s.previousStats[path] = &stat{}
|
||||||
opts.AllVersions = true
|
|
||||||
opts.SinceTs = uint64(since.Unix())
|
|
||||||
|
|
||||||
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
|
||||||
defer it.Close()
|
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
|
||||||
item := it.Item()
|
|
||||||
var stat TorrentStats
|
|
||||||
err := item.Value(func(v []byte) error {
|
|
||||||
return json.Unmarshal(v, &stat)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stats = append(stats, stat)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
|
||||||
return a.Timestamp.Compare(b.Timestamp)
|
|
||||||
})
|
|
||||||
stats = slices.Compact(stats)
|
|
||||||
return stats, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) {
|
func (s *Stats) Del(path string) {
|
||||||
stats := []TorrentStats{}
|
s.mut.Lock()
|
||||||
|
defer s.mut.Unlock()
|
||||||
err := r.db.View(func(txn *badger.Txn) error {
|
delete(s.torrents, path)
|
||||||
opts := badger.DefaultIteratorOptions
|
delete(s.previousStats, path)
|
||||||
opts.AllVersions = true
|
}
|
||||||
opts.SinceTs = uint64(since.Unix())
|
|
||||||
|
func (s *Stats) Stats(path string) (*TorrentStats, error) {
|
||||||
it := txn.NewIterator(opts)
|
s.mut.Lock()
|
||||||
defer it.Close()
|
defer s.mut.Unlock()
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
|
||||||
item := it.Item()
|
t, ok := s.torrents[path]
|
||||||
var stat TorrentStats
|
if !(ok) {
|
||||||
err := item.Value(func(v []byte) error {
|
return nil, ErrTorrentNotFound
|
||||||
return json.Unmarshal(v, &stat)
|
}
|
||||||
})
|
|
||||||
if err != nil {
|
now := time.Now()
|
||||||
return err
|
|
||||||
}
|
return s.stats(now, t, true), nil
|
||||||
|
}
|
||||||
stats = append(stats, stat)
|
|
||||||
}
|
func (s *Stats) GlobalStats() *TotalTorrentStats {
|
||||||
return nil
|
s.mut.Lock()
|
||||||
})
|
defer s.mut.Unlock()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
now := time.Now()
|
||||||
}
|
|
||||||
|
var totalDownload int64
|
||||||
slices.SortFunc(stats, func(a, b TorrentStats) int {
|
var totalUpload int64
|
||||||
return a.Timestamp.Compare(b.Timestamp)
|
for _, torrent := range s.torrents {
|
||||||
})
|
tStats := s.stats(now, torrent, false)
|
||||||
stats = slices.Compact(stats)
|
totalDownload += tStats.DownloadedBytes
|
||||||
return stats, nil
|
totalUpload += tStats.UploadedBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
timePassed := now.Sub(s.gTime)
|
||||||
|
s.gTime = now
|
||||||
|
|
||||||
|
return &TotalTorrentStats{
|
||||||
|
DownloadedBytes: totalDownload,
|
||||||
|
UploadedBytes: totalUpload,
|
||||||
|
TimePassed: timePassed.Seconds(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
|
||||||
|
ts := &TorrentStats{}
|
||||||
|
prev, ok := s.previousStats[t.InfoHash().String()]
|
||||||
|
if !ok {
|
||||||
|
return &TorrentStats{}
|
||||||
|
}
|
||||||
|
if s.returnPreviousMeasurements(now) {
|
||||||
|
ts.DownloadedBytes = prev.downloadBytes
|
||||||
|
ts.UploadedBytes = prev.uploadBytes
|
||||||
|
} else {
|
||||||
|
st := t.Stats()
|
||||||
|
rd := st.BytesReadData.Int64()
|
||||||
|
wd := st.BytesWrittenData.Int64()
|
||||||
|
ist := &stat{
|
||||||
|
downloadBytes: rd - prev.totalDownloadBytes,
|
||||||
|
uploadBytes: wd - prev.totalUploadBytes,
|
||||||
|
totalDownloadBytes: rd,
|
||||||
|
totalUploadBytes: wd,
|
||||||
|
time: now,
|
||||||
|
peers: st.TotalPeers,
|
||||||
|
seeders: st.ConnectedSeeders,
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.DownloadedBytes = ist.downloadBytes
|
||||||
|
ts.UploadedBytes = ist.uploadBytes
|
||||||
|
ts.Peers = ist.peers
|
||||||
|
ts.Seeders = ist.seeders
|
||||||
|
|
||||||
|
s.previousStats[t.InfoHash().String()] = ist
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.TimePassed = now.Sub(prev.time).Seconds()
|
||||||
|
var totalPieces int
|
||||||
|
if chunks {
|
||||||
|
var pch []*PieceChunk
|
||||||
|
for _, psr := range t.PieceStateRuns() {
|
||||||
|
var s PieceStatus
|
||||||
|
switch {
|
||||||
|
case psr.Checking:
|
||||||
|
s = Checking
|
||||||
|
case psr.Partial:
|
||||||
|
s = Partial
|
||||||
|
case psr.Complete:
|
||||||
|
s = Complete
|
||||||
|
case !psr.Ok:
|
||||||
|
s = Error
|
||||||
|
default:
|
||||||
|
s = Waiting
|
||||||
|
}
|
||||||
|
|
||||||
|
pch = append(pch, &PieceChunk{
|
||||||
|
Status: s,
|
||||||
|
NumPieces: psr.Length,
|
||||||
|
})
|
||||||
|
totalPieces += psr.Length
|
||||||
|
}
|
||||||
|
ts.PieceChunks = pch
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.Hash = t.InfoHash().String()
|
||||||
|
ts.Name = t.Name()
|
||||||
|
ts.TotalPieces = totalPieces
|
||||||
|
|
||||||
|
if t.Info() != nil {
|
||||||
|
ts.PieceSize = t.Info().PieceLength
|
||||||
|
}
|
||||||
|
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
|
const gap time.Duration = 2 * time.Second
|
||||||
|
|
||||||
|
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
|
||||||
|
return now.Sub(s.gTime) < gap
|
||||||
}
|
}
|
||||||
|
|
127
src/sources/torrent/stats_store.go
Normal file
127
src/sources/torrent/stats_store.go
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
package torrent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.kmsign.ru/royalcat/tstor/src/log"
|
||||||
|
"github.com/anacrolix/torrent/types/infohash"
|
||||||
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
||||||
|
db, err := badger.OpenManaged(
|
||||||
|
badger.
|
||||||
|
DefaultOptions(path.Join(metaDir, "stats-history")).
|
||||||
|
WithNumVersionsToKeep(int(^uint(0) >> 1)).
|
||||||
|
WithLogger(log.BadgerLogger("stats")), // Infinity
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for n := range time.NewTimer(lifetime / 2).C {
|
||||||
|
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return &statsStore{
|
||||||
|
db: db,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsStore struct {
|
||||||
|
db *badger.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
|
||||||
|
data, err := json.Marshal(stat)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ts := uint64(t.Unix())
|
||||||
|
|
||||||
|
txn := r.db.NewTransactionAt(ts, false)
|
||||||
|
defer txn.Discard()
|
||||||
|
|
||||||
|
err = txn.Set(ih.Bytes(), data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return txn.CommitAt(ts, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
|
||||||
|
stats := map[time.Time]TotalTorrentStats{}
|
||||||
|
|
||||||
|
err := r.db.View(func(txn *badger.Txn) error {
|
||||||
|
opts := badger.DefaultIteratorOptions
|
||||||
|
opts.AllVersions = true
|
||||||
|
opts.SinceTs = uint64(since.Unix())
|
||||||
|
|
||||||
|
it := txn.NewIterator(opts)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
// k := item.Key()
|
||||||
|
var tstat TorrentStats
|
||||||
|
err := item.Value(func(v []byte) error {
|
||||||
|
return json.Unmarshal(v, &tstat)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t := time.Unix(int64(item.Version()), 0)
|
||||||
|
|
||||||
|
if stat, ok := stats[t]; !ok {
|
||||||
|
stats[t] = TotalTorrentStats{
|
||||||
|
DownloadedBytes: tstat.DownloadedBytes,
|
||||||
|
UploadedBytes: stat.DownloadedBytes,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stat.DownloadedBytes += tstat.DownloadedBytes
|
||||||
|
stat.UploadedBytes += tstat.UploadedBytes
|
||||||
|
stats[t] = stat
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return maps.Values(stats), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
||||||
|
var stats map[time.Time]TorrentStats
|
||||||
|
|
||||||
|
err := r.db.View(func(txn *badger.Txn) error {
|
||||||
|
opts := badger.DefaultIteratorOptions
|
||||||
|
opts.AllVersions = true
|
||||||
|
opts.SinceTs = uint64(since.Unix())
|
||||||
|
|
||||||
|
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
var tstat TorrentStats
|
||||||
|
err := item.Value(func(v []byte) error {
|
||||||
|
return json.Unmarshal(v, &tstat)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t := time.Unix(int64(item.Version()), 0)
|
||||||
|
|
||||||
|
stats[t] = tstat
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return maps.Values(stats), err
|
||||||
|
}
|
|
@ -99,21 +99,6 @@ type Torrent {
|
||||||
excludedFiles: [TorrentFile!]! @resolver
|
excludedFiles: [TorrentFile!]! @resolver
|
||||||
peers: [TorrentPeer!]! @resolver
|
peers: [TorrentPeer!]! @resolver
|
||||||
}
|
}
|
||||||
type TorrentClientStats {
|
|
||||||
bytesWritten: Int!
|
|
||||||
bytesWrittenData: Int!
|
|
||||||
bytesRead: Int!
|
|
||||||
bytesReadData: Int!
|
|
||||||
bytesReadUsefulData: Int!
|
|
||||||
bytesReadUsefulIntendedData: Int!
|
|
||||||
chunksWritten: Int!
|
|
||||||
chunksRead: Int!
|
|
||||||
chunksReadUseful: Int!
|
|
||||||
chunksReadWasted: Int!
|
|
||||||
metadataChunksRead: Int!
|
|
||||||
piecesDirtiedGood: Int!
|
|
||||||
piecesDirtiedBad: Int!
|
|
||||||
}
|
|
||||||
type TorrentDaemonMutation {
|
type TorrentDaemonMutation {
|
||||||
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
|
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
|
||||||
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
|
setTorrentPriority(infohash: String!, file: String, priority: TorrentPriority!): Boolean! @resolver
|
||||||
|
@ -121,8 +106,7 @@ type TorrentDaemonMutation {
|
||||||
}
|
}
|
||||||
type TorrentDaemonQuery {
|
type TorrentDaemonQuery {
|
||||||
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
|
||||||
clientStats: TorrentClientStats! @resolver
|
stats: TorrentStats! @resolver
|
||||||
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
|
|
||||||
}
|
}
|
||||||
type TorrentFS implements Dir & FsEntry {
|
type TorrentFS implements Dir & FsEntry {
|
||||||
name: String!
|
name: String!
|
||||||
|
@ -172,12 +156,19 @@ type TorrentProgress implements Progress {
|
||||||
total: Int!
|
total: Int!
|
||||||
}
|
}
|
||||||
type TorrentStats {
|
type TorrentStats {
|
||||||
timestamp: DateTime!
|
bytesWritten: Int!
|
||||||
downloadedBytes: UInt!
|
bytesWrittenData: Int!
|
||||||
uploadedBytes: UInt!
|
bytesRead: Int!
|
||||||
totalPeers: UInt!
|
bytesReadData: Int!
|
||||||
activePeers: UInt!
|
bytesReadUsefulData: Int!
|
||||||
connectedSeeders: UInt!
|
bytesReadUsefulIntendedData: Int!
|
||||||
|
chunksWritten: Int!
|
||||||
|
chunksRead: Int!
|
||||||
|
chunksReadUseful: Int!
|
||||||
|
chunksReadWasted: Int!
|
||||||
|
metadataChunksRead: Int!
|
||||||
|
piecesDirtiedGood: Int!
|
||||||
|
piecesDirtiedBad: Int!
|
||||||
}
|
}
|
||||||
input TorrentsFilter {
|
input TorrentsFilter {
|
||||||
infohash: StringFilter
|
infohash: StringFilter
|
||||||
|
@ -187,5 +178,4 @@ input TorrentsFilter {
|
||||||
peersCount: IntFilter
|
peersCount: IntFilter
|
||||||
priority: TorrentPriorityFilter
|
priority: TorrentPriorityFilter
|
||||||
}
|
}
|
||||||
scalar UInt
|
|
||||||
scalar Upload
|
scalar Upload
|
||||||
|
|
Loading…
Reference in a new issue