From 35913e019007f75e4a84d4580cf3f3909f27203a Mon Sep 17 00:00:00 2001 From: royalcat Date: Fri, 23 Feb 2024 01:54:56 +0300 Subject: [PATCH] WIP --- go.mod | 6 +- go.sum | 8 +- src/delivery/graphql/model/filter.go | 2 +- .../graphql/resolver/query.resolvers.go | 6 +- .../graphql/resolver/torrent.resolvers.go | 11 +- src/host/controller/torrent.go | 19 +++- .../{storage_files.go => storage.go} | 13 ++- src/host/service/service.go | 41 ++++++- src/host/store/client.go | 29 ++++- src/host/store/info.go | 21 +++- src/host/store/stats.go | 106 ++++++++++++++++++ src/host/vfs/torrent.go | 4 + 12 files changed, 236 insertions(+), 30 deletions(-) rename src/host/filestorage/{storage_files.go => storage.go} (93%) create mode 100644 src/host/store/stats.go diff --git a/go.mod b/go.mod index 8477be3..eb52aa8 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,13 @@ go 1.21 require ( github.com/99designs/gqlgen v0.17.43 github.com/anacrolix/dht/v2 v2.21.0 - github.com/anacrolix/log v0.14.5 + github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4 github.com/anacrolix/missinggo/v2 v2.7.3 - github.com/anacrolix/torrent v1.53.2 + github.com/anacrolix/torrent v1.54.0 github.com/billziss-gh/cgofuse v1.5.0 github.com/bodgit/sevenzip v1.4.5 github.com/dgraph-io/badger/v4 v4.2.0 + github.com/dgraph-io/ristretto v0.1.1 github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/go-git/go-billy/v5 v5.5.0 @@ -64,7 +65,6 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgraph-io/badger v1.6.0 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect diff --git a/go.sum b/go.sum index acc9f71..9475909 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,8 @@ github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgw github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4= github.com/anacrolix/log v0.13.1/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= -github.com/anacrolix/log v0.14.5 h1:OkMjBquVSRb742LkecSGDGaGpNoSrw4syRIm0eRdmrg= -github.com/anacrolix/log v0.14.5/go.mod h1:1OmJESOtxQGNMlUO5rcv96Vpp9mfMqXXbe2RdinFLdY= +github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4 h1:CdVK9IoqoqklXQQ4+L2aew64xsz14KdOD+rnKdTQajg= +github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4/go.mod h1:1OmJESOtxQGNMlUO5rcv96Vpp9mfMqXXbe2RdinFLdY= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM= github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s= @@ -96,8 +96,8 @@ github.com/anacrolix/sync v0.5.1/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DC github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= -github.com/anacrolix/torrent v1.53.2 h1:dW+ficSC8sJaGrUvZJizORPBLTP7XR8idl2oGlrUutQ= -github.com/anacrolix/torrent v1.53.2/go.mod h1:d1NANCFAd9/nv9vmHnYUobLdyBSAoFYohojHjGmcAsw= +github.com/anacrolix/torrent v1.54.0 h1:sl+2J1pHjJWq6+5G861+Yc74k2XTc/m8ijaMQR/8+2k= +github.com/anacrolix/torrent v1.54.0/go.mod h1:is8GNob5qDeZ5Kq+pKPiE2xqYUi1ms7IgSB+CftZETk= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 h1:QAVZ3pN/J4/UziniAhJR2OZ9Ox5kOY2053tBbbqUPYA= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cYIdaG35x15aH3Zy6d03f7P728QfdcDeD/IEOs= github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4= diff --git a/src/delivery/graphql/model/filter.go b/src/delivery/graphql/model/filter.go index 911216b..0c96e46 100644 --- a/src/delivery/graphql/model/filter.go +++ b/src/delivery/graphql/model/filter.go @@ -2,7 +2,7 @@ package model import "slices" -func (f *IntFilter) IsValid(v int64) bool { +func (f *IntFilter) Include(v int64) bool { if f.Eq != nil { return v == *f.Eq } else if f.Gt != nil { diff --git a/src/delivery/graphql/resolver/query.resolvers.go b/src/delivery/graphql/resolver/query.resolvers.go index f014380..9aecef7 100644 --- a/src/delivery/graphql/resolver/query.resolvers.go +++ b/src/delivery/graphql/resolver/query.resolvers.go @@ -23,17 +23,17 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt if filter != nil { if filter.BytesCompleted != nil { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { - return filter.BytesCompleted.IsValid(torrent.BytesCompleted) + return filter.BytesCompleted.Include(torrent.BytesCompleted) }) } if filter.BytesMissing != nil { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { - return filter.BytesMissing.IsValid(torrent.BytesMissing) + return filter.BytesMissing.Include(torrent.BytesMissing) }) } if filter.PeersCount != nil { filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool { - return filter.PeersCount.IsValid( + return filter.PeersCount.Include( int64(len(torrent.T.Torrent().PeerConns())), ) }) diff --git a/src/delivery/graphql/resolver/torrent.resolvers.go b/src/delivery/graphql/resolver/torrent.resolvers.go index dfc9fd0..778fbf6 100644 --- a/src/delivery/graphql/resolver/torrent.resolvers.go +++ b/src/delivery/graphql/resolver/torrent.resolvers.go @@ -55,13 +55,16 @@ func (r *torrentResolver) ExcludedFiles(ctx context.Context, obj *model.Torrent) func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error) { peers := []*model.TorrentPeer{} for _, peer := range obj.T.Torrent().PeerConns() { + clientName, _ := peer.PeerClientName.Load().(string) + peers = append(peers, &model.TorrentPeer{ IP: peer.RemoteAddr.String(), DownloadRate: peer.DownloadRate(), - Discovery: model.MapPeerSource(peer.Discovery), - Port: int64(peer.PeerListenPort), - ClientName: peer.PeerClientName.Load().(string), - F: peer, + + Discovery: model.MapPeerSource(peer.Discovery), + Port: int64(peer.PeerListenPort), + ClientName: clientName, + F: peer, }) } return peers, nil diff --git a/src/host/controller/torrent.go b/src/host/controller/torrent.go index 3a04b41..911a4d3 100644 --- a/src/host/controller/torrent.go +++ b/src/host/controller/torrent.go @@ -28,7 +28,11 @@ func (s *Torrent) Torrent() *torrent.Torrent { func (s *Torrent) Name() string { <-s.t.GotInfo() - return s.t.Name() + if name := s.t.Name(); name != "" { + return name + } + + return s.InfoHash() } func (s *Torrent) InfoHash() string { @@ -68,9 +72,22 @@ func (s *Torrent) Files() ([]*torrent.File, error) { return true }) + for _, tf := range files { + s.isFileComplete(tf.BeginPieceIndex(), tf.EndPieceIndex()) + } + return files, nil } +func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool { + for i := startIndex; i < endIndex; i++ { + if !s.t.Piece(i).State().Complete { + return false + } + } + return true +} + func (s *Torrent) ExcludedFiles() ([]*torrent.File, error) { excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash()) if err != nil { diff --git a/src/host/filestorage/storage_files.go b/src/host/filestorage/storage.go similarity index 93% rename from src/host/filestorage/storage_files.go rename to src/host/filestorage/storage.go index 8b3fa8c..cc933e2 100644 --- a/src/host/filestorage/storage_files.go +++ b/src/host/filestorage/storage.go @@ -47,7 +47,12 @@ func (me *FileStorage) Close() error { } func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string { - return filepath.Join(baseDir, info.Name) + dirName := info.Name + if dirName == "" { + dirName = infoHash.HexString() + } + + return filepath.Join(baseDir, dirName) } func filePath(opts storage.FilePathMakerOpts) string { @@ -120,6 +125,12 @@ func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.T return len(toDelete), nil } +// func (fs *FileStorage) IsCompatable(ctx context.Context, addition *controller.Torrent, dryRun bool) (bool, error) { +// log := fs.log.With("function", "IsCompatable", "addition", addition.Name()) + +// ifp +// } + func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) { log := fs.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun) diff --git a/src/host/service/service.go b/src/host/service/service.go index d99cb1a..718e30e 100644 --- a/src/host/service/service.go +++ b/src/host/service/service.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "path/filepath" + "slices" "strings" "time" @@ -102,8 +103,14 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, infoBytes = nil } else { for _, t := range s.c.Torrents() { - if t.Name() == info.BestName() { - return nil, fmt.Errorf("torrent with name '%s' already exists", t.Name()) + if t.Name() == info.BestName() && t.InfoHash() != spec.InfoHash { + <-t.GotInfo() + if !isTorrentCompatable(*t.Info(), info) { + return nil, fmt.Errorf( + "torrent with name '%s' not compatable existing infohash: %s, new: %s", + t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(), + ) + } } } } @@ -115,6 +122,8 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, ChunkSize: spec.ChunkSize, }) t.AllowDataDownload() + t.AllowDataUpload() + t.DownloadAll() select { case <-ctx.Done(): @@ -134,6 +143,34 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, return t, nil } +func isTorrentCompatable(existingInfo, newInfo metainfo.Info) bool { + existingFiles := slices.Clone(existingInfo.Files) + newFiles := slices.Clone(newInfo.Files) + + pathCmp := func(a, b metainfo.FileInfo) int { + return slices.Compare(a.BestPath(), b.BestPath()) + } + slices.SortStableFunc(existingFiles, pathCmp) + slices.SortStableFunc(newFiles, pathCmp) + + // torrents basically equals + if slices.EqualFunc(existingFiles, newFiles, func(fi1, fi2 metainfo.FileInfo) bool { + return fi1.Length == fi2.Length && slices.Equal(fi1.BestPath(), fi1.BestPath()) + }) { + return true + } + + if len(newFiles) > len(existingFiles) { + all := append(existingFiles, newFiles...) + slices.SortStableFunc(all, pathCmp) + slices.CompactFunc(all, func(fi1, fi2 metainfo.FileInfo) bool { + return slices.Equal(fi1.BestPath(), fi2.BestPath()) && fi1.Length == fi2.Length + }) + } + + return false +} + func isValidInfoHashBytes(d []byte) bool { var info metainfo.Info err := bencode.Unmarshal(d, &info) diff --git a/src/host/store/client.go b/src/host/store/client.go index 8275424..904251d 100644 --- a/src/host/store/client.go +++ b/src/host/store/client.go @@ -20,9 +20,11 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient torrentCfg := torrent.NewDefaultClientConfig() torrentCfg.PeerID = string(id[:]) torrentCfg.DefaultStorage = st - // torrentCfg.AlwaysWantConns = true - // torrentCfg.DisableAggressiveUpload = true - // torrentCfg.Seed = true + torrentCfg.AlwaysWantConns = true + torrentCfg.AcceptPeerConnections = true + torrentCfg.DisableAggressiveUpload = false + + torrentCfg.Seed = true // torrentCfg.DownloadRateLimiter = rate.NewLimiter(rate.Inf, 0) // torrentCfg @@ -30,13 +32,28 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient tl.SetHandlers(&dlog.Torrent{L: l}) torrentCfg.Logger = tl torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) { - l.Debug("new peer", "ip", p.RemoteAddr.String()) + l := l.With("ip", p.RemoteAddr.String()) + if p.Torrent() != nil { + l = l.With("torrent", p.Torrent().Name()) + } + + l.Debug("new peer") + }) - torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) { - l.Debug("peer closed", "ip", p.RemoteAddr.String()) + torrentCfg.Callbacks.PeerClosed = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) { + l := l.With("ip", p.RemoteAddr.String()) + if p.Torrent() != nil { + l = l.With("torrent", p.Torrent().Name()) + } + + l.Debug("peer closed") }) + // torrentCfg.Callbacks.PeerConnClosed = append(torrentCfg.Callbacks.PeerConnClosed, func(c *torrent.PeerConn) { + // l.Debug("peer closed", "ip", c.RemoteAddr.String()) + // }) + // torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) { // cfg.Store = fis // cfg.Exp = 2 * time.Hour diff --git a/src/host/store/info.go b/src/host/store/info.go index 44f5880..205a60c 100644 --- a/src/host/store/info.go +++ b/src/host/store/info.go @@ -56,16 +56,27 @@ func (k *InfoBytes) Get(ih infohash.T) (*metainfo.MetaInfo, error) { return metainfo.Load(bytes.NewReader(data)) } -func (me *InfoBytes) SetBytes(ih infohash.T, bytes []byte) error { +func (me *InfoBytes) SetBytes(ih infohash.T, data []byte) error { return me.db.Update(func(txn *badger.Txn) error { - return txn.Set(ih.Bytes(), bytes) + item, err := txn.Get(ih.Bytes()) + if err != nil { + if err == badger.ErrKeyNotFound { + return txn.Set(ih.Bytes(), data) + } + return err + } + + return item.Value(func(val []byte) error { + if !bytes.Equal(val, data) { + return txn.Set(ih.Bytes(), data) + } + return nil + }) }) } func (me *InfoBytes) Set(ih infohash.T, info metainfo.MetaInfo) error { - return me.db.Update(func(txn *badger.Txn) error { - return txn.Set(ih.Bytes(), info.InfoBytes) - }) + return me.SetBytes(ih, info.InfoBytes) } func (k *InfoBytes) Delete(ih infohash.T) error { diff --git a/src/host/store/stats.go b/src/host/store/stats.go new file mode 100644 index 0000000..0986566 --- /dev/null +++ b/src/host/store/stats.go @@ -0,0 +1,106 @@ +package store + +import ( + "context" + "encoding/json" + "path" + "time" + + "github.com/anacrolix/torrent/types/infohash" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/ristretto/z" +) + +func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, error) { + db, err := badger.OpenManaged( + badger. + DefaultOptions(path.Join(metaDir, "stats-history")). + WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity + ) + if err != nil { + return nil, err + } + + go func() { + for n := range time.NewTimer(lifetime / 2).C { + db.SetDiscardTs(uint64(n.Add(-lifetime).Unix())) + } + }() + r := &StatsHistory{ + db: db, + } + + return r, nil +} + +type StatsHistory struct { + db *badger.DB +} + +type TorrentStat struct { + Name string `json:"name"` + Hash string `json:"hash"` + DownloadedBytes int64 `json:"downloadedBytes"` + UploadedBytes int64 `json:"uploadedBytes"` + Peers int `json:"peers"` + Seeders int `json:"seeders"` + PieceChunks []*PieceChunk `json:"pieceChunks"` + TotalPieces int `json:"totalPieces"` + PieceSize int64 `json:"pieceSize"` +} + +type PieceChunk struct { + Status PieceStatus `json:"status"` + NumPieces int `json:"numPieces"` +} + +type PieceStatus string + +const ( + Checking PieceStatus = "H" + Partial PieceStatus = "P" + Complete PieceStatus = "C" + Waiting PieceStatus = "W" + Error PieceStatus = "?" +) + +func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error { + data, err := json.Marshal(stat) + if err != nil { + return err + } + + return r.db.Update(func(txn *badger.Txn) error { + return txn.Set(ih.Bytes(), data) + }) +} + +func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { + var stats GlobalTorrentStats + stream := r.db.NewStream() + stream.SinceTs = uint64(since.Unix()) + + var tstat TorrentStat + stream.Send = func(buf *z.Buffer) error { + err := json.Unmarshal(buf.Bytes(), &tstat) + if err != nil { + return err + } + + stats.DownloadedBytes += tstat.DownloadedBytes + stats.UploadedBytes += tstat.UploadedBytes + + return nil + } + + err := stream.Orchestrate(ctx) + if err != nil { + return stats, err + } + return stats, nil +} + +type GlobalTorrentStats struct { + DownloadedBytes int64 `json:"downloadedBytes"` + UploadedBytes int64 `json:"uploadedBytes"` +} diff --git a/src/host/vfs/torrent.go b/src/host/vfs/torrent.go index d793626..6d3069e 100644 --- a/src/host/vfs/torrent.go +++ b/src/host/vfs/torrent.go @@ -53,6 +53,10 @@ func (fs *TorrentFs) files() (map[string]File, error) { fs.filesCache = make(map[string]File) for _, file := range files { + if file.BytesCompleted() == 0 { + continue + } + p := AbsPath(file.Path()) fs.filesCache[p] = &torrentFile{