Add and remove torrents from web interface (#84)
This commit is contained in:
parent
02842b1917
commit
2f18213660
49 changed files with 996 additions and 1170 deletions
|
@ -1,20 +1,34 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"github.com/anacrolix/log"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/dht/v2"
|
||||
"github.com/anacrolix/dht/v2/bep44"
|
||||
tlog "github.com/anacrolix/log"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/distribyted/distribyted/config"
|
||||
dlog "github.com/distribyted/distribyted/log"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func NewClient(st storage.ClientImpl, cfg *config.TorrentGlobal) (*torrent.Client, error) {
|
||||
func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentGlobal) (*torrent.Client, error) {
|
||||
// TODO download and upload limits
|
||||
torrentCfg := torrent.NewDefaultClientConfig()
|
||||
torrentCfg.Logger = log.Discard
|
||||
torrentCfg.Seed = true
|
||||
// torrentCfg.DisableWebseeds = true
|
||||
torrentCfg.DefaultStorage = st
|
||||
|
||||
torrentCfg.DisableIPv6 = cfg.DisableIPv6
|
||||
|
||||
l := log.Logger.With().Str("component", "torrent-client").Logger()
|
||||
torrentCfg.Logger = tlog.Logger{LoggerImpl: &dlog.Torrent{L: l}}
|
||||
|
||||
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
||||
cfg.Store = fis
|
||||
cfg.Exp = 2 * time.Hour
|
||||
cfg.NoSecurity = false
|
||||
}
|
||||
|
||||
return torrent.NewClient(torrentCfg)
|
||||
}
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/distribyted/distribyted/config"
|
||||
"github.com/distribyted/distribyted/fs"
|
||||
"github.com/distribyted/distribyted/stats"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
c *torrent.Client
|
||||
s *stats.Torrent
|
||||
|
||||
fssMu sync.Mutex
|
||||
fss map[string]fs.Filesystem
|
||||
}
|
||||
|
||||
func NewHandler(c *torrent.Client, s *stats.Torrent) *Handler {
|
||||
return &Handler{
|
||||
c: c,
|
||||
s: s,
|
||||
fss: make(map[string]fs.Filesystem),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Handler) Load(route string, ts []*config.Torrent) error {
|
||||
var torrents []*torrent.Torrent
|
||||
for _, mpcTorrent := range ts {
|
||||
var t *torrent.Torrent
|
||||
var err error
|
||||
|
||||
switch {
|
||||
case mpcTorrent.MagnetURI != "":
|
||||
t, err = s.c.AddMagnet(mpcTorrent.MagnetURI)
|
||||
case mpcTorrent.TorrentPath != "":
|
||||
t, err = s.c.AddTorrentFromFile(mpcTorrent.TorrentPath)
|
||||
default:
|
||||
err = fmt.Errorf("no magnet URI or torrent path provided")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// only get info if name is not available
|
||||
if t.Name() == "" {
|
||||
log.Info().Str("hash", t.InfoHash().String()).Msg("getting torrent info")
|
||||
<-t.GotInfo()
|
||||
}
|
||||
|
||||
s.s.Add(route, t)
|
||||
torrents = append(torrents, t)
|
||||
|
||||
log.Info().Str("name", t.Name()).Str("route", route).Msg("torrent added to mountpoint")
|
||||
}
|
||||
|
||||
folder := "/" + route
|
||||
|
||||
s.fssMu.Lock()
|
||||
defer s.fssMu.Unlock()
|
||||
s.fss[folder] = fs.NewTorrent(torrents)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Handler) Fileststems() map[string]fs.Filesystem {
|
||||
return s.fss
|
||||
}
|
||||
|
||||
func (s *Handler) RemoveAll() error {
|
||||
s.fssMu.Lock()
|
||||
defer s.fssMu.Unlock()
|
||||
|
||||
s.fss = make(map[string]fs.Filesystem)
|
||||
s.s.RemoveAll()
|
||||
return nil
|
||||
}
|
45
torrent/loader/config.go
Normal file
45
torrent/loader/config.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
package loader
|
||||
|
||||
import "github.com/distribyted/distribyted/config"
|
||||
|
||||
var _ Loader = &Config{}
|
||||
|
||||
type Config struct {
|
||||
c []*config.Route
|
||||
}
|
||||
|
||||
func NewConfig(r []*config.Route) *Config {
|
||||
return &Config{
|
||||
c: r,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Config) ListMagnets() (map[string][]string, error) {
|
||||
out := make(map[string][]string)
|
||||
for _, r := range l.c {
|
||||
for _, t := range r.Torrents {
|
||||
if t.MagnetURI == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
out[r.Name] = append(out[r.Name], t.MagnetURI)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (l *Config) ListTorrentPaths() (map[string][]string, error) {
|
||||
out := make(map[string][]string)
|
||||
for _, r := range l.c {
|
||||
for _, t := range r.Torrents {
|
||||
if t.TorrentPath == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
out[r.Name] = append(out[r.Name], t.TorrentPath)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
107
torrent/loader/db.go
Normal file
107
torrent/loader/db.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package loader
|
||||
|
||||
import (
|
||||
"path"
|
||||
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
dlog "github.com/distribyted/distribyted/log"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var _ LoaderAdder = &DB{}
|
||||
|
||||
const routeRootKey = "/route/"
|
||||
|
||||
type DB struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func NewDB(path string) (*DB, error) {
|
||||
l := log.Logger.With().Str("component", "torrent-store").Logger()
|
||||
db, err := badger.Open(badger.DefaultOptions(path).WithLogger(&dlog.Badger{L: l}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.RunValueLogGC(0.5)
|
||||
if err != nil && err != badger.ErrNoRewrite {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &DB{
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *DB) AddMagnet(r, m string) error {
|
||||
err := l.db.Update(func(txn *badger.Txn) error {
|
||||
spec, err := metainfo.ParseMagnetUri(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ih := spec.InfoHash.HexString()
|
||||
|
||||
rp := path.Join(routeRootKey, ih, r)
|
||||
return txn.Set([]byte(rp), []byte(m))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return l.db.Sync()
|
||||
}
|
||||
|
||||
func (l *DB) RemoveFromHash(r, h string) (bool, error) {
|
||||
tx := l.db.NewTransaction(true)
|
||||
defer tx.Discard()
|
||||
|
||||
var mh metainfo.Hash
|
||||
if err := mh.FromHexString(h); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
rp := path.Join(routeRootKey, h, r)
|
||||
if _, err := tx.Get([]byte(rp)); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err := tx.Delete([]byte(rp)); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, tx.Commit()
|
||||
}
|
||||
|
||||
func (l *DB) ListMagnets() (map[string][]string, error) {
|
||||
tx := l.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
it := tx.NewIterator(badger.DefaultIteratorOptions)
|
||||
defer it.Close()
|
||||
|
||||
prefix := []byte(routeRootKey)
|
||||
out := make(map[string][]string)
|
||||
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
|
||||
_, r := path.Split(string(it.Item().Key()))
|
||||
i := it.Item()
|
||||
if err := i.Value(func(v []byte) error {
|
||||
out[r] = append(out[r], string(v))
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (l *DB) ListTorrentPaths() (map[string][]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (l *DB) Close() error {
|
||||
return l.db.Close()
|
||||
}
|
62
torrent/loader/db_test.go
Normal file
62
torrent/loader/db_test.go
Normal file
|
@ -0,0 +1,62 @@
|
|||
package loader
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const m1 = "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056"
|
||||
|
||||
func TestDB(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
tmpService, err := os.MkdirTemp("", "service")
|
||||
require.NoError(err)
|
||||
tmpStorage, err := os.MkdirTemp("", "storage")
|
||||
require.NoError(err)
|
||||
|
||||
cs := storage.NewFile(tmpStorage)
|
||||
defer cs.Close()
|
||||
|
||||
s, err := NewDB(tmpService)
|
||||
require.NoError(err)
|
||||
defer s.Close()
|
||||
|
||||
err = s.AddMagnet("route1", "WRONG MAGNET")
|
||||
require.Error(err)
|
||||
|
||||
err = s.AddMagnet("route1", m1)
|
||||
require.NoError(err)
|
||||
|
||||
err = s.AddMagnet("route2", m1)
|
||||
require.NoError(err)
|
||||
|
||||
l, err := s.ListMagnets()
|
||||
require.NoError(err)
|
||||
require.Len(l, 2)
|
||||
require.Len(l["route1"], 1)
|
||||
require.Equal(l["route1"][0], m1)
|
||||
require.Len(l["route2"], 1)
|
||||
require.Equal(l["route2"][0], m1)
|
||||
|
||||
removed, err := s.RemoveFromHash("other", "c9e15763f722f23e98a29decdfae341b98d53056")
|
||||
require.NoError(err)
|
||||
require.False(removed)
|
||||
|
||||
removed, err = s.RemoveFromHash("route1", "c9e15763f722f23e98a29decdfae341b98d53056")
|
||||
require.NoError(err)
|
||||
require.True(removed)
|
||||
|
||||
l, err = s.ListMagnets()
|
||||
require.NoError(err)
|
||||
require.Len(l, 1)
|
||||
require.Len(l["route2"], 1)
|
||||
require.Equal(l["route2"][0], m1)
|
||||
|
||||
require.NoError(s.Close())
|
||||
require.NoError(cs.Close())
|
||||
|
||||
}
|
13
torrent/loader/loader.go
Normal file
13
torrent/loader/loader.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package loader
|
||||
|
||||
type Loader interface {
|
||||
ListMagnets() (map[string][]string, error)
|
||||
ListTorrentPaths() (map[string][]string, error)
|
||||
}
|
||||
|
||||
type LoaderAdder interface {
|
||||
Loader
|
||||
|
||||
RemoveFromHash(r, h string) (bool, error)
|
||||
AddMagnet(r, m string) error
|
||||
}
|
179
torrent/service.go
Normal file
179
torrent/service.go
Normal file
|
@ -0,0 +1,179 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/distribyted/distribyted/fs"
|
||||
"github.com/distribyted/distribyted/torrent/loader"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
c *torrent.Client
|
||||
|
||||
s *Stats
|
||||
|
||||
mu sync.Mutex
|
||||
fss map[string]fs.Filesystem
|
||||
|
||||
cfgLoader loader.Loader
|
||||
db loader.LoaderAdder
|
||||
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func NewService(cfg loader.Loader, db loader.LoaderAdder, stats *Stats, c *torrent.Client) *Service {
|
||||
l := log.Logger.With().Str("component", "torrent-service").Logger()
|
||||
return &Service{
|
||||
log: l,
|
||||
s: stats,
|
||||
c: c,
|
||||
fss: make(map[string]fs.Filesystem),
|
||||
cfgLoader: cfg,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Load() (map[string]fs.Filesystem, error) {
|
||||
// Load from config
|
||||
s.log.Info().Msg("adding torrents from configuration")
|
||||
if err := s.load(s.cfgLoader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Load from DB
|
||||
s.log.Info().Msg("adding torrents from database")
|
||||
return s.fss, s.load(s.db)
|
||||
}
|
||||
|
||||
func (s *Service) load(l loader.Loader) error {
|
||||
list, err := l.ListMagnets()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for r, ms := range list {
|
||||
for _, m := range ms {
|
||||
if err := s.addMagnet(r, m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
list, err = l.ListTorrentPaths()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for r, ms := range list {
|
||||
for _, p := range ms {
|
||||
if err := s.addTorrentPath(r, p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) AddMagnet(r, m string) error {
|
||||
if err := s.addMagnet(r, m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add to db
|
||||
return s.db.AddMagnet(r, m)
|
||||
}
|
||||
|
||||
func (s *Service) addTorrentPath(r, p string) error {
|
||||
// Add to client
|
||||
t, err := s.c.AddTorrentFromFile(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.addTorrent(r, t)
|
||||
}
|
||||
|
||||
func (s *Service) addMagnet(r, m string) error {
|
||||
// Add to client
|
||||
t, err := s.c.AddMagnet(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.addTorrent(r, t)
|
||||
|
||||
}
|
||||
|
||||
func (s *Service) addTorrent(r string, t *torrent.Torrent) error {
|
||||
// only get info if name is not available
|
||||
if t.Info() == nil {
|
||||
s.log.Info().Str("hash", t.InfoHash().String()).Msg("getting torrent info")
|
||||
<-t.GotInfo()
|
||||
}
|
||||
|
||||
// Add to stats
|
||||
s.s.Add(r, t)
|
||||
|
||||
// Add to filesystems
|
||||
folder := path.Join("/", r)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
_, ok := s.fss[folder]
|
||||
if !ok {
|
||||
s.fss[folder] = fs.NewTorrent()
|
||||
}
|
||||
|
||||
tfs, ok := s.fss[folder].(*fs.Torrent)
|
||||
if !ok {
|
||||
return errors.New("error adding torrent to filesystem")
|
||||
}
|
||||
|
||||
tfs.AddTorrent(t)
|
||||
s.log.Info().Str("name", t.Info().Name).Str("route", r).Msg("torrent added")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) RemoveFromHash(r, h string) error {
|
||||
// Remove from db
|
||||
deleted, err := s.db.RemoveFromHash(r, h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !deleted {
|
||||
return fmt.Errorf("element with hash %v on route %v cannot be removed", h, r)
|
||||
}
|
||||
|
||||
// Remove from stats
|
||||
s.s.Del(r, h)
|
||||
|
||||
// Remove from fs
|
||||
folder := path.Join("/", r)
|
||||
|
||||
tfs, ok := s.fss[folder].(*fs.Torrent)
|
||||
if !ok {
|
||||
return errors.New("error removing torrent from filesystem")
|
||||
}
|
||||
|
||||
tfs.RemoveTorrent(h)
|
||||
|
||||
// Remove from client
|
||||
var mh metainfo.Hash
|
||||
if err := mh.FromHexString(h); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t, ok := s.c.Torrent(metainfo.NewHashFromHex(h))
|
||||
if ok {
|
||||
t.Drop()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
261
torrent/stats.go
Normal file
261
torrent/stats.go
Normal file
|
@ -0,0 +1,261 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
)
|
||||
|
||||
var ErrTorrentNotFound = errors.New("torrent not found")
|
||||
|
||||
type PieceStatus string
|
||||
|
||||
const (
|
||||
Checking PieceStatus = "H"
|
||||
Partial PieceStatus = "P"
|
||||
Complete PieceStatus = "C"
|
||||
Waiting PieceStatus = "W"
|
||||
Error PieceStatus = "?"
|
||||
)
|
||||
|
||||
type PieceChunk struct {
|
||||
Status PieceStatus `json:"status"`
|
||||
NumPieces int `json:"numPieces"`
|
||||
}
|
||||
|
||||
type TorrentStats struct {
|
||||
Name string `json:"name"`
|
||||
Hash string `json:"hash"`
|
||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||
UploadedBytes int64 `json:"uploadedBytes"`
|
||||
Peers int `json:"peers"`
|
||||
Seeders int `json:"seeders"`
|
||||
TimePassed float64 `json:"timePassed"`
|
||||
PieceChunks []*PieceChunk `json:"pieceChunks"`
|
||||
TotalPieces int `json:"totalPieces"`
|
||||
PieceSize int64 `json:"pieceSize"`
|
||||
}
|
||||
|
||||
type byName []*TorrentStats
|
||||
|
||||
func (a byName) Len() int { return len(a) }
|
||||
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||
|
||||
type GlobalTorrentStats struct {
|
||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||
UploadedBytes int64 `json:"uploadedBytes"`
|
||||
TimePassed float64 `json:"timePassed"`
|
||||
}
|
||||
|
||||
type RouteStats struct {
|
||||
Name string `json:"name"`
|
||||
TorrentStats []*TorrentStats `json:"torrentStats"`
|
||||
}
|
||||
|
||||
type ByName []*RouteStats
|
||||
|
||||
func (a ByName) Len() int { return len(a) }
|
||||
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||
|
||||
type stat struct {
|
||||
totalDownloadBytes int64
|
||||
downloadBytes int64
|
||||
totalUploadBytes int64
|
||||
uploadBytes int64
|
||||
peers int
|
||||
seeders int
|
||||
time time.Time
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
mut sync.Mutex
|
||||
torrents map[string]*torrent.Torrent
|
||||
torrentsByRoute map[string]map[string]*torrent.Torrent
|
||||
previousStats map[string]*stat
|
||||
|
||||
gTime time.Time
|
||||
}
|
||||
|
||||
func NewStats() *Stats {
|
||||
return &Stats{
|
||||
gTime: time.Now(),
|
||||
torrents: make(map[string]*torrent.Torrent),
|
||||
torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
|
||||
previousStats: make(map[string]*stat),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stats) Add(route string, t *torrent.Torrent) {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
h := t.InfoHash().String()
|
||||
|
||||
s.torrents[h] = t
|
||||
s.previousStats[h] = &stat{}
|
||||
|
||||
_, ok := s.torrentsByRoute[route]
|
||||
if !ok {
|
||||
s.torrentsByRoute[route] = make(map[string]*torrent.Torrent)
|
||||
}
|
||||
|
||||
s.torrentsByRoute[route][h] = t
|
||||
}
|
||||
|
||||
func (s *Stats) Del(route, hash string) {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
delete(s.torrents, hash)
|
||||
delete(s.previousStats, hash)
|
||||
ts, ok := s.torrentsByRoute[route]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(ts, hash)
|
||||
}
|
||||
|
||||
func (s *Stats) Stats(hash string) (*TorrentStats, error) {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
t, ok := s.torrents[hash]
|
||||
if !(ok) {
|
||||
return nil, ErrTorrentNotFound
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
return s.stats(now, t, true), nil
|
||||
}
|
||||
|
||||
func (s *Stats) RoutesStats() []*RouteStats {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
var out []*RouteStats
|
||||
for r, tl := range s.torrentsByRoute {
|
||||
var tStats []*TorrentStats
|
||||
for _, t := range tl {
|
||||
ts := s.stats(now, t, true)
|
||||
tStats = append(tStats, ts)
|
||||
}
|
||||
|
||||
sort.Sort(byName(tStats))
|
||||
|
||||
rs := &RouteStats{
|
||||
Name: r,
|
||||
TorrentStats: tStats,
|
||||
}
|
||||
out = append(out, rs)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *Stats) GlobalStats() *GlobalTorrentStats {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
var totalDownload int64
|
||||
var totalUpload int64
|
||||
for _, torrent := range s.torrents {
|
||||
tStats := s.stats(now, torrent, false)
|
||||
totalDownload += tStats.DownloadedBytes
|
||||
totalUpload += tStats.UploadedBytes
|
||||
}
|
||||
|
||||
timePassed := now.Sub(s.gTime)
|
||||
s.gTime = now
|
||||
|
||||
return &GlobalTorrentStats{
|
||||
DownloadedBytes: totalDownload,
|
||||
UploadedBytes: totalUpload,
|
||||
TimePassed: timePassed.Seconds(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
|
||||
ts := &TorrentStats{}
|
||||
prev, ok := s.previousStats[t.InfoHash().String()]
|
||||
if !ok {
|
||||
return &TorrentStats{}
|
||||
}
|
||||
if s.returnPreviousMeasurements(now) {
|
||||
ts.DownloadedBytes = prev.downloadBytes
|
||||
ts.UploadedBytes = prev.uploadBytes
|
||||
} else {
|
||||
st := t.Stats()
|
||||
rd := st.BytesReadData.Int64()
|
||||
wd := st.BytesWrittenData.Int64()
|
||||
ist := &stat{
|
||||
downloadBytes: rd - prev.totalDownloadBytes,
|
||||
uploadBytes: wd - prev.totalUploadBytes,
|
||||
totalDownloadBytes: rd,
|
||||
totalUploadBytes: wd,
|
||||
time: now,
|
||||
peers: st.TotalPeers,
|
||||
seeders: st.ConnectedSeeders,
|
||||
}
|
||||
|
||||
ts.DownloadedBytes = ist.downloadBytes
|
||||
ts.UploadedBytes = ist.uploadBytes
|
||||
ts.Peers = ist.peers
|
||||
ts.Seeders = ist.seeders
|
||||
|
||||
s.previousStats[t.InfoHash().String()] = ist
|
||||
}
|
||||
|
||||
ts.TimePassed = now.Sub(prev.time).Seconds()
|
||||
var totalPieces int
|
||||
if chunks {
|
||||
var pch []*PieceChunk
|
||||
for _, psr := range t.PieceStateRuns() {
|
||||
var s PieceStatus
|
||||
switch {
|
||||
case psr.Checking:
|
||||
s = Checking
|
||||
case psr.Partial:
|
||||
s = Partial
|
||||
case psr.Complete:
|
||||
s = Complete
|
||||
case !psr.Ok:
|
||||
s = Error
|
||||
default:
|
||||
s = Waiting
|
||||
}
|
||||
|
||||
pch = append(pch, &PieceChunk{
|
||||
Status: s,
|
||||
NumPieces: psr.Length,
|
||||
})
|
||||
totalPieces += psr.Length
|
||||
}
|
||||
ts.PieceChunks = pch
|
||||
}
|
||||
|
||||
ts.Hash = t.InfoHash().String()
|
||||
ts.Name = t.Name()
|
||||
ts.TotalPieces = totalPieces
|
||||
|
||||
if t.Info() != nil {
|
||||
ts.PieceSize = t.Info().PieceLength
|
||||
}
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
const gap time.Duration = 2 * time.Second
|
||||
|
||||
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
|
||||
return now.Sub(s.gTime) < gap
|
||||
}
|
92
torrent/store.go
Normal file
92
torrent/store.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/dht/v2/bep44"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
dlog "github.com/distribyted/distribyted/log"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var _ bep44.Store = &FileItemStore{}
|
||||
|
||||
type FileItemStore struct {
|
||||
ttl time.Duration
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, error) {
|
||||
l := log.Logger.With().Str("component", "item-store").Logger()
|
||||
db, err := badger.Open(badger.DefaultOptions(path).WithLogger(&dlog.Badger{L: l}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.RunValueLogGC(0.5)
|
||||
if err != nil && err != badger.ErrNoRewrite {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &FileItemStore{
|
||||
db: db,
|
||||
ttl: itemsTTL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fis *FileItemStore) Put(i *bep44.Item) error {
|
||||
tx := fis.db.NewTransaction(true)
|
||||
defer tx.Discard()
|
||||
|
||||
key := i.Target()
|
||||
var value bytes.Buffer
|
||||
|
||||
enc := gob.NewEncoder(&value)
|
||||
if err := enc.Encode(i); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e := badger.NewEntry(key[:], value.Bytes()).WithTTL(fis.ttl)
|
||||
if err := tx.SetEntry(e); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
|
||||
tx := fis.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
dbi, err := tx.Get(t[:])
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return nil, bep44.ErrItemNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
valb, err := dbi.ValueCopy(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(valb)
|
||||
dec := gob.NewDecoder(buf)
|
||||
var i *bep44.Item
|
||||
if err := dec.Decode(&i); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (fis *FileItemStore) Del(t bep44.Target) error {
|
||||
// ignore this
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fis *FileItemStore) Close() error {
|
||||
return fis.db.Close()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue