This commit is contained in:
royalcat 2024-02-23 01:54:56 +03:00
parent b97dcc8d8f
commit 35913e0190
12 changed files with 236 additions and 30 deletions

6
go.mod
View file

@ -5,12 +5,13 @@ go 1.21
require ( require (
github.com/99designs/gqlgen v0.17.43 github.com/99designs/gqlgen v0.17.43
github.com/anacrolix/dht/v2 v2.21.0 github.com/anacrolix/dht/v2 v2.21.0
github.com/anacrolix/log v0.14.5 github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4
github.com/anacrolix/missinggo/v2 v2.7.3 github.com/anacrolix/missinggo/v2 v2.7.3
github.com/anacrolix/torrent v1.53.2 github.com/anacrolix/torrent v1.54.0
github.com/billziss-gh/cgofuse v1.5.0 github.com/billziss-gh/cgofuse v1.5.0
github.com/bodgit/sevenzip v1.4.5 github.com/bodgit/sevenzip v1.4.5
github.com/dgraph-io/badger/v4 v4.2.0 github.com/dgraph-io/badger/v4 v4.2.0
github.com/dgraph-io/ristretto v0.1.1
github.com/gin-contrib/pprof v1.4.0 github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
github.com/go-git/go-billy/v5 v5.5.0 github.com/go-git/go-billy/v5 v5.5.0
@ -64,7 +65,6 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/badger v1.6.0 // indirect github.com/dgraph-io/badger v1.6.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect

8
go.sum
View file

@ -64,8 +64,8 @@ github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgw
github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4= github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4=
github.com/anacrolix/log v0.13.1/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= github.com/anacrolix/log v0.13.1/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
github.com/anacrolix/log v0.14.5 h1:OkMjBquVSRb742LkecSGDGaGpNoSrw4syRIm0eRdmrg= github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4 h1:CdVK9IoqoqklXQQ4+L2aew64xsz14KdOD+rnKdTQajg=
github.com/anacrolix/log v0.14.5/go.mod h1:1OmJESOtxQGNMlUO5rcv96Vpp9mfMqXXbe2RdinFLdY= github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4/go.mod h1:1OmJESOtxQGNMlUO5rcv96Vpp9mfMqXXbe2RdinFLdY=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM=
github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s= github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
@ -96,8 +96,8 @@ github.com/anacrolix/sync v0.5.1/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DC
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/torrent v1.53.2 h1:dW+ficSC8sJaGrUvZJizORPBLTP7XR8idl2oGlrUutQ= github.com/anacrolix/torrent v1.54.0 h1:sl+2J1pHjJWq6+5G861+Yc74k2XTc/m8ijaMQR/8+2k=
github.com/anacrolix/torrent v1.53.2/go.mod h1:d1NANCFAd9/nv9vmHnYUobLdyBSAoFYohojHjGmcAsw= github.com/anacrolix/torrent v1.54.0/go.mod h1:is8GNob5qDeZ5Kq+pKPiE2xqYUi1ms7IgSB+CftZETk=
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 h1:QAVZ3pN/J4/UziniAhJR2OZ9Ox5kOY2053tBbbqUPYA= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 h1:QAVZ3pN/J4/UziniAhJR2OZ9Ox5kOY2053tBbbqUPYA=
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cYIdaG35x15aH3Zy6d03f7P728QfdcDeD/IEOs= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cYIdaG35x15aH3Zy6d03f7P728QfdcDeD/IEOs=
github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4= github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4=

View file

@ -2,7 +2,7 @@ package model
import "slices" import "slices"
func (f *IntFilter) IsValid(v int64) bool { func (f *IntFilter) Include(v int64) bool {
if f.Eq != nil { if f.Eq != nil {
return v == *f.Eq return v == *f.Eq
} else if f.Gt != nil { } else if f.Gt != nil {

View file

@ -23,17 +23,17 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
if filter != nil { if filter != nil {
if filter.BytesCompleted != nil { if filter.BytesCompleted != nil {
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
return filter.BytesCompleted.IsValid(torrent.BytesCompleted) return filter.BytesCompleted.Include(torrent.BytesCompleted)
}) })
} }
if filter.BytesMissing != nil { if filter.BytesMissing != nil {
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
return filter.BytesMissing.IsValid(torrent.BytesMissing) return filter.BytesMissing.Include(torrent.BytesMissing)
}) })
} }
if filter.PeersCount != nil { if filter.PeersCount != nil {
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
return filter.PeersCount.IsValid( return filter.PeersCount.Include(
int64(len(torrent.T.Torrent().PeerConns())), int64(len(torrent.T.Torrent().PeerConns())),
) )
}) })

View file

@ -55,13 +55,16 @@ func (r *torrentResolver) ExcludedFiles(ctx context.Context, obj *model.Torrent)
func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error) { func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error) {
peers := []*model.TorrentPeer{} peers := []*model.TorrentPeer{}
for _, peer := range obj.T.Torrent().PeerConns() { for _, peer := range obj.T.Torrent().PeerConns() {
clientName, _ := peer.PeerClientName.Load().(string)
peers = append(peers, &model.TorrentPeer{ peers = append(peers, &model.TorrentPeer{
IP: peer.RemoteAddr.String(), IP: peer.RemoteAddr.String(),
DownloadRate: peer.DownloadRate(), DownloadRate: peer.DownloadRate(),
Discovery: model.MapPeerSource(peer.Discovery),
Port: int64(peer.PeerListenPort), Discovery: model.MapPeerSource(peer.Discovery),
ClientName: peer.PeerClientName.Load().(string), Port: int64(peer.PeerListenPort),
F: peer, ClientName: clientName,
F: peer,
}) })
} }
return peers, nil return peers, nil

View file

@ -28,7 +28,11 @@ func (s *Torrent) Torrent() *torrent.Torrent {
func (s *Torrent) Name() string { func (s *Torrent) Name() string {
<-s.t.GotInfo() <-s.t.GotInfo()
return s.t.Name() if name := s.t.Name(); name != "" {
return name
}
return s.InfoHash()
} }
func (s *Torrent) InfoHash() string { func (s *Torrent) InfoHash() string {
@ -68,9 +72,22 @@ func (s *Torrent) Files() ([]*torrent.File, error) {
return true return true
}) })
for _, tf := range files {
s.isFileComplete(tf.BeginPieceIndex(), tf.EndPieceIndex())
}
return files, nil return files, nil
} }
func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool {
for i := startIndex; i < endIndex; i++ {
if !s.t.Piece(i).State().Complete {
return false
}
}
return true
}
func (s *Torrent) ExcludedFiles() ([]*torrent.File, error) { func (s *Torrent) ExcludedFiles() ([]*torrent.File, error) {
excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash()) excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash())
if err != nil { if err != nil {

View file

@ -47,7 +47,12 @@ func (me *FileStorage) Close() error {
} }
func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string { func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
return filepath.Join(baseDir, info.Name) dirName := info.Name
if dirName == "" {
dirName = infoHash.HexString()
}
return filepath.Join(baseDir, dirName)
} }
func filePath(opts storage.FilePathMakerOpts) string { func filePath(opts storage.FilePathMakerOpts) string {
@ -120,6 +125,12 @@ func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.T
return len(toDelete), nil return len(toDelete), nil
} }
// func (fs *FileStorage) IsCompatable(ctx context.Context, addition *controller.Torrent, dryRun bool) (bool, error) {
// log := fs.log.With("function", "IsCompatable", "addition", addition.Name())
// ifp
// }
func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) { func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
log := fs.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun) log := fs.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)

View file

@ -6,6 +6,7 @@ import (
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strings" "strings"
"time" "time"
@ -102,8 +103,14 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
infoBytes = nil infoBytes = nil
} else { } else {
for _, t := range s.c.Torrents() { for _, t := range s.c.Torrents() {
if t.Name() == info.BestName() { if t.Name() == info.BestName() && t.InfoHash() != spec.InfoHash {
return nil, fmt.Errorf("torrent with name '%s' already exists", t.Name()) <-t.GotInfo()
if !isTorrentCompatable(*t.Info(), info) {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
} }
} }
} }
@ -115,6 +122,8 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
ChunkSize: spec.ChunkSize, ChunkSize: spec.ChunkSize,
}) })
t.AllowDataDownload() t.AllowDataDownload()
t.AllowDataUpload()
t.DownloadAll()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -134,6 +143,34 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
return t, nil return t, nil
} }
func isTorrentCompatable(existingInfo, newInfo metainfo.Info) bool {
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(newInfo.Files)
pathCmp := func(a, b metainfo.FileInfo) int {
return slices.Compare(a.BestPath(), b.BestPath())
}
slices.SortStableFunc(existingFiles, pathCmp)
slices.SortStableFunc(newFiles, pathCmp)
// torrents basically equals
if slices.EqualFunc(existingFiles, newFiles, func(fi1, fi2 metainfo.FileInfo) bool {
return fi1.Length == fi2.Length && slices.Equal(fi1.BestPath(), fi1.BestPath())
}) {
return true
}
if len(newFiles) > len(existingFiles) {
all := append(existingFiles, newFiles...)
slices.SortStableFunc(all, pathCmp)
slices.CompactFunc(all, func(fi1, fi2 metainfo.FileInfo) bool {
return slices.Equal(fi1.BestPath(), fi2.BestPath()) && fi1.Length == fi2.Length
})
}
return false
}
func isValidInfoHashBytes(d []byte) bool { func isValidInfoHashBytes(d []byte) bool {
var info metainfo.Info var info metainfo.Info
err := bencode.Unmarshal(d, &info) err := bencode.Unmarshal(d, &info)

View file

@ -20,9 +20,11 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
torrentCfg := torrent.NewDefaultClientConfig() torrentCfg := torrent.NewDefaultClientConfig()
torrentCfg.PeerID = string(id[:]) torrentCfg.PeerID = string(id[:])
torrentCfg.DefaultStorage = st torrentCfg.DefaultStorage = st
// torrentCfg.AlwaysWantConns = true torrentCfg.AlwaysWantConns = true
// torrentCfg.DisableAggressiveUpload = true torrentCfg.AcceptPeerConnections = true
// torrentCfg.Seed = true torrentCfg.DisableAggressiveUpload = false
torrentCfg.Seed = true
// torrentCfg.DownloadRateLimiter = rate.NewLimiter(rate.Inf, 0) // torrentCfg.DownloadRateLimiter = rate.NewLimiter(rate.Inf, 0)
// torrentCfg // torrentCfg
@ -30,13 +32,28 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
tl.SetHandlers(&dlog.Torrent{L: l}) tl.SetHandlers(&dlog.Torrent{L: l})
torrentCfg.Logger = tl torrentCfg.Logger = tl
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) { torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) {
l.Debug("new peer", "ip", p.RemoteAddr.String()) l := l.With("ip", p.RemoteAddr.String())
if p.Torrent() != nil {
l = l.With("torrent", p.Torrent().Name())
}
l.Debug("new peer")
}) })
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) { torrentCfg.Callbacks.PeerClosed = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) {
l.Debug("peer closed", "ip", p.RemoteAddr.String()) l := l.With("ip", p.RemoteAddr.String())
if p.Torrent() != nil {
l = l.With("torrent", p.Torrent().Name())
}
l.Debug("peer closed")
}) })
// torrentCfg.Callbacks.PeerConnClosed = append(torrentCfg.Callbacks.PeerConnClosed, func(c *torrent.PeerConn) {
// l.Debug("peer closed", "ip", c.RemoteAddr.String())
// })
// torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) { // torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
// cfg.Store = fis // cfg.Store = fis
// cfg.Exp = 2 * time.Hour // cfg.Exp = 2 * time.Hour

View file

@ -56,16 +56,27 @@ func (k *InfoBytes) Get(ih infohash.T) (*metainfo.MetaInfo, error) {
return metainfo.Load(bytes.NewReader(data)) return metainfo.Load(bytes.NewReader(data))
} }
func (me *InfoBytes) SetBytes(ih infohash.T, bytes []byte) error { func (me *InfoBytes) SetBytes(ih infohash.T, data []byte) error {
return me.db.Update(func(txn *badger.Txn) error { return me.db.Update(func(txn *badger.Txn) error {
return txn.Set(ih.Bytes(), bytes) item, err := txn.Get(ih.Bytes())
if err != nil {
if err == badger.ErrKeyNotFound {
return txn.Set(ih.Bytes(), data)
}
return err
}
return item.Value(func(val []byte) error {
if !bytes.Equal(val, data) {
return txn.Set(ih.Bytes(), data)
}
return nil
})
}) })
} }
func (me *InfoBytes) Set(ih infohash.T, info metainfo.MetaInfo) error { func (me *InfoBytes) Set(ih infohash.T, info metainfo.MetaInfo) error {
return me.db.Update(func(txn *badger.Txn) error { return me.SetBytes(ih, info.InfoBytes)
return txn.Set(ih.Bytes(), info.InfoBytes)
})
} }
func (k *InfoBytes) Delete(ih infohash.T) error { func (k *InfoBytes) Delete(ih infohash.T) error {

106
src/host/store/stats.go Normal file
View file

@ -0,0 +1,106 @@
package store
import (
"context"
"encoding/json"
"path"
"time"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/ristretto/z"
)
func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, error) {
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats-history")).
WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity
)
if err != nil {
return nil, err
}
go func() {
for n := range time.NewTimer(lifetime / 2).C {
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
}
}()
r := &StatsHistory{
db: db,
}
return r, nil
}
type StatsHistory struct {
db *badger.DB
}
type TorrentStat struct {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
}
type PieceChunk struct {
Status PieceStatus `json:"status"`
NumPieces int `json:"numPieces"`
}
type PieceStatus string
const (
Checking PieceStatus = "H"
Partial PieceStatus = "P"
Complete PieceStatus = "C"
Waiting PieceStatus = "W"
Error PieceStatus = "?"
)
func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error {
data, err := json.Marshal(stat)
if err != nil {
return err
}
return r.db.Update(func(txn *badger.Txn) error {
return txn.Set(ih.Bytes(), data)
})
}
func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) {
var stats GlobalTorrentStats
stream := r.db.NewStream()
stream.SinceTs = uint64(since.Unix())
var tstat TorrentStat
stream.Send = func(buf *z.Buffer) error {
err := json.Unmarshal(buf.Bytes(), &tstat)
if err != nil {
return err
}
stats.DownloadedBytes += tstat.DownloadedBytes
stats.UploadedBytes += tstat.UploadedBytes
return nil
}
err := stream.Orchestrate(ctx)
if err != nil {
return stats, err
}
return stats, nil
}
type GlobalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
}

View file

@ -53,6 +53,10 @@ func (fs *TorrentFs) files() (map[string]File, error) {
fs.filesCache = make(map[string]File) fs.filesCache = make(map[string]File)
for _, file := range files { for _, file := range files {
if file.BytesCompleted() == 0 {
continue
}
p := AbsPath(file.Path()) p := AbsPath(file.Path())
fs.filesCache[p] = &torrentFile{ fs.filesCache[p] = &torrentFile{