From ae4501ae211a4c99a4640a9283ffbed79a0654c9 Mon Sep 17 00:00:00 2001 From: royalcat Date: Fri, 23 Aug 2024 01:16:16 +0300 Subject: [PATCH] seek, load only with priority, qbittorrent --- cmd/tstor/main.go | 10 +- go.mod | 17 +- go.sum | 33 +- pkg/ctxbilly/uring.go | 648 ++++++++++++------------ src/sources/qbittorrent/client.go | 85 ++++ src/sources/qbittorrent/daemon.go | 101 ++++ src/sources/qbittorrent/fs.go | 228 +++++++++ src/sources/qbittorrent/install.go | 139 +++++ src/sources/qbittorrent/install_test.go | 18 + src/sources/torrent/client.go | 30 +- src/sources/torrent/daemon.go | 257 +--------- src/sources/torrent/daemon_load.go | 246 +++++++++ src/sources/torrent/daemon_stats.go | 73 +++ src/sources/torrent/fs.go | 12 +- src/sources/torrent/metrics.go | 5 +- src/sources/torrent/peer_store.go | 24 + src/sources/torrent/storage_open.go | 7 +- src/telemetry/setup.go | 2 + src/vfs/archive.go | 14 + src/vfs/ctxbillyfs.go | 5 + src/vfs/dir.go | 5 + src/vfs/dummy.go | 5 + src/vfs/fs.go | 1 + src/vfs/log.go | 5 + src/vfs/memory.go | 5 + src/vfs/os.go | 5 + 26 files changed, 1357 insertions(+), 623 deletions(-) create mode 100644 src/sources/qbittorrent/client.go create mode 100644 src/sources/qbittorrent/daemon.go create mode 100644 src/sources/qbittorrent/fs.go create mode 100644 src/sources/qbittorrent/install.go create mode 100644 src/sources/qbittorrent/install_test.go create mode 100644 src/sources/torrent/daemon_load.go create mode 100644 src/sources/torrent/daemon_stats.go create mode 100644 src/sources/torrent/peer_store.go diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index eed01fb..6673f89 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -81,17 +81,17 @@ func run(configPath string) error { log := rlog.Component("run") // TODO make optional - err = syscall.Setpriority(syscall.PRIO_PGRP, 0, 19) - if err != nil { - log.Error(ctx, "set priority failed", rlog.Error(err)) - } + // err = syscall.Setpriority(syscall.PRIO_PGRP, 0, 19) + // if err != nil { + // log.Error(ctx, "set priority failed", rlog.Error(err)) + // } if err := os.MkdirAll(conf.SourceDir, 0744); err != nil { return fmt.Errorf("error creating data folder: %w", err) } sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS()) - tsrv, err := torrent.NewService(sourceFs, conf.TorrentClient) + tsrv, err := torrent.NewDaemon(sourceFs, conf.TorrentClient) if err != nil { return fmt.Errorf("error creating service: %w", err) } diff --git a/go.mod b/go.mod index 713d872..e949487 100644 --- a/go.mod +++ b/go.mod @@ -2,16 +2,17 @@ module git.kmsign.ru/royalcat/tstor go 1.22.3 +replace github.com/bytedance/sonic v1.11.9 => github.com/bytedance/sonic v1.12.1 + require ( github.com/99designs/gqlgen v0.17.49 github.com/agoda-com/opentelemetry-go/otelslog v0.1.1 github.com/agoda-com/opentelemetry-logs-go v0.5.0 github.com/anacrolix/dht/v2 v2.21.1 - github.com/anacrolix/log v0.15.2 - github.com/anacrolix/torrent v1.56.1 + github.com/anacrolix/log v0.15.3-0.20240627045001-cd912c641d83 + github.com/anacrolix/torrent v1.56.2-0.20240813010934-f4711825e84e github.com/billziss-gh/cgofuse v1.5.0 github.com/bodgit/sevenzip v1.5.1 - github.com/cyphar/filepath-securejoin v0.2.5 github.com/dgraph-io/badger/v4 v4.2.0 github.com/dustin/go-humanize v1.0.1 github.com/gin-gonic/gin v1.9.1 @@ -20,9 +21,8 @@ require ( github.com/google/go-github/v63 v63.0.0 github.com/google/uuid v1.6.0 github.com/grafana/otel-profiling-go v0.5.1 - github.com/grafana/pyroscope-go v1.1.1 + github.com/grafana/pyroscope-go v1.1.2 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 github.com/knadh/koanf/parsers/yaml v0.1.0 github.com/knadh/koanf/providers/env v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 @@ -67,7 +67,7 @@ require ( github.com/agnivade/levenshtein v1.1.1 // indirect github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect github.com/alecthomas/atomic v0.1.0-alpha2 // indirect - github.com/anacrolix/chansync v0.4.0 // indirect + github.com/anacrolix/chansync v0.4.1-0.20240627045151-1aa1ac392fe8 // indirect github.com/anacrolix/envpprof v1.3.0 // indirect github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab // indirect github.com/anacrolix/go-libutp v1.3.1 // indirect @@ -89,13 +89,14 @@ require ( github.com/bodgit/windows v1.0.1 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/bytedance/sonic v1.11.9 // indirect - github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/cyphar/filepath-securejoin v0.2.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect @@ -123,7 +124,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/gorilla/schema v1.4.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect - github.com/grafana/pyroscope-go/godeltaprof v0.1.7 // indirect + github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/go.sum b/go.sum index a193487..a7f7ce9 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/anacrolix/chansync v0.4.0 h1:Md0HM7zYCAO4KwNwgcIRgxNsMxiRuk7D1Ha0Uo+2y60= -github.com/anacrolix/chansync v0.4.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= +github.com/anacrolix/chansync v0.4.1-0.20240627045151-1aa1ac392fe8 h1:eyb0bBaQKMOh5Se/Qg54shijc8K4zpQiOjEhKFADkQM= +github.com/anacrolix/chansync v0.4.1-0.20240627045151-1aa1ac392fe8/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/dht/v2 v2.21.1 h1:s1rKkfLLcmBHKv4v/mtMkIeHIEptzEFiB6xVu54+5/o= github.com/anacrolix/dht/v2 v2.21.1/go.mod h1:SDGC+sEs1pnO2sJGYuhvIis7T8749dDHNfcjtdH4e3g= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= @@ -68,8 +68,8 @@ github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgw github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.13.1/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= github.com/anacrolix/log v0.14.2/go.mod h1:1OmJESOtxQGNMlUO5rcv96Vpp9mfMqXXbe2RdinFLdY= -github.com/anacrolix/log v0.15.2 h1:LTSf5Wm6Q4GNWPFMBP7NPYV6UBVZzZLKckL+/Lj72Oo= -github.com/anacrolix/log v0.15.2/go.mod h1:m0poRtlr41mriZlXBQ9SOVZ8yZBkLjOkDhd5Li5pITA= +github.com/anacrolix/log v0.15.3-0.20240627045001-cd912c641d83 h1:9o/yVzzLzYaBDFx8B27yhkvBLhNnRAuSTK7Y+yZKVtU= +github.com/anacrolix/log v0.15.3-0.20240627045001-cd912c641d83/go.mod h1:xvHjsYWWP7yO8PZwtuIp/k0DBlu07pSJqH4SEC78Vwc= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM= github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s= @@ -99,8 +99,8 @@ github.com/anacrolix/sync v0.5.1/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DC github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= -github.com/anacrolix/torrent v1.56.1 h1:QeJMOP0NuhpQ5dATsOqEL0vUO85aPMNMGP2FACNt0Eg= -github.com/anacrolix/torrent v1.56.1/go.mod h1:5DMHbeIM1TuC5wTQ99XieKKLiYZYz6iB2lyZpKZEr6w= +github.com/anacrolix/torrent v1.56.2-0.20240813010934-f4711825e84e h1:gfu86Ozd6rvq4mwSgy1s6SRlS8UeeCORKoqnXvlXtY0= +github.com/anacrolix/torrent v1.56.2-0.20240813010934-f4711825e84e/go.mod h1:m6Jl1mdUG3wcapLuvn8ZwENi49DUCmiacom6plQ5rcI= github.com/anacrolix/upnp v0.1.4 h1:+2t2KA6QOhm/49zeNyeVwDu1ZYS9dB9wfxyVvh/wk7U= github.com/anacrolix/upnp v0.1.4/go.mod h1:Qyhbqo69gwNWvEk1xNTXsS5j7hMHef9hdr984+9fIic= github.com/anacrolix/utp v0.2.0 h1:65Cdmr6q9WSw2KsM+rtJFu7rqDzLl2bdysf4KlNPcFI= @@ -136,10 +136,11 @@ github.com/bradfitz/iter v0.0.0-20140124041915-454541ec3da2/go.mod h1:PyRFw1Lt2w github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo= github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8= github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og= -github.com/bytedance/sonic v1.11.9 h1:LFHENlIY/SLzDWverzdOvgMztTxcfcF+cqNsz9pK5zg= -github.com/bytedance/sonic v1.11.9/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= -github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic v1.12.1 h1:jWl5Qz1fy7X1ioY74WqO0KjAMtAGQs4sYnjiEBiyX24= +github.com/bytedance/sonic v1.12.1/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM= +github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -328,10 +329,10 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grafana/otel-profiling-go v0.5.1 h1:stVPKAFZSa7eGiqbYuG25VcqYksR6iWvF3YH66t4qL8= github.com/grafana/otel-profiling-go v0.5.1/go.mod h1:ftN/t5A/4gQI19/8MoWurBEtC6gFw8Dns1sJZ9W4Tls= -github.com/grafana/pyroscope-go v1.1.1 h1:PQoUU9oWtO3ve/fgIiklYuGilvsm8qaGhlY4Vw6MAcQ= -github.com/grafana/pyroscope-go v1.1.1/go.mod h1:Mw26jU7jsL/KStNSGGuuVYdUq7Qghem5P8aXYXSXG88= -github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY= -github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= +github.com/grafana/pyroscope-go v1.1.2 h1:7vCfdORYQMCxIzI3NlYAs3FcBP760+gWuYWOyiVyYx8= +github.com/grafana/pyroscope-go v1.1.2/go.mod h1:HSSmHo2KRn6FasBA4vK7BMiQqyQq8KSuBKvrhkXxYPU= +github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= +github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -353,8 +354,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 h1:xrtfZokN++5kencK33hn2Kx3Uj8tGnjMEhdt6FMvHD0= -github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90/go.mod h1:LEzdaZarZ5aqROlLIwJ4P7h3+4o71008fSy6wpaEB+s= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -367,7 +366,6 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -701,7 +699,6 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc= go4.org v0.0.0-20230225012048-214862532bf5/go.mod h1:F57wTi5Lrj6WLyswp5EYV1ncrEbFGHD4hhz6S1ZYeaU= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -826,7 +823,6 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200413165638-669c56c373c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1023,7 +1019,6 @@ modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= zombiezen.com/go/sqlite v1.3.0 h1:98g1gnCm+CNz6AuQHu0gqyw7gR2WU3O3PJufDOStpUs= diff --git a/pkg/ctxbilly/uring.go b/pkg/ctxbilly/uring.go index 3d54ae6..fad7318 100644 --- a/pkg/ctxbilly/uring.go +++ b/pkg/ctxbilly/uring.go @@ -1,355 +1,355 @@ package ctxbilly -import ( - "context" - "errors" - "fmt" - "os" - "path/filepath" - "strings" +// import ( +// "context" +// "errors" +// "fmt" +// "os" +// "path/filepath" +// "strings" - securejoin "github.com/cyphar/filepath-securejoin" - "github.com/iceber/iouring-go" -) +// securejoin "github.com/cyphar/filepath-securejoin" +// "github.com/iceber/iouring-go" +// ) -func NewURingFS() (*UringFS, error) { - ur, err := iouring.New(64, iouring.WithAsync()) - if err != nil { - return nil, err - } - - return &UringFS{ - ur: ur, - }, nil -} - -var _ Filesystem = (*UringFS)(nil) - -const ( - defaultDirectoryMode = 0o755 - defaultCreateMode = 0o666 -) - -// UringFS is a fs implementation based on the OS filesystem which is bound to -// a base dir. -// Prefer this fs implementation over ChrootOS. -// -// Behaviours of note: -// 1. Read and write operations can only be directed to files which descends -// from the base dir. -// 2. Symlinks don't have their targets modified, and therefore can point -// to locations outside the base dir or to non-existent paths. -// 3. Readlink and Lstat ensures that the link file is located within the base -// dir, evaluating any symlinks that file or base dir may contain. -type UringFS struct { - ur *iouring.IOURing - baseDir string -} - -func newBoundOS(d string) *UringFS { - return &UringFS{baseDir: d} -} - -func (fs *UringFS) Create(ctx context.Context, filename string) (File, error) { - return fs.OpenFile(ctx, filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultCreateMode) -} - -func (fs *UringFS) OpenFile(ctx context.Context, filename string, flag int, perm os.FileMode) (File, error) { - fn, err := fs.abs(filename) - if err != nil { - return nil, err - } - - f, err := os.OpenFile(fn, flag, perm) - if err != nil { - return nil, err - } - - return newFile(fs.ur, f) -} - -func (fs *UringFS) ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) { - dir, err := fs.abs(path) - if err != nil { - return nil, err - } - - entries, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - infos := make([]os.FileInfo, 0, len(entries)) - for _, v := range entries { - info, err := v.Info() - if err != nil { - return nil, err - } - - infos = append(infos, info) - } - - return infos, nil -} - -func (fs *UringFS) Rename(ctx context.Context, from, to string) error { - f, err := fs.abs(from) - if err != nil { - return err - } - t, err := fs.abs(to) - if err != nil { - return err - } - - // MkdirAll for target name. - if err := fs.createDir(t); err != nil { - return err - } - - return os.Rename(f, t) -} - -func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode) error { - dir, err := fs.abs(path) - if err != nil { - return err - } - return os.MkdirAll(dir, perm) -} - -func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) { - filename, err := fs.abs(filename) - if err != nil { - return nil, err - } - return os.Stat(filename) -} - -func (fs *UringFS) Remove(ctx context.Context, filename string) error { - fn, err := fs.abs(filename) - if err != nil { - return err - } - return os.Remove(fn) -} - -func (fs *UringFS) Join(elem ...string) string { - return filepath.Join(elem...) -} - -func (fs *UringFS) RemoveAll(path string) error { - dir, err := fs.abs(path) - if err != nil { - return err - } - return os.RemoveAll(dir) -} - -func (fs *UringFS) Symlink(ctx context.Context, target, link string) error { - ln, err := fs.abs(link) - if err != nil { - return err - } - // MkdirAll for containing dir. - if err := fs.createDir(ln); err != nil { - return err - } - return os.Symlink(target, ln) -} - -func (fs *UringFS) Lstat(ctx context.Context, filename string) (os.FileInfo, error) { - filename = filepath.Clean(filename) - if !filepath.IsAbs(filename) { - filename = filepath.Join(fs.baseDir, filename) - } - if ok, err := fs.insideBaseDirEval(filename); !ok { - return nil, err - } - return os.Lstat(filename) -} - -func (fs *UringFS) Readlink(ctx context.Context, link string) (string, error) { - if !filepath.IsAbs(link) { - link = filepath.Clean(filepath.Join(fs.baseDir, link)) - } - if ok, err := fs.insideBaseDirEval(link); !ok { - return "", err - } - return os.Readlink(link) -} - -// Chroot returns a new OS filesystem, with the base dir set to the -// result of joining the provided path with the underlying base dir. -// func (fs *UringFS) Chroot(path string) (Filesystem, error) { -// joined, err := securejoin.SecureJoin(fs.baseDir, path) +// func NewURingFS() (*UringFS, error) { +// ur, err := iouring.New(64, iouring.WithAsync()) // if err != nil { // return nil, err // } -// return newBoundOS(joined), nil + +// return &UringFS{ +// ur: ur, +// }, nil // } -// Root returns the current base dir of the billy.Filesystem. -// This is required in order for this implementation to be a drop-in -// replacement for other upstream implementations (e.g. memory and osfs). -func (fs *UringFS) Root() string { - return fs.baseDir -} +// var _ Filesystem = (*UringFS)(nil) -func (fs *UringFS) createDir(fullpath string) error { - dir := filepath.Dir(fullpath) - if dir != "." { - if err := os.MkdirAll(dir, defaultDirectoryMode); err != nil { - return err - } - } +// const ( +// defaultDirectoryMode = 0o755 +// defaultCreateMode = 0o666 +// ) - return nil -} +// // UringFS is a fs implementation based on the OS filesystem which is bound to +// // a base dir. +// // Prefer this fs implementation over ChrootOS. +// // +// // Behaviours of note: +// // 1. Read and write operations can only be directed to files which descends +// // from the base dir. +// // 2. Symlinks don't have their targets modified, and therefore can point +// // to locations outside the base dir or to non-existent paths. +// // 3. Readlink and Lstat ensures that the link file is located within the base +// // dir, evaluating any symlinks that file or base dir may contain. +// type UringFS struct { +// ur *iouring.IOURing +// baseDir string +// } -// abs transforms filename to an absolute path, taking into account the base dir. -// Relative paths won't be allowed to ascend the base dir, so `../file` will become -// `/working-dir/file`. -// -// Note that if filename is a symlink, the returned address will be the target of the -// symlink. -func (fs *UringFS) abs(filename string) (string, error) { - if filename == fs.baseDir { - filename = string(filepath.Separator) - } +// func newBoundOS(d string) *UringFS { +// return &UringFS{baseDir: d} +// } - path, err := securejoin.SecureJoin(fs.baseDir, filename) - if err != nil { - return "", nil - } +// func (fs *UringFS) Create(ctx context.Context, filename string) (File, error) { +// return fs.OpenFile(ctx, filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultCreateMode) +// } - return path, nil -} +// func (fs *UringFS) OpenFile(ctx context.Context, filename string, flag int, perm os.FileMode) (File, error) { +// fn, err := fs.abs(filename) +// if err != nil { +// return nil, err +// } -// insideBaseDirEval checks whether filename is contained within -// a dir that is within the fs.baseDir, by first evaluating any symlinks -// that either filename or fs.baseDir may contain. -func (fs *UringFS) insideBaseDirEval(filename string) (bool, error) { - dir, err := filepath.EvalSymlinks(filepath.Dir(filename)) - if dir == "" || os.IsNotExist(err) { - dir = filepath.Dir(filename) - } - wd, err := filepath.EvalSymlinks(fs.baseDir) - if wd == "" || os.IsNotExist(err) { - wd = fs.baseDir - } - if filename != wd && dir != wd && !strings.HasPrefix(dir, wd+string(filepath.Separator)) { - return false, fmt.Errorf("path outside base dir") - } - return true, nil -} +// f, err := os.OpenFile(fn, flag, perm) +// if err != nil { +// return nil, err +// } -func newFile(fsur *iouring.IOURing, f *os.File) (*URingFile, error) { - ur, err := iouring.New(64, iouring.WithAttachWQ(fsur)) - if err != nil { - return nil, err - } +// return newFile(fs.ur, f) +// } - return &URingFile{ - ur: ur, - f: f, - }, nil -} +// func (fs *UringFS) ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) { +// dir, err := fs.abs(path) +// if err != nil { +// return nil, err +// } -type URingFile struct { - ur *iouring.IOURing - f *os.File -} +// entries, err := os.ReadDir(dir) +// if err != nil { +// return nil, err +// } +// infos := make([]os.FileInfo, 0, len(entries)) +// for _, v := range entries { +// info, err := v.Info() +// if err != nil { +// return nil, err +// } -// Close implements File. -func (o *URingFile) Close(ctx context.Context) error { - return errors.Join(o.ur.UnregisterFile(o.f), o.Close(ctx)) -} +// infos = append(infos, info) +// } -// Name implements File. -func (o *URingFile) Name() string { - return o.f.Name() -} +// return infos, nil +// } -// Read implements File. -func (o *URingFile) Read(ctx context.Context, p []byte) (n int, err error) { - req, err := o.ur.Read(o.f, p, nil) - if err != nil { - return 0, err - } - defer req.Cancel() +// func (fs *UringFS) Rename(ctx context.Context, from, to string) error { +// f, err := fs.abs(from) +// if err != nil { +// return err +// } +// t, err := fs.abs(to) +// if err != nil { +// return err +// } - select { - case <-req.Done(): - return req.GetRes() - case <-ctx.Done(): - req.Cancel() - <-req.Done() - return req.GetRes() - } -} +// // MkdirAll for target name. +// if err := fs.createDir(t); err != nil { +// return err +// } -// ReadAt implements File. -func (o *URingFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { - req, err := o.ur.Pread(o.f, p, uint64(off), nil) - if err != nil { - return 0, err - } - defer req.Cancel() +// return os.Rename(f, t) +// } - select { - case <-req.Done(): - return req.GetRes() - case <-ctx.Done(): - req.Cancel() - <-req.Done() - return req.GetRes() - } -} +// func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode) error { +// dir, err := fs.abs(path) +// if err != nil { +// return err +// } +// return os.MkdirAll(dir, perm) +// } -// Write implements File. -func (o *URingFile) Write(ctx context.Context, p []byte) (n int, err error) { - req, err := o.ur.Write(o.f, p, nil) - if err != nil { - return 0, err - } - defer req.Cancel() +// func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) { +// filename, err := fs.abs(filename) +// if err != nil { +// return nil, err +// } +// return os.Stat(filename) +// } - select { - case <-req.Done(): - return req.GetRes() - case <-ctx.Done(): - req.Cancel() - <-req.Done() - return req.GetRes() - } -} +// func (fs *UringFS) Remove(ctx context.Context, filename string) error { +// fn, err := fs.abs(filename) +// if err != nil { +// return err +// } +// return os.Remove(fn) +// } -// WriteAt implements File. -func (o *URingFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { - req, err := o.ur.Pwrite(o.f, p, uint64(off), nil) - if err != nil { - return 0, err - } - defer req.Cancel() +// func (fs *UringFS) Join(elem ...string) string { +// return filepath.Join(elem...) +// } - select { - case <-req.Done(): - return req.GetRes() - case <-ctx.Done(): - req.Cancel() - <-req.Done() - return req.GetRes() - } -} +// func (fs *UringFS) RemoveAll(path string) error { +// dir, err := fs.abs(path) +// if err != nil { +// return err +// } +// return os.RemoveAll(dir) +// } -// Seek implements File. -func (o *URingFile) Seek(offset int64, whence int) (int64, error) { - return o.f.Seek(offset, whence) -} +// func (fs *UringFS) Symlink(ctx context.Context, target, link string) error { +// ln, err := fs.abs(link) +// if err != nil { +// return err +// } +// // MkdirAll for containing dir. +// if err := fs.createDir(ln); err != nil { +// return err +// } +// return os.Symlink(target, ln) +// } -// Truncate implements File. -func (o *URingFile) Truncate(ctx context.Context, size int64) error { - return o.f.Truncate(size) -} +// func (fs *UringFS) Lstat(ctx context.Context, filename string) (os.FileInfo, error) { +// filename = filepath.Clean(filename) +// if !filepath.IsAbs(filename) { +// filename = filepath.Join(fs.baseDir, filename) +// } +// if ok, err := fs.insideBaseDirEval(filename); !ok { +// return nil, err +// } +// return os.Lstat(filename) +// } -var _ File = (*URingFile)(nil) +// func (fs *UringFS) Readlink(ctx context.Context, link string) (string, error) { +// if !filepath.IsAbs(link) { +// link = filepath.Clean(filepath.Join(fs.baseDir, link)) +// } +// if ok, err := fs.insideBaseDirEval(link); !ok { +// return "", err +// } +// return os.Readlink(link) +// } + +// // Chroot returns a new OS filesystem, with the base dir set to the +// // result of joining the provided path with the underlying base dir. +// // func (fs *UringFS) Chroot(path string) (Filesystem, error) { +// // joined, err := securejoin.SecureJoin(fs.baseDir, path) +// // if err != nil { +// // return nil, err +// // } +// // return newBoundOS(joined), nil +// // } + +// // Root returns the current base dir of the billy.Filesystem. +// // This is required in order for this implementation to be a drop-in +// // replacement for other upstream implementations (e.g. memory and osfs). +// func (fs *UringFS) Root() string { +// return fs.baseDir +// } + +// func (fs *UringFS) createDir(fullpath string) error { +// dir := filepath.Dir(fullpath) +// if dir != "." { +// if err := os.MkdirAll(dir, defaultDirectoryMode); err != nil { +// return err +// } +// } + +// return nil +// } + +// // abs transforms filename to an absolute path, taking into account the base dir. +// // Relative paths won't be allowed to ascend the base dir, so `../file` will become +// // `/working-dir/file`. +// // +// // Note that if filename is a symlink, the returned address will be the target of the +// // symlink. +// func (fs *UringFS) abs(filename string) (string, error) { +// if filename == fs.baseDir { +// filename = string(filepath.Separator) +// } + +// path, err := securejoin.SecureJoin(fs.baseDir, filename) +// if err != nil { +// return "", nil +// } + +// return path, nil +// } + +// // insideBaseDirEval checks whether filename is contained within +// // a dir that is within the fs.baseDir, by first evaluating any symlinks +// // that either filename or fs.baseDir may contain. +// func (fs *UringFS) insideBaseDirEval(filename string) (bool, error) { +// dir, err := filepath.EvalSymlinks(filepath.Dir(filename)) +// if dir == "" || os.IsNotExist(err) { +// dir = filepath.Dir(filename) +// } +// wd, err := filepath.EvalSymlinks(fs.baseDir) +// if wd == "" || os.IsNotExist(err) { +// wd = fs.baseDir +// } +// if filename != wd && dir != wd && !strings.HasPrefix(dir, wd+string(filepath.Separator)) { +// return false, fmt.Errorf("path outside base dir") +// } +// return true, nil +// } + +// func newFile(fsur *iouring.IOURing, f *os.File) (*URingFile, error) { +// ur, err := iouring.New(64, iouring.WithAttachWQ(fsur)) +// if err != nil { +// return nil, err +// } + +// return &URingFile{ +// ur: ur, +// f: f, +// }, nil +// } + +// type URingFile struct { +// ur *iouring.IOURing +// f *os.File +// } + +// // Close implements File. +// func (o *URingFile) Close(ctx context.Context) error { +// return errors.Join(o.ur.UnregisterFile(o.f), o.Close(ctx)) +// } + +// // Name implements File. +// func (o *URingFile) Name() string { +// return o.f.Name() +// } + +// // Read implements File. +// func (o *URingFile) Read(ctx context.Context, p []byte) (n int, err error) { +// req, err := o.ur.Read(o.f, p, nil) +// if err != nil { +// return 0, err +// } +// defer req.Cancel() + +// select { +// case <-req.Done(): +// return req.GetRes() +// case <-ctx.Done(): +// req.Cancel() +// <-req.Done() +// return req.GetRes() +// } +// } + +// // ReadAt implements File. +// func (o *URingFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { +// req, err := o.ur.Pread(o.f, p, uint64(off), nil) +// if err != nil { +// return 0, err +// } +// defer req.Cancel() + +// select { +// case <-req.Done(): +// return req.GetRes() +// case <-ctx.Done(): +// req.Cancel() +// <-req.Done() +// return req.GetRes() +// } +// } + +// // Write implements File. +// func (o *URingFile) Write(ctx context.Context, p []byte) (n int, err error) { +// req, err := o.ur.Write(o.f, p, nil) +// if err != nil { +// return 0, err +// } +// defer req.Cancel() + +// select { +// case <-req.Done(): +// return req.GetRes() +// case <-ctx.Done(): +// req.Cancel() +// <-req.Done() +// return req.GetRes() +// } +// } + +// // WriteAt implements File. +// func (o *URingFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { +// req, err := o.ur.Pwrite(o.f, p, uint64(off), nil) +// if err != nil { +// return 0, err +// } +// defer req.Cancel() + +// select { +// case <-req.Done(): +// return req.GetRes() +// case <-ctx.Done(): +// req.Cancel() +// <-req.Done() +// return req.GetRes() +// } +// } + +// // Seek implements File. +// func (o *URingFile) Seek(offset int64, whence int) (int64, error) { +// return o.f.Seek(offset, whence) +// } + +// // Truncate implements File. +// func (o *URingFile) Truncate(ctx context.Context, size int64) error { +// return o.f.Truncate(size) +// } + +// var _ File = (*URingFile)(nil) diff --git a/src/sources/qbittorrent/client.go b/src/sources/qbittorrent/client.go new file mode 100644 index 0000000..5fb2810 --- /dev/null +++ b/src/sources/qbittorrent/client.go @@ -0,0 +1,85 @@ +package qbittorrent + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/xuthus5/qbittorrent-client-go/qbittorrent" +) + +type client struct { + qb qbittorrent.Client +} + +func wrapClient(qb qbittorrent.Client) *client { + return &client{qb: qb} +} + +func (f *client) getFileContent(ctx context.Context, hash string, contextIndex int) (*qbittorrent.TorrentContent, error) { + contents, err := f.qb.Torrent().GetContents(hash) + if err != nil { + return nil, err + } + + contentIndex := slices.IndexFunc(contents, func(c *qbittorrent.TorrentContent) bool { + return c.Index == contextIndex + }) + if contentIndex == -1 { + return nil, fmt.Errorf("content not found") + } + + return contents[contentIndex], nil +} + +func (f *client) isPieceComplete(ctx context.Context, hash string, pieceIndex int) (bool, error) { + completion, err := f.qb.Torrent().GetPiecesStates(hash) + if err != nil { + return false, err + } + + if completion[pieceIndex] == 2 { + return true, nil + } + + return false, nil +} + +func (f *client) waitPieceToComplete(ctx context.Context, hash string, pieceIndex int) error { + const checkingInterval = 1 * time.Second + + ok, err := f.isPieceComplete(ctx, hash, pieceIndex) + if err != nil { + return err + } + if ok { + return nil + } + + if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval { + return context.DeadlineExceeded + } + + ticker := time.NewTicker(checkingInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + ok, err := f.isPieceComplete(ctx, hash, pieceIndex) + if err != nil { + return err + } + if ok { + return nil + } + + if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < checkingInterval { + return context.DeadlineExceeded + } + } + } +} diff --git a/src/sources/qbittorrent/daemon.go b/src/sources/qbittorrent/daemon.go new file mode 100644 index 0000000..abb19ae --- /dev/null +++ b/src/sources/qbittorrent/daemon.go @@ -0,0 +1,101 @@ +package qbittorrent + +import ( + "bytes" + "context" + "path" + + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/types/infohash" + "github.com/royalcat/ctxio" + "github.com/xuthus5/qbittorrent-client-go/qbittorrent" +) + +type Daemon struct { + qb qbittorrent.Client + client *client + dataDir string +} + +func NewDaemon(dir string) (*Daemon, error) { + + dataDir := dir + "/data" + qb, err := qbittorrent.NewClient(&qbittorrent.Config{ + Address: "localhost:8080", + }) + if err != nil { + return nil, err + } + + return &Daemon{ + qb: qb, + dataDir: dataDir, + client: wrapClient(qb), + }, nil +} + +func (fs *Daemon) torrentPath(ih infohash.T) string { + return path.Join(fs.dataDir, ih.HexString()) +} + +func (fs *Daemon) addTorrent(ctx context.Context, f vfs.File) error { + file, err := ctxio.ReadAll(ctx, f) + if err != nil { + return err + } + + mi, err := metainfo.Load(bytes.NewBuffer(file)) + if err != nil { + return err + } + ih := mi.HashInfoBytes() + + err = fs.qb.Torrent().AddNewTorrent(&qbittorrent.TorrentAddOption{ + Torrents: []*qbittorrent.TorrentAddFileMetadata{ + { + Data: file, + }, + }, + SavePath: fs.torrentPath(ih), + // SequentialDownload: "true", + // FirstLastPiecePrio: "true", + }) + if err != nil { + return err + } + + return nil +} + +func (fs *Daemon) TorrentFS(ctx context.Context, file vfs.File) (*FS, error) { + ih, err := readInfoHash(ctx, file) + if err != nil { + return nil, err + } + + existing, err := fs.qb.Torrent().GetTorrents(&qbittorrent.TorrentOption{ + Hashes: []string{ih.HexString()}, + }) + if err != nil { + return nil, err + } + + if len(existing) == 0 { + err := fs.addTorrent(ctx, file) + if err != nil { + return nil, err + } + } + + return newTorrentFS(fs.client, file.Name(), ih.HexString(), fs.torrentPath(ih)) +} + +// TODO caching +func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) { + mi, err := metainfo.Load(ctxio.IoReader(ctx, file)) + if err != nil { + return infohash.T{}, err + } + return mi.HashInfoBytes(), nil +} diff --git a/src/sources/qbittorrent/fs.go b/src/sources/qbittorrent/fs.go new file mode 100644 index 0000000..30acf06 --- /dev/null +++ b/src/sources/qbittorrent/fs.go @@ -0,0 +1,228 @@ +package qbittorrent + +import ( + "context" + "io" + "io/fs" + "os" + "path" + "time" + + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +type FS struct { + client *client + name string + hash string + dataDir string +} + +var _ vfs.Filesystem = (*FS)(nil) + +func newTorrentFS(client *client, name string, hash string, dataDir string) (*FS, error) { + return &FS{ + client: client, + name: name, + hash: hash, + dataDir: dataDir, + }, nil +} + +// Info implements vfs.Filesystem. +func (f *FS) Info() (fs.FileInfo, error) { + return vfs.NewDirInfo(f.name), nil +} + +// IsDir implements vfs.Filesystem. +func (f *FS) IsDir() bool { + return true +} + +// Name implements vfs.Filesystem. +func (f *FS) Name() string { + return path.Base(f.dataDir) +} + +// Open implements vfs.Filesystem. +func (f *FS) Open(ctx context.Context, filename string) (vfs.File, error) { + panic("unimplemented") +} + +// ReadDir implements vfs.Filesystem. +func (f *FS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + panic("unimplemented") +} + +// Stat implements vfs.Filesystem. +func (f *FS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + return vfs.NewDirInfo(f.name), nil +} + +// Type implements vfs.Filesystem. +func (f *FS) Type() fs.FileMode { + return vfs.ROMode +} + +// Unlink implements vfs.Filesystem. +func (f *FS) Unlink(ctx context.Context, filename string) error { + panic("unimplemented") +} + +func openFile(ctx context.Context, client client, hash, filePath string) *File { + client.getFileContent(ctx, hash, 0) + + return &File{ + client: client, + hash: hash, + filePath: filePath, + } +} + +type File struct { + client client + hash string + dataDir string + filePath string // path inside a torrent directory + contentIndex int + pieceSize int + fileSize int64 + + offset int64 + osfile *os.File +} + +var _ vfs.File = (*File)(nil) + +// Close implements vfs.File. +func (f *File) Close(ctx context.Context) error { + if f.osfile != nil { + err := f.osfile.Close() + f.osfile = nil + return err + } + return nil +} + +// Info implements vfs.File. +func (f *File) Info() (fs.FileInfo, error) { + return &fileInfo{name: path.Base(f.filePath), size: f.fileSize}, nil +} + +// IsDir implements vfs.File. +func (f *File) IsDir() bool { + return false +} + +// Seek implements vfs.File. +func (f *File) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + f.offset = offset + case io.SeekCurrent: + f.offset += offset + case io.SeekEnd: + f.offset = f.fileSize + offset + } + return f.offset, nil +} + +// Name implements vfs.File. +func (f *File) Name() string { + return path.Base(f.filePath) +} + +// Read implements vfs.File. +func (f *File) Read(ctx context.Context, p []byte) (n int, err error) { + pieceIndex := int(f.offset / int64(f.pieceSize)) + err = f.client.waitPieceToComplete(ctx, f.hash, pieceIndex) + if err != nil { + return 0, err + } + + descriptor, err := f.descriptor() + if err != nil { + return 0, err + } + + n, err = descriptor.ReadAt(p, f.offset) + f.offset += int64(n) + return n, err +} + +// ReadAt implements vfs.File. +func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + pieceIndex := int(off / int64(f.pieceSize)) + err = f.client.waitPieceToComplete(ctx, f.hash, pieceIndex) + if err != nil { + return 0, err + } + + descriptor, err := f.descriptor() + if err != nil { + return 0, err + } + + return descriptor.ReadAt(p, off) +} + +// Size implements vfs.File. +func (f *File) Size() int64 { + return f.fileSize +} + +// Type implements vfs.File. +func (f *File) Type() fs.FileMode { + return vfs.ROMode +} + +func (f *File) descriptor() (*os.File, error) { + if f.osfile != nil { + return f.osfile, nil + } + + osfile, err := os.Open(path.Join(f.dataDir, f.filePath)) + if err != nil { + return nil, err + } + f.osfile = osfile + + return f.osfile, nil +} + +type fileInfo struct { + name string + size int64 +} + +var _ fs.FileInfo = (*fileInfo)(nil) + +// IsDir implements fs.FileInfo. +func (f *fileInfo) IsDir() bool { + return false +} + +// ModTime implements fs.FileInfo. +func (f *fileInfo) ModTime() time.Time { + return time.Time{} +} + +// Mode implements fs.FileInfo. +func (f *fileInfo) Mode() fs.FileMode { + return vfs.ROMode +} + +// Name implements fs.FileInfo. +func (f *fileInfo) Name() string { + return f.name +} + +// Size implements fs.FileInfo. +func (f *fileInfo) Size() int64 { + return f.size +} + +// Sys implements fs.FileInfo. +func (f *fileInfo) Sys() any { + return nil +} diff --git a/src/sources/qbittorrent/install.go b/src/sources/qbittorrent/install.go new file mode 100644 index 0000000..5e47a25 --- /dev/null +++ b/src/sources/qbittorrent/install.go @@ -0,0 +1,139 @@ +package qbittorrent + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path" + "runtime" + "time" + + "github.com/google/go-github/v63/github" + "golang.org/x/sys/cpu" +) + +const ( + repoOwner = "userdocs" + repoName = "qbittorrent-nox-static" + binName = "qbittorrent-nox" +) + +func runQBittorrent(binDir string, profileDir string, stdout, stderr io.Writer) (*os.Process, error) { + cmd := exec.Command( + path.Join(binDir, binName), + fmt.Sprintf("--profile=%s", profileDir), + ) + cmd.Stdin = bytes.NewReader([]byte("y\n")) + cmd.Stdout = stdout + cmd.Stderr = stderr + err := cmd.Start() + if err != nil { + return nil, err + } + return cmd.Process, nil +} + +func downloadLatestRelease(ctx context.Context, binPath string) error { + client := github.NewClient(nil) + rel, _, err := client.Repositories.GetLatestRelease(ctx, repoOwner, repoName) + if err != nil { + return err + } + + arch := "" + switch runtime.GOARCH { + case "amd64": + arch = "x86_64" + case "arm": + arch = "armhf" // this is a safe version, go does not distinguish between armv6 and armv7 + if cpu.ARM.HasNEON { + arch = "armv7" + } + case "arm64": + arch = "aarch64" + } + + if arch == "" { + return errors.New("unsupported architecture") + } + + binName := arch + "-qbittorrent-nox" + + var targetRelease *github.ReleaseAsset + for _, v := range rel.Assets { + if v.GetName() == binName { + targetRelease = v + break + } + } + if targetRelease == nil { + return fmt.Errorf("target asset %s not found", binName) + } + + downloadUrl := targetRelease.GetBrowserDownloadURL() + if downloadUrl == "" { + return errors.New("download url is empty") + } + + err = os.MkdirAll(path.Dir(binPath), 0755) + if err != nil { + return err + } + return downloadFile(binPath, downloadUrl) +} + +func downloadFile(filepath string, webUrl string) error { + if stat, err := os.Stat(filepath); err == nil { + resp, err := http.Head(webUrl) + if err != nil { + return err + } + defer resp.Body.Close() + + var lastModified time.Time + + lastModifiedHeader := resp.Header.Get("Last-Modified") + if lastModifiedHeader != "" { + lastModified, err = time.Parse(http.TimeFormat, lastModifiedHeader) + if err != nil { + return err + } + } + + if resp.ContentLength == stat.Size() && lastModified.Before(stat.ModTime()) { + return nil + } + } + + // Create the file + out, err := os.Create(filepath) + if err != nil { + return err + } + defer out.Close() + + // Get the data + resp, err := http.Get(webUrl) + if err != nil { + return err + } + defer resp.Body.Close() + + // Check server response + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("bad status: %s", resp.Status) + } + + // Writer the body to file + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + + return nil +} diff --git a/src/sources/qbittorrent/install_test.go b/src/sources/qbittorrent/install_test.go new file mode 100644 index 0000000..62f4975 --- /dev/null +++ b/src/sources/qbittorrent/install_test.go @@ -0,0 +1,18 @@ +package qbittorrent + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDownloadQBittorent(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + require := require.New(t) + err := downloadLatestRelease(ctx, tempDir) + require.NoError(err) + err = downloadLatestRelease(ctx, tempDir) + require.NoError(err) +} diff --git a/src/sources/torrent/client.go b/src/sources/torrent/client.go index 4b9576d..e6273d7 100644 --- a/src/sources/torrent/client.go +++ b/src/sources/torrent/client.go @@ -7,7 +7,6 @@ import ( "git.kmsign.ru/royalcat/tstor/src/config" dlog "git.kmsign.ru/royalcat/tstor/src/log" - "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/bep44" tlog "github.com/anacrolix/log" "github.com/anacrolix/torrent" @@ -23,12 +22,19 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent torrentCfg.PeerID = string(id[:]) torrentCfg.DefaultStorage = st // torrentCfg.AlwaysWantConns = true - // torrentCfg.DropMutuallyCompletePeers = true + torrentCfg.DropMutuallyCompletePeers = true // torrentCfg.TorrentPeersLowWater = 100 // torrentCfg.TorrentPeersHighWater = 1000 // torrentCfg.AcceptPeerConnections = true torrentCfg.Seed = true - // torrentCfg.DisableAggressiveUpload = false + torrentCfg.DisableAggressiveUpload = false + + torrentCfg.PeriodicallyAnnounceTorrentsToDht = true + // torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) { + // cfg.Store = fis + // cfg.Exp = dhtTTL + // cfg.PeerStore = fis + // } tl := tlog.NewLogger("torrent-client") tl.SetHandlers(&dlog.Torrent{L: l}) @@ -37,12 +43,12 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) { l.With(peerAttrs(p)...).Debug("new peer") }) - torrentCfg.Callbacks.PeerClosed = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) { l.With(peerAttrs(p)...).Debug("peer closed") }) torrentCfg.Callbacks.CompletedHandshake = func(pc *torrent.PeerConn, ih infohash.T) { - l.With(peerAttrs(&pc.Peer)...).Debug("completed handshake") + attrs := append(peerAttrs(&pc.Peer), slog.String("infohash", ih.HexString())) + l.With(attrs...).Debug("completed handshake") } torrentCfg.Callbacks.PeerConnAdded = append(torrentCfg.Callbacks.PeerConnAdded, func(pc *torrent.PeerConn) { l.With(peerAttrs(&pc.Peer)...).Debug("peer conn added") @@ -50,12 +56,16 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent torrentCfg.Callbacks.PeerConnClosed = func(pc *torrent.PeerConn) { l.With(peerAttrs(&pc.Peer)...).Debug("peer conn closed") } - - torrentCfg.PeriodicallyAnnounceTorrentsToDht = true - torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) { - cfg.Store = fis - cfg.Exp = dhtTTL + torrentCfg.Callbacks.CompletedHandshake = func(pc *torrent.PeerConn, ih infohash.T) { + attrs := append(peerAttrs(&pc.Peer), slog.String("infohash", ih.HexString())) + l.With(attrs...).Debug("completed handshake") } + torrentCfg.Callbacks.ReceivedRequested = append(torrentCfg.Callbacks.ReceivedRequested, func(pme torrent.PeerMessageEvent) { + l.With(peerAttrs(pme.Peer)...).Debug("received requested") + }) + torrentCfg.Callbacks.ReceivedUsefulData = append(torrentCfg.Callbacks.ReceivedUsefulData, func(pme torrent.PeerMessageEvent) { + l.With(peerAttrs(pme.Peer)...).Debug("received useful data") + }) return torrentCfg } diff --git a/src/sources/torrent/daemon.go b/src/sources/torrent/daemon.go index 807590f..a627f6a 100644 --- a/src/sources/torrent/daemon.go +++ b/src/sources/torrent/daemon.go @@ -1,23 +1,18 @@ package torrent import ( - "bufio" "context" "errors" "fmt" - "log/slog" "os" "path/filepath" - "strings" "sync" "time" - "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/tkv" "git.kmsign.ru/royalcat/tstor/src/vfs" - "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -29,7 +24,6 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/types/infohash" "github.com/go-git/go-billy/v5" - "github.com/go-git/go-billy/v5/util" "github.com/royalcat/kv" ) @@ -54,8 +48,7 @@ type Daemon struct { fileProperties kv.Store[string, FileProperties] statsStore *statsStore - loadMutex sync.Mutex - torrentLoaded chan struct{} + loadMutex sync.Mutex sourceFs billy.Filesystem @@ -64,12 +57,11 @@ type Daemon struct { const dhtTTL = 180 * 24 * time.Hour -func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) { +func NewDaemon(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) { s := &Daemon{ - log: rlog.Component("torrent-service"), - sourceFs: sourceFs, - torrentLoaded: make(chan struct{}), - loadMutex: sync.Mutex{}, + log: rlog.Component("torrent-service"), + sourceFs: sourceFs, + loadMutex: sync.Mutex{}, } err := os.MkdirAll(conf.MetadataFolder, 0744) @@ -131,19 +123,16 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, go func() { ctx := context.Background() - err := s.loadTorrentFiles(ctx) + err := s.backgroudFileLoad(ctx) if err != nil { s.log.Error(ctx, "initial torrent load failed", rlog.Error(err)) } - close(s.torrentLoaded) }() go func() { ctx := context.Background() const period = time.Second * 10 - <-s.torrentLoaded - err := registerTorrentMetrics(s.client) if err != nil { s.log.Error(ctx, "error registering torrent metrics", rlog.Error(err)) @@ -167,70 +156,6 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, return s, nil } -func (s *Daemon) allStats(ctx context.Context) (map[infohash.T]TorrentStats, TorrentStats) { - totalPeers := 0 - activePeers := 0 - connectedSeeders := 0 - - perTorrentStats := map[infohash.T]TorrentStats{} - - for _, v := range s.client.Torrents() { - stats := v.Stats() - perTorrentStats[v.InfoHash()] = TorrentStats{ - Timestamp: time.Now(), - DownloadedBytes: uint64(stats.BytesRead.Int64()), - UploadedBytes: uint64(stats.BytesWritten.Int64()), - TotalPeers: uint16(stats.TotalPeers), - ActivePeers: uint16(stats.ActivePeers), - ConnectedSeeders: uint16(stats.ConnectedSeeders), - } - - totalPeers += stats.TotalPeers - activePeers += stats.ActivePeers - connectedSeeders += stats.ConnectedSeeders - } - - totalStats := s.client.Stats() - - return perTorrentStats, TorrentStats{ - Timestamp: time.Now(), - DownloadedBytes: uint64(totalStats.BytesRead.Int64()), - UploadedBytes: uint64(totalStats.BytesWritten.Int64()), - TotalPeers: uint16(totalPeers), - ActivePeers: uint16(activePeers), - ConnectedSeeders: uint16(connectedSeeders), - } -} - -func (s *Daemon) updateStats(ctx context.Context) { - log := s.log - - perTorrentStats, totalStats := s.allStats(ctx) - for ih, v := range perTorrentStats { - err := s.statsStore.AddTorrentStats(ih, v) - if err != nil { - log.Error(ctx, "error saving torrent stats", rlog.Error(err)) - } - } - - err := s.statsStore.AddTotalStats(totalStats) - if err != nil { - log.Error(ctx, "error saving total stats", rlog.Error(err)) - } -} - -func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) { - return s.statsStore.ReadTotalStatsHistory(ctx, since) -} - -func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) { - return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih) -} - -func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) { - return s.statsStore.ReadStatsHistory(ctx, since) -} - var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs func (s *Daemon) Close(ctx context.Context) error { @@ -244,104 +169,6 @@ func (s *Daemon) Close(ctx context.Context) error { )...) } -func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { - ctx, span := tracer.Start(ctx, "loadTorrent") - defer span.End() - log := s.log - - stat, err := f.Info() - if err != nil { - return nil, fmt.Errorf("call stat failed: %w", err) - } - - span.SetAttributes(attribute.String("filename", stat.Name())) - - mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f))) - if err != nil { - return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) - } - - var ctl *Controller - t, ok := s.client.Torrent(mi.HashInfoBytes()) - if ok { - ctl = s.newController(t) - } else { - span.AddEvent("torrent not found, loading from file") - log.Info(ctx, "torrent not found, loading from file") - - spec, err := torrent.TorrentSpecFromMetaInfoErr(mi) - if err != nil { - return nil, fmt.Errorf("parse spec from metadata: %w", err) - } - infoBytes := spec.InfoBytes - - if !isValidInfoHashBytes(infoBytes) { - log.Warn(ctx, "info loaded from spec not valid") - infoBytes = nil - } - - if len(infoBytes) == 0 { - log.Info(ctx, "no info loaded from file, try to load from cache") - infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash) - if err != nil && err != errNotFound { - return nil, fmt.Errorf("get info bytes from database: %w", err) - } - } - - t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{ - InfoHash: spec.InfoHash, - InfoHashV2: spec.InfoHashV2, - Storage: s.Storage, - InfoBytes: infoBytes, - ChunkSize: spec.ChunkSize, - }) - - t.AllowDataDownload() - t.AllowDataUpload() - - span.AddEvent("torrent added to client") - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-t.GotInfo(): - err := s.infoBytes.Set(t.InfoHash(), t.Metainfo()) - if err != nil { - log.Error(ctx, "error setting info bytes for torrent", - slog.String("torrent-name", t.Name()), - rlog.Error(err), - ) - } - } - span.AddEvent("got info") - - ctl = s.newController(t) - - err = ctl.initializeTorrentPriories(ctx) - if err != nil { - return nil, fmt.Errorf("initialize torrent priorities: %w", err) - } - - // info := t.Info() - // if info == nil { - // return nil, fmt.Errorf("info is nil") - // } - - // compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info) - // if err != nil { - // return nil, err - // } - // if !compatable { - // return nil, fmt.Errorf( - // "torrent with name '%s' not compatable existing infohash: %s, new: %s", - // t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(), - // ) - // } - } - - return ctl, nil -} - func isValidInfoHashBytes(d []byte) bool { var info metainfo.Info err := bencode.Unmarshal(d, &info) @@ -352,74 +179,6 @@ func (s *Daemon) Stats() torrent.ConnStats { return s.client.Stats().ConnStats } -const loadWorkers = 5 - -func (s *Daemon) loadTorrentFiles(ctx context.Context) error { - ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes( - attribute.Int("workers", loadWorkers), - )) - defer span.End() - log := s.log - - loaderPaths := make(chan string, loadWorkers*5) - wg := sync.WaitGroup{} - - defer func() { - close(loaderPaths) - wg.Wait() - }() - - loaderWorker := func() { - for path := range loaderPaths { - info, err := s.sourceFs.Stat(path) - if err != nil { - log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err)) - continue - } - - file, err := s.sourceFs.Open(path) - if err != nil { - log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err)) - continue - } - defer file.Close() - - vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file)) - - _, err = s.loadTorrent(ctx, vfile) - if err != nil { - log.Error(ctx, "failed adding torrent", rlog.Error(err)) - } - } - wg.Done() - } - - wg.Add(loadWorkers) - for range loadWorkers { - go loaderWorker() - } - - return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("fs walk error: %w", err) - } - - if ctx.Err() != nil { - return ctx.Err() - } - - if info.IsDir() { - return nil - } - - if strings.HasSuffix(path, ".torrent") { - loaderPaths <- path - } - - return nil - }) -} - func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] { return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/")) } @@ -433,8 +192,6 @@ func (s *Daemon) newController(t *torrent.Torrent) *Controller { } func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) { - <-s.torrentLoaded - out := []*Controller{} for _, v := range s.client.Torrents() { out = append(out, s.newController(v)) @@ -443,8 +200,6 @@ func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) { } func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) { - <-s.torrentLoaded - t, ok := s.client.Torrent(infohash.FromHexString(infohashHex)) if !ok { return nil, nil diff --git a/src/sources/torrent/daemon_load.go b/src/sources/torrent/daemon_load.go new file mode 100644 index 0000000..9793e98 --- /dev/null +++ b/src/sources/torrent/daemon_load.go @@ -0,0 +1,246 @@ +package torrent + +import ( + "bufio" + "context" + "fmt" + "io" + "log/slog" + "os" + "strings" + "sync" + "time" + + "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" + "git.kmsign.ru/royalcat/tstor/pkg/rlog" + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" + "github.com/go-git/go-billy/v5/util" + "github.com/royalcat/ctxio" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const activityTimeout = time.Minute * 15 + +func readInfoHash(ctx context.Context, f vfs.File) (metainfo.Hash, error) { + ctx, span := tracer.Start(ctx, "readInfoHash") + defer span.End() + + mi, err := metainfo.Load(ctxio.IoReader(ctx, f)) + if err != nil { + return metainfo.Hash{}, fmt.Errorf("loading metainfo: %w", err) + } + + return mi.HashInfoBytes(), nil +} + +func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { + ctx, span := tracer.Start(ctx, "loadTorrent") + defer span.End() + log := s.log + + stat, err := f.Info() + if err != nil { + return nil, fmt.Errorf("call stat failed: %w", err) + } + + span.SetAttributes(attribute.String("filename", stat.Name())) + + mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f))) + if err != nil { + return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) + } + log = log.With(slog.String("info-hash", mi.HashInfoBytes().HexString())) + + var ctl *Controller + t, ok := s.client.Torrent(mi.HashInfoBytes()) + if ok { + log = log.With(slog.String("torrent-name", t.Name())) + ctl = s.newController(t) + } else { + span.AddEvent("torrent not found, loading from file") + log.Info(ctx, "torrent not found, loading from file") + + spec, err := torrent.TorrentSpecFromMetaInfoErr(mi) + if err != nil { + return nil, fmt.Errorf("parse spec from metadata: %w", err) + } + infoBytes := spec.InfoBytes + + if !isValidInfoHashBytes(infoBytes) { + log.Warn(ctx, "info loaded from spec not valid") + infoBytes = nil + } + + if len(infoBytes) == 0 { + log.Info(ctx, "no info loaded from file, try to load from cache") + infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash) + if err != nil && err != errNotFound { + return nil, fmt.Errorf("get info bytes from database: %w", err) + } + } + + t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{ + InfoHash: spec.InfoHash, + InfoHashV2: spec.InfoHashV2, + Storage: s.Storage, + InfoBytes: infoBytes, + ChunkSize: spec.ChunkSize, + }) + + log = log.With(slog.String("torrent-name", t.Name())) + + t.AllowDataDownload() + t.AllowDataUpload() + + span.AddEvent("torrent added to client") + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.GotInfo(): + err := s.infoBytes.Set(t.InfoHash(), t.Metainfo()) + if err != nil { + log.Error(ctx, "error setting info bytes for torrent", + slog.String("torrent-name", t.Name()), + rlog.Error(err), + ) + } + } + span.AddEvent("got info") + + ctl = s.newController(t) + + err = ctl.initializeTorrentPriories(ctx) + if err != nil { + return nil, fmt.Errorf("initialize torrent priorities: %w", err) + } + + // go func() { + // subscr := ctl.t.SubscribePieceStateChanges() + // defer subscr.Close() + // dropTimer := time.NewTimer(activityTimeout) + // defer dropTimer.Stop() + + // for { + // select { + // case <-subscr.Values: + // dropTimer.Reset(activityTimeout) + // case <-dropTimer.C: + // log.Info(ctx, "torrent dropped by activity timeout") + // select { + // case <-ctl.t.Closed(): + // return + // case <-time.After(time.Second): + // ctl.t.Drop() + // } + // case <-ctl.t.Closed(): + // return + // } + // } + // }() + } + + return ctl, nil +} + +const loadWorkers = 5 + +func (s *Daemon) backgroudFileLoad(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes( + attribute.Int("workers", loadWorkers), + )) + defer span.End() + log := s.log + + loaderPaths := make(chan string, loadWorkers*5) + wg := sync.WaitGroup{} + + defer func() { + close(loaderPaths) + wg.Wait() + }() + + loaderWorker := func() { + for path := range loaderPaths { + info, err := s.sourceFs.Stat(path) + if err != nil { + log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err)) + continue + } + + file, err := s.sourceFs.Open(path) + if err != nil { + log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err)) + continue + } + defer file.Close() + + vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file)) + + ih, err := readInfoHash(ctx, vfile) + if err != nil { + log.Error(ctx, "error reading info hash", slog.String("filename", path), rlog.Error(err)) + continue + } + props := storeByTorrent(s.fileProperties, ih) + _, err = vfile.Seek(0, io.SeekStart) + if err != nil { + log.Error(ctx, "error seeking file", slog.String("filename", path), rlog.Error(err)) + continue + } + + isPrioritized := false + err = props.Range(ctx, func(k string, v FileProperties) error { + if v.Priority > 0 { + isPrioritized = true + return io.EOF + } + return nil + }) + if err != nil && err != io.EOF { + log.Error(ctx, "error checking file priority", slog.String("filename", path), rlog.Error(err)) + continue + } + + if !isPrioritized { + log.Debug(ctx, "file not prioritized, skipping", slog.String("filename", path)) + continue + } + + _, err = s.loadTorrent(ctx, vfile) + if err != nil { + log.Error(ctx, "failed adding torrent", rlog.Error(err)) + } + } + + wg.Done() + } + + wg.Add(loadWorkers) + for range loadWorkers { + go loaderWorker() + } + + return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("fs walk error: %w", err) + } + + if ctx.Err() != nil { + return ctx.Err() + } + + if info.IsDir() { + return nil + } + + if strings.HasSuffix(path, ".torrent") { + loaderPaths <- path + } + + return nil + }) +} diff --git a/src/sources/torrent/daemon_stats.go b/src/sources/torrent/daemon_stats.go new file mode 100644 index 0000000..cce0701 --- /dev/null +++ b/src/sources/torrent/daemon_stats.go @@ -0,0 +1,73 @@ +package torrent + +import ( + "context" + "time" + + "git.kmsign.ru/royalcat/tstor/pkg/rlog" + "github.com/anacrolix/torrent/types/infohash" +) + +func (s *Daemon) allStats(ctx context.Context) (map[infohash.T]TorrentStats, TorrentStats) { + totalPeers := 0 + activePeers := 0 + connectedSeeders := 0 + + perTorrentStats := map[infohash.T]TorrentStats{} + + for _, v := range s.client.Torrents() { + stats := v.Stats() + perTorrentStats[v.InfoHash()] = TorrentStats{ + Timestamp: time.Now(), + DownloadedBytes: uint64(stats.BytesRead.Int64()), + UploadedBytes: uint64(stats.BytesWritten.Int64()), + TotalPeers: uint16(stats.TotalPeers), + ActivePeers: uint16(stats.ActivePeers), + ConnectedSeeders: uint16(stats.ConnectedSeeders), + } + + totalPeers += stats.TotalPeers + activePeers += stats.ActivePeers + connectedSeeders += stats.ConnectedSeeders + } + + totalStats := s.client.Stats() + + return perTorrentStats, TorrentStats{ + Timestamp: time.Now(), + DownloadedBytes: uint64(totalStats.BytesRead.Int64()), + UploadedBytes: uint64(totalStats.BytesWritten.Int64()), + TotalPeers: uint16(totalPeers), + ActivePeers: uint16(activePeers), + ConnectedSeeders: uint16(connectedSeeders), + } +} + +func (s *Daemon) updateStats(ctx context.Context) { + log := s.log + + perTorrentStats, totalStats := s.allStats(ctx) + for ih, v := range perTorrentStats { + err := s.statsStore.AddTorrentStats(ih, v) + if err != nil { + log.Error(ctx, "error saving torrent stats", rlog.Error(err)) + } + } + + err := s.statsStore.AddTotalStats(totalStats) + if err != nil { + log.Error(ctx, "error saving total stats", rlog.Error(err)) + } +} + +func (s *Daemon) TotalStatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) { + return s.statsStore.ReadTotalStatsHistory(ctx, since) +} + +func (s *Daemon) TorrentStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) { + return s.statsStore.ReadTorrentStatsHistory(ctx, since, ih) +} + +func (s *Daemon) StatsHistory(ctx context.Context, since time.Time) ([]TorrentStats, error) { + return s.statsStore.ReadStatsHistory(ctx, since) +} diff --git a/src/sources/torrent/fs.go b/src/sources/torrent/fs.go index 54b49b4..6549389 100644 --- a/src/sources/torrent/fs.go +++ b/src/sources/torrent/fs.go @@ -432,6 +432,14 @@ func (tf *torrentFile) Name() string { return tf.name } +// Seek implements vfs.File. +func (tf *torrentFile) Seek(offset int64, whence int) (int64, error) { + tf.mu.Lock() + defer tf.mu.Unlock() + + return tf.tr.Seek(offset, whence) +} + // Type implements File. func (tf *torrentFile) Type() fs.FileMode { return vfs.ROMode | fs.ModeDir @@ -482,8 +490,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { span.End() }() - tf.mu.RLock() - defer tf.mu.RUnlock() + tf.mu.Lock() + defer tf.mu.Unlock() ctx, cancel := tf.readTimeout(ctx) defer cancel() diff --git a/src/sources/torrent/metrics.go b/src/sources/torrent/metrics.go index b1725a3..7101683 100644 --- a/src/sources/torrent/metrics.go +++ b/src/sources/torrent/metrics.go @@ -16,8 +16,11 @@ func registerTorrentMetrics(client *torrent.Client) error { meterSeeders, _ := meter.Int64ObservableGauge("torrent.seeders") meterDownloaded, _ := meter.Int64ObservableGauge("torrent.downloaded", metric.WithUnit("By")) meterIO, _ := meter.Int64ObservableGauge("torrent.io", metric.WithUnit("By")) + meterLoaded, _ := meter.Int64ObservableGauge("torrent.loaded") _, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + o.ObserveInt64(meterLoaded, int64(len(client.Torrents()))) + for _, v := range client.Torrents() { as := attribute.NewSet( attribute.String("infohash", v.InfoHash().HexString()), @@ -34,7 +37,7 @@ func registerTorrentMetrics(client *torrent.Client) error { } return nil - }, meterTotalPeers, meterActivePeers, meterSeeders, meterIO, meterDownloaded) + }, meterTotalPeers, meterActivePeers, meterSeeders, meterIO, meterDownloaded, meterLoaded) if err != nil { return err } diff --git a/src/sources/torrent/peer_store.go b/src/sources/torrent/peer_store.go new file mode 100644 index 0000000..59dcc84 --- /dev/null +++ b/src/sources/torrent/peer_store.go @@ -0,0 +1,24 @@ +package torrent + +import ( + "github.com/anacrolix/dht/v2/krpc" + peer_store "github.com/anacrolix/dht/v2/peer-store" + "github.com/anacrolix/torrent/types/infohash" + "github.com/royalcat/kv" +) + +type peerStore struct { + store kv.Store[infohash.T, []krpc.NodeAddr] +} + +var _ peer_store.Interface = (*peerStore)(nil) + +// AddPeer implements peer_store.Interface. +func (p *peerStore) AddPeer(ih infohash.T, node krpc.NodeAddr) { + panic("unimplemented") +} + +// GetPeers implements peer_store.Interface. +func (p *peerStore) GetPeers(ih infohash.T) []krpc.NodeAddr { + panic("unimplemented") +} diff --git a/src/sources/torrent/storage_open.go b/src/sources/torrent/storage_open.go index 0b691c1..0afa7e9 100644 --- a/src/sources/torrent/storage_open.go +++ b/src/sources/torrent/storage_open.go @@ -19,13 +19,14 @@ import ( ) // OpenTorrent implements storage.ClientImplCloser. -func (me *fileStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) { - ctx := context.Background() +func (me *fileStorage) OpenTorrent(ctx context.Context, info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) { + ctx, span := tracer.Start(ctx, "OpenTorrent") + defer span.End() log := me.log.With(slog.String("infohash", infoHash.HexString()), slog.String("name", info.BestName())) log.Debug(ctx, "opening torrent") - impl, err := me.client.OpenTorrent(info, infoHash) + impl, err := me.client.OpenTorrent(ctx, info, infoHash) if err != nil { log.Error(ctx, "error opening torrent", rlog.Error(err)) } diff --git a/src/telemetry/setup.go b/src/telemetry/setup.go index a80a45b..fa16f48 100644 --- a/src/telemetry/setup.go +++ b/src/telemetry/setup.go @@ -12,6 +12,7 @@ import ( "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs" "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp" logsdk "github.com/agoda-com/opentelemetry-logs-go/sdk/logs" + "github.com/google/uuid" otelpyroscope "github.com/grafana/otel-profiling-go" "github.com/grafana/pyroscope-go" "go.opentelemetry.io/otel" @@ -71,6 +72,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) { semconv.SchemaURL, semconv.ServiceName(appName), semconv.HostName(hostName), + semconv.ServiceInstanceID(uuid.NewString()), ), ) if err != nil { diff --git a/src/vfs/archive.go b/src/vfs/archive.go index 0375ef6..47e69ec 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -193,6 +193,20 @@ type archiveFile struct { buffer *filebuffer.Buffer } +// Seek implements File. +func (d *archiveFile) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + d.offset = offset + + case io.SeekCurrent: + d.offset += offset + case io.SeekEnd: + d.offset = d.size + offset + } + return d.offset, nil +} + // Name implements File. func (d *archiveFile) Name() string { return d.name diff --git a/src/vfs/ctxbillyfs.go b/src/vfs/ctxbillyfs.go index 6613eac..c7c64e8 100644 --- a/src/vfs/ctxbillyfs.go +++ b/src/vfs/ctxbillyfs.go @@ -110,6 +110,11 @@ type CtxBillyFile struct { file ctxbilly.File } +// Seek implements File. +func (c *CtxBillyFile) Seek(offset int64, whence int) (int64, error) { + return c.file.Seek(offset, whence) +} + // Close implements File. func (c *CtxBillyFile) Close(ctx context.Context) error { return c.file.Close(ctx) diff --git a/src/vfs/dir.go b/src/vfs/dir.go index c11864a..1ac24e6 100644 --- a/src/vfs/dir.go +++ b/src/vfs/dir.go @@ -33,6 +33,11 @@ func (d *dirFile) IsDir() bool { return true } +// Seek implements File. +func (d *dirFile) Seek(offset int64, whence int) (int64, error) { + return 0, fs.ErrInvalid +} + // Name implements File. func (d *dirFile) Name() string { return d.name diff --git a/src/vfs/dummy.go b/src/vfs/dummy.go index 8f83993..d30ecb1 100644 --- a/src/vfs/dummy.go +++ b/src/vfs/dummy.go @@ -79,6 +79,11 @@ type DummyFile struct { name string } +// Seek implements File. +func (d *DummyFile) Seek(offset int64, whence int) (int64, error) { + return 0, nil +} + // Name implements File. func (d *DummyFile) Name() string { panic("unimplemented") diff --git a/src/vfs/fs.go b/src/vfs/fs.go index 79cb242..892a01e 100644 --- a/src/vfs/fs.go +++ b/src/vfs/fs.go @@ -19,6 +19,7 @@ type File interface { ctxio.Reader ctxio.ReaderAt ctxio.Closer + ctxio.Seeker } var ErrNotImplemented = errors.New("not implemented") diff --git a/src/vfs/log.go b/src/vfs/log.go index ca9cfec..b21fb10 100644 --- a/src/vfs/log.go +++ b/src/vfs/log.go @@ -207,6 +207,11 @@ type LogFile struct { timeout time.Duration } +// Seek implements File. +func (f *LogFile) Seek(offset int64, whence int) (int64, error) { + return f.f.Seek(offset, whence) +} + // Name implements File. func (f *LogFile) Name() string { return f.f.Name() diff --git a/src/vfs/memory.go b/src/vfs/memory.go index ce0d63e..e7b8de6 100644 --- a/src/vfs/memory.go +++ b/src/vfs/memory.go @@ -108,6 +108,11 @@ func (d *MemoryFile) Name() string { return d.name } +// Seek implements File. +func (d *MemoryFile) Seek(offset int64, whence int) (int64, error) { + return d.data.Seek(offset, whence) +} + // Type implements File. func (d *MemoryFile) Type() fs.FileMode { return ROMode diff --git a/src/vfs/os.go b/src/vfs/os.go index 7244096..07661f1 100644 --- a/src/vfs/os.go +++ b/src/vfs/os.go @@ -122,6 +122,11 @@ func (f *LazyOsFile) Type() fs.FileMode { return f.info.Mode() } +// Seek implements File. +func (f *LazyOsFile) Seek(offset int64, whence int) (int64, error) { + return f.file.Seek(offset, whence) +} + // Close implements File. func (f *LazyOsFile) Close(ctx context.Context) error { if f.file == nil {