torrent fix

This commit is contained in:
royalcat 2024-08-15 11:23:44 +03:00
parent 57ada71d36
commit e517332a65
26 changed files with 2240 additions and 850 deletions

View file

@ -9,13 +9,13 @@ import (
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/missinggo/v2/filecache"
echopprof "github.com/labstack/echo-contrib/pprof"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func New(fc *filecache.Cache, s *torrent.Daemon, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
func Run(s *torrent.Daemon, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
log := slog.With()
r := echo.New()
@ -29,12 +29,11 @@ func New(fc *filecache.Cache, s *torrent.Daemon, vfs vfs.Filesystem, logPath str
echopprof.Register(r)
r.Any("/graphql", echo.WrapHandler((GraphQLHandler(s, vfs))))
r.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
log.Info("starting webserver", "host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port))
go r.Start((fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)))
return nil
return r.Start((fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)))
}
func Logger() echo.MiddlewareFunc {

View file

@ -1,71 +1,100 @@
package torrent
import (
"crypto/rand"
"log/slog"
"os"
"git.kmsign.ru/royalcat/tstor/src/config"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44"
tlog "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/storage"
"git.kmsign.ru/royalcat/tstor/src/config"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
)
func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) *torrent.ClientConfig {
l := slog.With("component", "torrent-client")
// TODO download and upload limits
torrentCfg := torrent.NewDefaultClientConfig()
torrentCfg.PeerID = string(id[:])
torrentCfg.DefaultStorage = st
torrentCfg.AlwaysWantConns = true
torrentCfg.DropMutuallyCompletePeers = true
torrentCfg.TorrentPeersLowWater = 100
torrentCfg.TorrentPeersHighWater = 1000
torrentCfg.AcceptPeerConnections = true
// torrentCfg.AlwaysWantConns = true
// torrentCfg.DropMutuallyCompletePeers = true
// torrentCfg.TorrentPeersLowWater = 100
// torrentCfg.TorrentPeersHighWater = 1000
// torrentCfg.AcceptPeerConnections = true
torrentCfg.Seed = true
torrentCfg.DisableAggressiveUpload = false
// torrentCfg.DisableAggressiveUpload = false
tl := tlog.NewLogger()
tl := tlog.NewLogger("torrent-client")
tl.SetHandlers(&dlog.Torrent{L: l})
torrentCfg.Logger = tl
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) {
l := l.With("ip", p.RemoteAddr.String())
if p.Torrent() != nil {
l = l.With("torrent", p.Torrent().Name())
}
l.Debug("new peer")
l.With(peerAttrs(p)...).Debug("new peer")
})
torrentCfg.Callbacks.PeerClosed = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) {
l := l.With("ip", p.RemoteAddr.String())
if p.Torrent() != nil {
l = l.With("torrent", p.Torrent().Name())
}
l.Debug("peer closed")
l.With(peerAttrs(p)...).Debug("peer closed")
})
// torrentCfg.Callbacks.PeerConnClosed = append(torrentCfg.Callbacks.PeerConnClosed, func(c *torrent.PeerConn) {
// l.Debug("peer closed", "ip", c.RemoteAddr.String())
// })
torrentCfg.Callbacks.CompletedHandshake = func(pc *torrent.PeerConn, ih infohash.T) {
l.With(peerAttrs(&pc.Peer)...).Debug("completed handshake")
}
torrentCfg.Callbacks.PeerConnAdded = append(torrentCfg.Callbacks.PeerConnAdded, func(pc *torrent.PeerConn) {
l.With(peerAttrs(&pc.Peer)...).Debug("peer conn added")
})
torrentCfg.Callbacks.PeerConnClosed = func(pc *torrent.PeerConn) {
l.With(peerAttrs(&pc.Peer)...).Debug("peer conn closed")
}
torrentCfg.PeriodicallyAnnounceTorrentsToDht = true
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis
cfg.Exp = dhtTTL
cfg.NoSecurity = false
}
t, err := torrent.NewClient(torrentCfg)
if err != nil {
return nil, err
}
return t, nil
return torrentCfg
}
var emptyBytes [20]byte
func getOrCreatePeerID(p string) ([20]byte, error) {
idb, err := os.ReadFile(p)
if err == nil {
var out [20]byte
copy(out[:], idb)
return out, nil
}
if !os.IsNotExist(err) {
return emptyBytes, err
}
var out [20]byte
_, err = rand.Read(out[:])
if err != nil {
return emptyBytes, err
}
return out, os.WriteFile(p, out[:], 0755)
}
func peerAttrs(peer *torrent.Peer) []any {
out := []any{
slog.String("ip", peer.RemoteAddr.String()),
slog.String("discovery", string(peer.Discovery)),
slog.Int("max-requests", peer.PeerMaxRequests),
slog.Bool("prefers-encryption", peer.PeerPrefersEncryption),
}
if peer.Torrent() != nil {
out = append(out, slog.String("torrent", peer.Torrent().Name()))
}
return out
}

View file

@ -20,6 +20,7 @@ import (
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
@ -32,8 +33,11 @@ import (
"github.com/royalcat/kv"
)
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent",
trace.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")),
const instrument = "git.kmsign.ru/royalcat/tstor/sources/torrent"
var (
tracer = otel.Tracer(instrument, trace.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")))
meter = otel.Meter(instrument, metric.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")))
)
type DirAquire struct {
@ -58,7 +62,7 @@ type Daemon struct {
log *rlog.Logger
}
const dhtTTL = 24 * time.Hour
const dhtTTL = 180 * 24 * time.Hour
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
s := &Daemon{
@ -103,10 +107,21 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, err
}
s.client, err = newClient(s.Storage, s.fis, &conf, id)
clientConfig := newClientConfig(s.Storage, s.fis, &conf, id)
s.client, err = torrent.NewClient(clientConfig)
if err != nil {
return nil, fmt.Errorf("error starting torrent client: %w", err)
return nil, err
}
// TODO move to config
s.client.AddDhtNodes([]string{
"router.bittorrent.com:6881",
"router.utorrent.com:6881",
"dht.transmissionbt.com:6881",
"router.bitcomet.com:6881",
"dht.aelitis.com6881",
})
s.client.AddDhtNodes(conf.DHTNodes)
s.dirsAquire, err = tkv.NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
@ -124,17 +139,27 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
}()
go func() {
ctx := context.Background()
const period = time.Second * 10
<-s.torrentLoaded
err := registerTorrentMetrics(s.client)
if err != nil {
s.log.Error(ctx, "error registering torrent metrics", rlog.Error(err))
}
err = registerDhtMetrics(s.client)
if err != nil {
s.log.Error(ctx, "error registering dht metrics", rlog.Error(err))
}
timer := time.NewTicker(period)
for {
select {
case <-s.client.Closed():
return
case <-timer.C:
s.updateStats(context.Background())
s.updateStats(ctx)
}
}
}()
@ -142,25 +167,22 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return s, nil
}
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
func (s *Daemon) allStats(ctx context.Context) (map[infohash.T]TorrentStats, TorrentStats) {
totalPeers := 0
activePeers := 0
connectedSeeders := 0
perTorrentStats := map[infohash.T]TorrentStats{}
for _, v := range s.client.Torrents() {
stats := v.Stats()
err := s.statsStore.AddTorrentStats(v.InfoHash(), TorrentStats{
perTorrentStats[v.InfoHash()] = TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(stats.BytesRead.Int64()),
UploadedBytes: uint64(stats.BytesWritten.Int64()),
TotalPeers: uint16(stats.TotalPeers),
ActivePeers: uint16(stats.ActivePeers),
ConnectedSeeders: uint16(stats.ConnectedSeeders),
})
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
totalPeers += stats.TotalPeers
@ -169,14 +191,29 @@ func (s *Daemon) updateStats(ctx context.Context) {
}
totalStats := s.client.Stats()
err := s.statsStore.AddTotalStats(TorrentStats{
return perTorrentStats, TorrentStats{
Timestamp: time.Now(),
DownloadedBytes: uint64(totalStats.BytesRead.Int64()),
UploadedBytes: uint64(totalStats.BytesWritten.Int64()),
TotalPeers: uint16(totalPeers),
ActivePeers: uint16(activePeers),
ConnectedSeeders: uint16(connectedSeeders),
})
}
}
func (s *Daemon) updateStats(ctx context.Context) {
log := s.log
perTorrentStats, totalStats := s.allStats(ctx)
for ih, v := range perTorrentStats {
err := s.statsStore.AddTorrentStats(ih, v)
if err != nil {
log.Error(ctx, "error saving torrent stats", rlog.Error(err))
}
}
err := s.statsStore.AddTotalStats(totalStats)
if err != nil {
log.Error(ctx, "error saving total stats", rlog.Error(err))
}

View file

@ -410,7 +410,6 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File, lastT
}
r := file.NewReader()
r.SetReadahead(1024 * 1024 * 16) // TODO configurable
_, err := r.ReadContext(ctx, make([]byte, 128))
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed initial file read: %w", err)

View file

@ -1,30 +0,0 @@
package torrent
import (
"crypto/rand"
"os"
)
var emptyBytes [20]byte
func getOrCreatePeerID(p string) ([20]byte, error) {
idb, err := os.ReadFile(p)
if err == nil {
var out [20]byte
copy(out[:], idb)
return out, nil
}
if !os.IsNotExist(err) {
return emptyBytes, err
}
var out [20]byte
_, err = rand.Read(out[:])
if err != nil {
return emptyBytes, err
}
return out, os.WriteFile(p, out[:], 0755)
}

View file

@ -0,0 +1,66 @@
package torrent
import (
"context"
"encoding/base64"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/torrent"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
func registerTorrentMetrics(client *torrent.Client) error {
meterTotalPeers, _ := meter.Int64ObservableGauge("torrent.peers.total")
meterActivePeers, _ := meter.Int64ObservableGauge("torrent.peers.active")
meterSeeders, _ := meter.Int64ObservableGauge("torrent.seeders")
meterDownloaded, _ := meter.Int64ObservableGauge("torrent.downloaded", metric.WithUnit("By"))
meterIO, _ := meter.Int64ObservableGauge("torrent.io", metric.WithUnit("By"))
_, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
for _, v := range client.Torrents() {
as := attribute.NewSet(
attribute.String("infohash", v.InfoHash().HexString()),
attribute.String("name", v.Name()),
attribute.Int64("size", v.Length()),
)
stats := v.Stats()
o.ObserveInt64(meterTotalPeers, int64(stats.TotalPeers), metric.WithAttributeSet(as))
o.ObserveInt64(meterActivePeers, int64(stats.ActivePeers), metric.WithAttributeSet(as))
o.ObserveInt64(meterSeeders, int64(stats.ConnectedSeeders), metric.WithAttributeSet(as))
o.ObserveInt64(meterIO, stats.BytesRead.Int64(), metric.WithAttributeSet(as), metric.WithAttributes(attribute.String("direction", "download")))
o.ObserveInt64(meterIO, stats.BytesWritten.Int64(), metric.WithAttributeSet(as), metric.WithAttributes(attribute.String("direction", "upload")))
o.ObserveInt64(meterDownloaded, v.BytesCompleted(), metric.WithAttributeSet(as))
}
return nil
}, meterTotalPeers, meterActivePeers, meterSeeders, meterIO, meterDownloaded)
if err != nil {
return err
}
return nil
}
func registerDhtMetrics(client *torrent.Client) error {
meterDhtNodes, _ := meter.Int64ObservableGauge("torrent.dht.nodes")
_, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
servers := client.DhtServers()
for _, dhtSrv := range servers {
stats, ok := dhtSrv.Stats().(dht.ServerStats)
if !ok {
continue
}
id := dhtSrv.ID()
as := attribute.NewSet(
attribute.String("id", base64.StdEncoding.EncodeToString(id[:])),
attribute.String("address", dhtSrv.Addr().String()),
)
o.ObserveInt64(meterDhtNodes, int64(stats.Nodes), metric.WithAttributeSet(as))
}
return nil
}, meterDhtNodes)
return err
}

View file

@ -0,0 +1,36 @@
package torrent
import (
"testing"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBoltPieceCompletion(t *testing.T) {
td := t.TempDir()
pc, err := newPieceCompletion(td)
require.NoError(t, err)
defer pc.Close()
pk := metainfo.PieceKey{}
b, err := pc.Get(pk)
require.NoError(t, err)
assert.False(t, b.Ok)
require.NoError(t, pc.Set(pk, false))
b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, storage.Completion{Complete: false, Ok: true}, b)
require.NoError(t, pc.Set(pk, true))
b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, storage.Completion{Complete: true, Ok: true}, b)
}

View file

@ -21,32 +21,9 @@ import (
// OpenTorrent implements storage.ClientImplCloser.
func (me *fileStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
ctx := context.Background()
log := me.log.With(slog.String("infohash", infoHash.HexString()))
log := me.log.With(slog.String("infohash", infoHash.HexString()), slog.String("name", info.BestName()))
// dir := torrentDir(me.baseDir, infoHash)
// legacyDir := filepath.Join(me.baseDir, info.Name)
// log = log.With(slog.String("legacy_dir", legacyDir), slog.String("dir", dir))
// if _, err := os.Stat(legacyDir); err == nil {
// log.Warn(ctx, "legacy torrent dir found, renaming", slog.String("dir", dir))
// err = os.Rename(legacyDir, dir)
// if err != nil {
// return storage.TorrentImpl{}, fmt.Errorf("error renaming legacy torrent dir: %w", err)
// }
// }
// if _, err := os.Stat(dir); errors.Is(err, fs.ErrNotExist) {
// log.Info(ctx, "new torrent, trying copy files from existing")
// dups := me.dupIndex.Includes(infoHash, info.Files)
// for _, dup := range dups {
// err := me.copyDup(ctx, infoHash, dup)
// if err != nil {
// log.Error(ctx, "error copying file", slog.String("file", dup.fileinfo.DisplayPath(info)), rlog.Error(err))
// }
// }
// }
log.Debug(ctx, "opening torrent")
impl, err := me.client.OpenTorrent(info, infoHash)
if err != nil {

View file

@ -17,10 +17,11 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
type Client struct {
@ -86,8 +87,13 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
return nil, err
}
promExporter, err := prometheus.New()
if err != nil {
return nil, fmt.Errorf("failed to initialize prometheus exporter: %w", err)
}
client.metricProvider = metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(meticExporter)),
metric.WithReader(promExporter),
metric.WithResource(r),
)
otel.SetMeterProvider(client.metricProvider)

View file

@ -2,9 +2,7 @@ package vfs
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"log/slog"
"reflect"
@ -36,7 +34,7 @@ type LogFS struct {
}
func isLoggableError(err error) bool {
return err != nil && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF)
return err != nil // && !errors.Is(err, fs.ErrNotExist) && !errors.Is(err, io.EOF)
}
var _ Filesystem = (*LogFS)(nil)