Compare commits
No commits in common. "bd75492b02ecb9044a2a8c2a82a10fd6382d0bc0" and "ef6680b854bfe9321169bf8c488102282f375aaa" have entirely different histories.
bd75492b02
...
ef6680b854
82 changed files with 1187 additions and 950 deletions
|
@ -50,7 +50,7 @@ models:
|
|||
Path:
|
||||
type: string
|
||||
FS:
|
||||
type: "git.kmsign.ru/royalcat/tstor/src/vfs.Filesystem"
|
||||
type: "git.kmsign.ru/royalcat/tstor/src/host/vfs.Filesystem"
|
||||
TorrentFS:
|
||||
fields:
|
||||
entries:
|
||||
|
@ -64,11 +64,11 @@ models:
|
|||
resolver: true
|
||||
extraFields:
|
||||
FS:
|
||||
type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ResolverFS"
|
||||
type: "*git.kmsign.ru/royalcat/tstor/src/host/vfs.ResolverFS"
|
||||
ArchiveFS:
|
||||
fields:
|
||||
entries:
|
||||
resolver: true
|
||||
extraFields:
|
||||
FS:
|
||||
type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ArchiveFS"
|
||||
type: "*git.kmsign.ru/royalcat/tstor/src/host/vfs.ArchiveFS"
|
||||
|
|
|
@ -18,11 +18,10 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/ytdlp"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/telemetry"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/go-git/go-billy/v5/osfs"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
|
@ -91,16 +90,14 @@ func run(configPath string) error {
|
|||
}
|
||||
|
||||
sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS())
|
||||
tsrv, err := torrent.NewService(sourceFs, conf.TorrentClient)
|
||||
srv, err := torrent.NewService(sourceFs, conf.TorrentClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating service: %w", err)
|
||||
}
|
||||
|
||||
ytdlpsrv := ytdlp.NewService(conf.SourceDir)
|
||||
|
||||
sfs := sources.NewHostedFS(
|
||||
sfs := host.NewHostedFS(
|
||||
vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)),
|
||||
tsrv, ytdlpsrv,
|
||||
srv,
|
||||
)
|
||||
sfs = vfs.WrapLogFS(sfs)
|
||||
|
||||
|
@ -177,7 +174,7 @@ func run(configPath string) error {
|
|||
go func() {
|
||||
logFilename := filepath.Join(conf.Log.Path, "logs")
|
||||
|
||||
err := delivery.New(nil, tsrv, sfs, logFilename, conf)
|
||||
err := delivery.New(nil, srv, sfs, logFilename, conf)
|
||||
if err != nil {
|
||||
log.Error(ctx, "error initializing HTTP server", rlog.Error(err))
|
||||
}
|
||||
|
@ -187,5 +184,5 @@ func run(configPath string) error {
|
|||
signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigChan
|
||||
|
||||
return tsrv.Close(ctx)
|
||||
return srv.Close(ctx)
|
||||
}
|
||||
|
|
11
go.mod
11
go.mod
|
@ -1,6 +1,6 @@
|
|||
module git.kmsign.ru/royalcat/tstor
|
||||
|
||||
go 1.22.3
|
||||
go 1.22.1
|
||||
|
||||
require (
|
||||
github.com/99designs/gqlgen v0.17.45
|
||||
|
@ -12,8 +12,8 @@ require (
|
|||
github.com/anacrolix/torrent v1.55.0
|
||||
github.com/billziss-gh/cgofuse v1.5.0
|
||||
github.com/bodgit/sevenzip v1.5.1
|
||||
github.com/cyphar/filepath-securejoin v0.2.5
|
||||
github.com/dgraph-io/badger/v4 v4.2.0
|
||||
github.com/dgraph-io/ristretto v0.1.1
|
||||
github.com/dustin/go-humanize v1.0.1
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/go-git/go-billy/v5 v5.5.0
|
||||
|
@ -23,7 +23,6 @@ require (
|
|||
github.com/grafana/pyroscope-go v1.1.1
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90
|
||||
github.com/knadh/koanf/parsers/yaml v0.1.0
|
||||
github.com/knadh/koanf/providers/env v0.1.0
|
||||
github.com/knadh/koanf/providers/file v0.1.0
|
||||
|
@ -31,10 +30,10 @@ require (
|
|||
github.com/knadh/koanf/v2 v2.1.1
|
||||
github.com/labstack/echo-contrib v0.17.1
|
||||
github.com/labstack/echo/v4 v4.12.0
|
||||
github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060
|
||||
github.com/nwaples/rardecode/v2 v2.0.0-beta.2
|
||||
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93
|
||||
github.com/ravilushqa/otelgqlgen v0.15.0
|
||||
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be
|
||||
github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3
|
||||
github.com/royalcat/kv v0.0.0-20240327213417-8cf5696b2389
|
||||
github.com/rs/zerolog v1.32.0
|
||||
|
@ -59,6 +58,7 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/ProtonMail/go-crypto v1.0.0 // indirect
|
||||
github.com/RoaringBitmap/roaring v1.9.3 // indirect
|
||||
github.com/agnivade/levenshtein v1.1.1 // indirect
|
||||
github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect
|
||||
|
@ -87,11 +87,12 @@ require (
|
|||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/cespare/xxhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/cloudflare/circl v1.3.8 // indirect
|
||||
github.com/cloudwego/base64x v0.1.4 // indirect
|
||||
github.com/cloudwego/iasm v0.2.0 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
github.com/cyphar/filepath-securejoin v0.2.5 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||
github.com/edsrzf/mmap-go v1.1.0 // indirect
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
|
|
23
go.sum
23
go.sum
|
@ -25,6 +25,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
|||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78=
|
||||
github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
|
||||
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
|
||||
github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI=
|
||||
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
|
||||
|
@ -135,6 +137,7 @@ github.com/bradfitz/iter v0.0.0-20140124041915-454541ec3da2/go.mod h1:PyRFw1Lt2w
|
|||
github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo=
|
||||
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8=
|
||||
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og=
|
||||
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
|
||||
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
|
||||
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
|
||||
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
|
||||
|
@ -151,6 +154,9 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
|
|||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
|
||||
github.com/cloudflare/circl v1.3.8 h1:j+V8jJt09PoeMFIu2uh5JUyEaIHTXVOHslFoLNAKqwI=
|
||||
github.com/cloudflare/circl v1.3.8/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZFnBQS5QU=
|
||||
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
|
||||
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
|
||||
|
@ -344,8 +350,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq
|
|||
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
|
||||
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 h1:xrtfZokN++5kencK33hn2Kx3Uj8tGnjMEhdt6FMvHD0=
|
||||
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90/go.mod h1:LEzdaZarZ5aqROlLIwJ4P7h3+4o71008fSy6wpaEB+s=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
|
@ -396,6 +400,8 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0
|
|||
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060 h1:UOZcZVKXvw5ZcQ/shW/7xonMJYib9n9FKyNs/TAYAKc=
|
||||
github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060/go.mod h1:75ujbafjqiJugIGw4K6o52/p8C0m/kt+DrYwgClXYT4=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
|
@ -528,8 +534,6 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po
|
|||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
||||
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
|
||||
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkLUsgvVdYO/UOJkzDyPzESfrTqWfM=
|
||||
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
|
||||
github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3 h1:1Ow/NUAWFZLghFcdNuyHt5Avb+bEI11qG8ELr9/XmQQ=
|
||||
github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
|
||||
github.com/royalcat/kv v0.0.0-20240327213417-8cf5696b2389 h1:7XbHzr1TOaxs5Y/i9GtTEOOSTzfQ4ESYqF38DVfPkFY=
|
||||
|
@ -673,6 +677,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
|
||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
|
||||
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
|
||||
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
|
||||
|
@ -739,8 +745,10 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
|
|||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
||||
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
|
||||
|
@ -788,7 +796,6 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -798,6 +805,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
|
@ -814,7 +823,9 @@ golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
|||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
|
||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
|
||||
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
|
||||
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
|
||||
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
|
||||
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
|
||||
|
@ -829,7 +840,9 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
|||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
)
|
||||
|
||||
type Filesystem interface {
|
||||
|
@ -36,6 +36,16 @@ type Filesystem interface {
|
|||
// UNC path if and only if the first path element is a UNC path.
|
||||
Join(elem ...string) string
|
||||
|
||||
// TempFile creates a new temporary file in the directory dir with a name
|
||||
// beginning with prefix, opens the file for reading and writing, and
|
||||
// returns the resulting *os.File. If dir is the empty string, TempFile
|
||||
// uses the default directory for temporary files (see os.TempDir).
|
||||
// Multiple programs calling TempFile simultaneously will not choose the
|
||||
// same file. The caller can use f.Name() to find the pathname of the file.
|
||||
// It is the caller's responsibility to remove the file when no longer
|
||||
// needed.
|
||||
TempFile(ctx context.Context, dir, prefix string) (File, error)
|
||||
|
||||
// ReadDir reads the directory named by d(irname and returns a list of
|
||||
// directory entries sorted by filename.
|
||||
ReadDir(ctx context.Context, path string) ([]os.FileInfo, error)
|
||||
|
@ -64,38 +74,19 @@ type Filesystem interface {
|
|||
// Root() string
|
||||
}
|
||||
|
||||
type TempFileFS interface {
|
||||
// TempFile creates a new temporary file in the directory dir with a name
|
||||
// beginning with prefix, opens the file for reading and writing, and
|
||||
// returns the resulting *os.File. If dir is the empty string, TempFile
|
||||
// uses the default directory for temporary files (see os.TempDir).
|
||||
// Multiple programs calling TempFile simultaneously will not choose the
|
||||
// same file. The caller can use f.Name() to find the pathname of the file.
|
||||
// It is the caller's responsibility to remove the file when no longer
|
||||
// needed.
|
||||
TempFile(ctx context.Context, dir, prefix string) (File, error)
|
||||
}
|
||||
|
||||
type File interface {
|
||||
// Name returns the name of the file as presented to Open.
|
||||
Name() string
|
||||
ctxio.Writer
|
||||
ctxio.WriterAt
|
||||
ctxio.Reader
|
||||
ctxio.ReaderAt
|
||||
io.Seeker
|
||||
ctxio.Closer
|
||||
}
|
||||
|
||||
type LockFile interface {
|
||||
// Lock locks the file like e.g. flock. It protects against access from
|
||||
// other processes.
|
||||
Lock() error
|
||||
// Unlock unlocks the file.
|
||||
Unlock() error
|
||||
}
|
||||
|
||||
type TruncateFile interface {
|
||||
// Truncate the file.
|
||||
Truncate(ctx context.Context, size int64) error
|
||||
}
|
||||
|
|
|
@ -164,12 +164,3 @@ func (m *wrapFile) Unlock() error {
|
|||
func (m *wrapFile) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||
return m.File.Write(p)
|
||||
}
|
||||
|
||||
// WriteAt implements File.
|
||||
func (m *wrapFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
_, err = m.File.Seek(off, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return m.File.Write(p)
|
||||
}
|
||||
|
|
|
@ -1,359 +0,0 @@
|
|||
package ctxbilly
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
securejoin "github.com/cyphar/filepath-securejoin"
|
||||
"github.com/iceber/iouring-go"
|
||||
)
|
||||
|
||||
func NewURingFS() (*UringFS, error) {
|
||||
ur, err := iouring.New(64, iouring.WithAsync())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &UringFS{
|
||||
ur: ur,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ Filesystem = (*UringFS)(nil)
|
||||
|
||||
const (
|
||||
defaultDirectoryMode = 0o755
|
||||
defaultCreateMode = 0o666
|
||||
)
|
||||
|
||||
// UringFS is a fs implementation based on the OS filesystem which is bound to
|
||||
// a base dir.
|
||||
// Prefer this fs implementation over ChrootOS.
|
||||
//
|
||||
// Behaviours of note:
|
||||
// 1. Read and write operations can only be directed to files which descends
|
||||
// from the base dir.
|
||||
// 2. Symlinks don't have their targets modified, and therefore can point
|
||||
// to locations outside the base dir or to non-existent paths.
|
||||
// 3. Readlink and Lstat ensures that the link file is located within the base
|
||||
// dir, evaluating any symlinks that file or base dir may contain.
|
||||
type UringFS struct {
|
||||
ur *iouring.IOURing
|
||||
baseDir string
|
||||
}
|
||||
|
||||
func newBoundOS(d string) *UringFS {
|
||||
return &UringFS{baseDir: d}
|
||||
}
|
||||
|
||||
func (fs *UringFS) Create(ctx context.Context, filename string) (File, error) {
|
||||
return fs.OpenFile(ctx, filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultCreateMode)
|
||||
}
|
||||
|
||||
func (fs *UringFS) OpenFile(ctx context.Context, filename string, flag int, perm os.FileMode) (File, error) {
|
||||
fn, err := fs.abs(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(fn, flag, perm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newFile(fs.ur, f)
|
||||
}
|
||||
|
||||
func (fs *UringFS) ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) {
|
||||
dir, err := fs.abs(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
infos := make([]os.FileInfo, 0, len(entries))
|
||||
for _, v := range entries {
|
||||
info, err := v.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
infos = append(infos, info)
|
||||
}
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
func (fs *UringFS) Rename(ctx context.Context, from, to string) error {
|
||||
f, err := fs.abs(from)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t, err := fs.abs(to)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// MkdirAll for target name.
|
||||
if err := fs.createDir(t); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.Rename(f, t)
|
||||
}
|
||||
|
||||
func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode) error {
|
||||
dir, err := fs.abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.MkdirAll(dir, perm)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Open(ctx context.Context, filename string) (File, error) {
|
||||
return fs.OpenFile(ctx, filename, os.O_RDONLY, 0)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) {
|
||||
filename, err := fs.abs(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.Stat(filename)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Remove(ctx context.Context, filename string) error {
|
||||
fn, err := fs.abs(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Remove(fn)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Join(elem ...string) string {
|
||||
return filepath.Join(elem...)
|
||||
}
|
||||
|
||||
func (fs *UringFS) RemoveAll(path string) error {
|
||||
dir, err := fs.abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.RemoveAll(dir)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Symlink(ctx context.Context, target, link string) error {
|
||||
ln, err := fs.abs(link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// MkdirAll for containing dir.
|
||||
if err := fs.createDir(ln); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Symlink(target, ln)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Lstat(ctx context.Context, filename string) (os.FileInfo, error) {
|
||||
filename = filepath.Clean(filename)
|
||||
if !filepath.IsAbs(filename) {
|
||||
filename = filepath.Join(fs.baseDir, filename)
|
||||
}
|
||||
if ok, err := fs.insideBaseDirEval(filename); !ok {
|
||||
return nil, err
|
||||
}
|
||||
return os.Lstat(filename)
|
||||
}
|
||||
|
||||
func (fs *UringFS) Readlink(ctx context.Context, link string) (string, error) {
|
||||
if !filepath.IsAbs(link) {
|
||||
link = filepath.Clean(filepath.Join(fs.baseDir, link))
|
||||
}
|
||||
if ok, err := fs.insideBaseDirEval(link); !ok {
|
||||
return "", err
|
||||
}
|
||||
return os.Readlink(link)
|
||||
}
|
||||
|
||||
// Chroot returns a new OS filesystem, with the base dir set to the
|
||||
// result of joining the provided path with the underlying base dir.
|
||||
// func (fs *UringFS) Chroot(path string) (Filesystem, error) {
|
||||
// joined, err := securejoin.SecureJoin(fs.baseDir, path)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return newBoundOS(joined), nil
|
||||
// }
|
||||
|
||||
// Root returns the current base dir of the billy.Filesystem.
|
||||
// This is required in order for this implementation to be a drop-in
|
||||
// replacement for other upstream implementations (e.g. memory and osfs).
|
||||
func (fs *UringFS) Root() string {
|
||||
return fs.baseDir
|
||||
}
|
||||
|
||||
func (fs *UringFS) createDir(fullpath string) error {
|
||||
dir := filepath.Dir(fullpath)
|
||||
if dir != "." {
|
||||
if err := os.MkdirAll(dir, defaultDirectoryMode); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// abs transforms filename to an absolute path, taking into account the base dir.
|
||||
// Relative paths won't be allowed to ascend the base dir, so `../file` will become
|
||||
// `/working-dir/file`.
|
||||
//
|
||||
// Note that if filename is a symlink, the returned address will be the target of the
|
||||
// symlink.
|
||||
func (fs *UringFS) abs(filename string) (string, error) {
|
||||
if filename == fs.baseDir {
|
||||
filename = string(filepath.Separator)
|
||||
}
|
||||
|
||||
path, err := securejoin.SecureJoin(fs.baseDir, filename)
|
||||
if err != nil {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// insideBaseDirEval checks whether filename is contained within
|
||||
// a dir that is within the fs.baseDir, by first evaluating any symlinks
|
||||
// that either filename or fs.baseDir may contain.
|
||||
func (fs *UringFS) insideBaseDirEval(filename string) (bool, error) {
|
||||
dir, err := filepath.EvalSymlinks(filepath.Dir(filename))
|
||||
if dir == "" || os.IsNotExist(err) {
|
||||
dir = filepath.Dir(filename)
|
||||
}
|
||||
wd, err := filepath.EvalSymlinks(fs.baseDir)
|
||||
if wd == "" || os.IsNotExist(err) {
|
||||
wd = fs.baseDir
|
||||
}
|
||||
if filename != wd && dir != wd && !strings.HasPrefix(dir, wd+string(filepath.Separator)) {
|
||||
return false, fmt.Errorf("path outside base dir")
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func newFile(fsur *iouring.IOURing, f *os.File) (*URingFile, error) {
|
||||
ur, err := iouring.New(64, iouring.WithAttachWQ(fsur))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &URingFile{
|
||||
ur: ur,
|
||||
f: f,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type URingFile struct {
|
||||
ur *iouring.IOURing
|
||||
f *os.File
|
||||
}
|
||||
|
||||
// Close implements File.
|
||||
func (o *URingFile) Close(ctx context.Context) error {
|
||||
return errors.Join(o.ur.UnregisterFile(o.f), o.Close(ctx))
|
||||
}
|
||||
|
||||
// Name implements File.
|
||||
func (o *URingFile) Name() string {
|
||||
return o.f.Name()
|
||||
}
|
||||
|
||||
// Read implements File.
|
||||
func (o *URingFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
req, err := o.ur.Read(o.f, p, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer req.Cancel()
|
||||
|
||||
select {
|
||||
case <-req.Done():
|
||||
return req.GetRes()
|
||||
case <-ctx.Done():
|
||||
req.Cancel()
|
||||
<-req.Done()
|
||||
return req.GetRes()
|
||||
}
|
||||
}
|
||||
|
||||
// ReadAt implements File.
|
||||
func (o *URingFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
req, err := o.ur.Pread(o.f, p, uint64(off), nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer req.Cancel()
|
||||
|
||||
select {
|
||||
case <-req.Done():
|
||||
return req.GetRes()
|
||||
case <-ctx.Done():
|
||||
req.Cancel()
|
||||
<-req.Done()
|
||||
return req.GetRes()
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements File.
|
||||
func (o *URingFile) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||
req, err := o.ur.Write(o.f, p, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer req.Cancel()
|
||||
|
||||
select {
|
||||
case <-req.Done():
|
||||
return req.GetRes()
|
||||
case <-ctx.Done():
|
||||
req.Cancel()
|
||||
<-req.Done()
|
||||
return req.GetRes()
|
||||
}
|
||||
}
|
||||
|
||||
// WriteAt implements File.
|
||||
func (o *URingFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
req, err := o.ur.Pwrite(o.f, p, uint64(off), nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer req.Cancel()
|
||||
|
||||
select {
|
||||
case <-req.Done():
|
||||
return req.GetRes()
|
||||
case <-ctx.Done():
|
||||
req.Cancel()
|
||||
<-req.Done()
|
||||
return req.GetRes()
|
||||
}
|
||||
}
|
||||
|
||||
// Seek implements File.
|
||||
func (o *URingFile) Seek(offset int64, whence int) (int64, error) {
|
||||
return o.f.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// Truncate implements File.
|
||||
func (o *URingFile) Truncate(ctx context.Context, size int64) error {
|
||||
return o.f.Truncate(size)
|
||||
}
|
||||
|
||||
var _ File = (*URingFile)(nil)
|
|
@ -1,34 +1,26 @@
|
|||
package ioutils
|
||||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
type FileReader interface {
|
||||
ctxio.ReaderAt
|
||||
ctxio.Reader
|
||||
ctxio.Closer
|
||||
}
|
||||
|
||||
type CacheReader struct {
|
||||
m sync.Mutex
|
||||
|
||||
fo int64
|
||||
fr *FileBuffer
|
||||
to int64
|
||||
tr ctxio.Reader
|
||||
tr Reader
|
||||
}
|
||||
|
||||
var _ FileReader = (*CacheReader)(nil)
|
||||
|
||||
func NewCacheReader(r ctxio.Reader) (FileReader, error) {
|
||||
func NewCacheReader(r Reader) (FileReader, error) {
|
||||
fr := NewFileBuffer(nil)
|
||||
tr := ctxio.TeeReader(r, fr)
|
||||
tr := TeeReader(r, fr)
|
||||
return &CacheReader{fr: fr, tr: tr}, nil
|
||||
}
|
||||
|
||||
|
@ -38,7 +30,7 @@ func (dtr *CacheReader) ReadAt(ctx context.Context, p []byte, off int64) (int, e
|
|||
tb := off + int64(len(p))
|
||||
|
||||
if tb > dtr.fo {
|
||||
w, err := ctxio.CopyN(ctx, ctxio.Discard, dtr.tr, tb-dtr.fo)
|
||||
w, err := CopyN(ctx, Discard, dtr.tr, tb-dtr.fo)
|
||||
dtr.to += w
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
|
@ -63,7 +55,7 @@ func (dtr *CacheReader) Close(ctx context.Context) error {
|
|||
frcloser := dtr.fr.Close(ctx)
|
||||
|
||||
var closeerr error
|
||||
if rc, ok := dtr.tr.(ctxio.ReadCloser); ok {
|
||||
if rc, ok := dtr.tr.(ReadCloser); ok {
|
||||
closeerr = rc.Close(ctx)
|
||||
}
|
||||
|
89
pkg/ctxio/copy.go
Normal file
89
pkg/ctxio/copy.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package ctxio
|
||||
|
||||
// // CopyN copies n bytes (or until an error) from src to dst.
|
||||
// // It returns the number of bytes copied and the earliest
|
||||
// // error encountered while copying.
|
||||
// // On return, written == n if and only if err == nil.
|
||||
// //
|
||||
// // If dst implements [ReaderFrom], the copy is implemented using it.
|
||||
// func CopyN(ctx context.Context, dst Writer, src Reader, n int64) (written int64, err error) {
|
||||
// written, err = Copy(ctx, dst, LimitReader(src, n))
|
||||
// if written == n {
|
||||
// return n, nil
|
||||
// }
|
||||
// if written < n && err == nil {
|
||||
// // src stopped early; must have been EOF.
|
||||
// err = io.EOF
|
||||
// }
|
||||
|
||||
// return
|
||||
// }
|
||||
|
||||
// // Copy copies from src to dst until either EOF is reached
|
||||
// // on src or an error occurs. It returns the number of bytes
|
||||
// // copied and the first error encountered while copying, if any.
|
||||
// //
|
||||
// // A successful Copy returns err == nil, not err == EOF.
|
||||
// // Because Copy is defined to read from src until EOF, it does
|
||||
// // not treat an EOF from Read as an error to be reported.
|
||||
// //
|
||||
// // If src implements [WriterTo],
|
||||
// // the copy is implemented by calling src.WriteTo(dst).
|
||||
// // Otherwise, if dst implements [ReaderFrom],
|
||||
// // the copy is implemented by calling dst.ReadFrom(src).
|
||||
// func Copy(ctx context.Context, dst Writer, src Reader) (written int64, err error) {
|
||||
// return copyBuffer(ctx, dst, src, nil)
|
||||
// }
|
||||
|
||||
// // copyBuffer is the actual implementation of Copy and CopyBuffer.
|
||||
// // if buf is nil, one is allocated.
|
||||
// func copyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) {
|
||||
// // If the reader has a WriteTo method, use it to do the copy.
|
||||
// // Avoids an allocation and a copy.
|
||||
// if wt, ok := src.(WriterTo); ok {
|
||||
// return wt.WriteTo(dst)
|
||||
// }
|
||||
// // Similarly, if the writer has a ReadFrom method, use it to do the copy.
|
||||
// if rt, ok := dst.(ReaderFrom); ok {
|
||||
// return rt.ReadFrom(src)
|
||||
// }
|
||||
// if buf == nil {
|
||||
// size := 32 * 1024
|
||||
// if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
|
||||
// if l.N < 1 {
|
||||
// size = 1
|
||||
// } else {
|
||||
// size = int(l.N)
|
||||
// }
|
||||
// }
|
||||
// buf = make([]byte, size)
|
||||
// }
|
||||
// for {
|
||||
// nr, er := src.Read(ctx, buf)
|
||||
// if nr > 0 {
|
||||
// nw, ew := dst.Write(ctx, buf[0:nr])
|
||||
// if nw < 0 || nr < nw {
|
||||
// nw = 0
|
||||
// if ew == nil {
|
||||
// ew = errInvalidWrite
|
||||
// }
|
||||
// }
|
||||
// written += int64(nw)
|
||||
// if ew != nil {
|
||||
// err = ew
|
||||
// break
|
||||
// }
|
||||
// if nr != nw {
|
||||
// err = io.ErrShortWrite
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if er != nil {
|
||||
// if er != io.EOF {
|
||||
// err = er
|
||||
// }
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// return written, err
|
||||
// }
|
|
@ -1,12 +1,10 @@
|
|||
package ioutils
|
||||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
type DiskCacheReader struct {
|
||||
|
@ -15,14 +13,14 @@ type DiskCacheReader struct {
|
|||
fo int64
|
||||
fr *os.File
|
||||
to int64
|
||||
tr ctxio.Reader
|
||||
tr Reader
|
||||
}
|
||||
|
||||
var _ ctxio.ReaderAt = (*DiskCacheReader)(nil)
|
||||
var _ ctxio.Reader = (*DiskCacheReader)(nil)
|
||||
var _ ctxio.Closer = (*DiskCacheReader)(nil)
|
||||
var _ ReaderAt = (*DiskCacheReader)(nil)
|
||||
var _ Reader = (*DiskCacheReader)(nil)
|
||||
var _ Closer = (*DiskCacheReader)(nil)
|
||||
|
||||
func NewDiskCacheReader(r ctxio.Reader) (*DiskCacheReader, error) {
|
||||
func NewDiskCacheReader(r Reader) (*DiskCacheReader, error) {
|
||||
tempDir, err := os.MkdirTemp("/tmp", "tstor")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -32,7 +30,7 @@ func NewDiskCacheReader(r ctxio.Reader) (*DiskCacheReader, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
tr := ctxio.TeeReader(r, ctxio.WrapIoWriter(fr))
|
||||
tr := TeeReader(r, WrapIoWriter(fr))
|
||||
return &DiskCacheReader{fr: fr, tr: tr}, nil
|
||||
}
|
||||
|
||||
|
@ -42,7 +40,7 @@ func (dtr *DiskCacheReader) ReadAt(ctx context.Context, p []byte, off int64) (in
|
|||
tb := off + int64(len(p))
|
||||
|
||||
if tb > dtr.fo {
|
||||
w, err := ctxio.CopyN(ctx, ctxio.Discard, dtr.tr, tb-dtr.fo)
|
||||
w, err := CopyN(ctx, Discard, dtr.tr, tb-dtr.fo)
|
||||
dtr.to += w
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
|
@ -1,4 +1,4 @@
|
|||
package ioutils
|
||||
package ctxio
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -6,8 +6,6 @@ import (
|
|||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
// FileBuffer implements interfaces implemented by files.
|
||||
|
@ -22,7 +20,7 @@ type FileBuffer struct {
|
|||
}
|
||||
|
||||
var _ FileReader = (*FileBuffer)(nil)
|
||||
var _ ctxio.Writer = (*FileBuffer)(nil)
|
||||
var _ Writer = (*FileBuffer)(nil)
|
||||
|
||||
// NewFileBuffer returns a new populated Buffer
|
||||
func NewFileBuffer(b []byte) *FileBuffer {
|
||||
|
@ -32,8 +30,8 @@ func NewFileBuffer(b []byte) *FileBuffer {
|
|||
// NewFileBufferFromReader is a convenience method that returns a new populated Buffer
|
||||
// whose contents are sourced from a supplied reader by loading it entirely
|
||||
// into memory.
|
||||
func NewFileBufferFromReader(ctx context.Context, reader ctxio.Reader) (*FileBuffer, error) {
|
||||
data, err := ctxio.ReadAll(ctx, reader)
|
||||
func NewFileBufferFromReader(ctx context.Context, reader Reader) (*FileBuffer, error) {
|
||||
data, err := ReadAll(ctx, reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
663
pkg/ctxio/io.go
Normal file
663
pkg/ctxio/io.go
Normal file
|
@ -0,0 +1,663 @@
|
|||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Seek whence values.
|
||||
const (
|
||||
SeekStart = io.SeekStart // seek relative to the origin of the file
|
||||
SeekCurrent = io.SeekCurrent // seek relative to the current offset
|
||||
SeekEnd = io.SeekEnd // seek relative to the end
|
||||
)
|
||||
|
||||
// ErrShortWrite means that a write accepted fewer bytes than requested
|
||||
// but failed to return an explicit error.
|
||||
var ErrShortWrite = io.ErrShortWrite
|
||||
|
||||
// errInvalidWrite means that a write returned an impossible count.
|
||||
var errInvalidWrite = errors.New("invalid write result")
|
||||
|
||||
// ErrShortBuffer means that a read required a longer buffer than was provided.
|
||||
var ErrShortBuffer = io.ErrShortBuffer
|
||||
|
||||
// EOF is the error returned by Read when no more input is available.
|
||||
// (Read must return EOF itself, not an error wrapping EOF,
|
||||
// because callers will test for EOF using ==.)
|
||||
// Functions should return EOF only to signal a graceful end of input.
|
||||
// If the EOF occurs unexpectedly in a structured data stream,
|
||||
// the appropriate error is either [ErrUnexpectedEOF] or some other error
|
||||
// giving more detail.
|
||||
var EOF = io.EOF
|
||||
|
||||
// ErrUnexpectedEOF means that EOF was encountered in the
|
||||
// middle of reading a fixed-size block or data structure.
|
||||
var ErrUnexpectedEOF = io.ErrUnexpectedEOF
|
||||
|
||||
// ErrNoProgress is returned by some clients of a [Reader] when
|
||||
// many calls to Read have failed to return any data or error,
|
||||
// usually the sign of a broken [Reader] implementation.
|
||||
var ErrNoProgress = io.ErrNoProgress
|
||||
|
||||
// Reader is the interface that wraps the basic Read method.
|
||||
//
|
||||
// Read reads up to len(p) bytes into p. It returns the number of bytes
|
||||
// read (0 <= n <= len(p)) and any error encountered. Even if Read
|
||||
// returns n < len(p), it may use all of p as scratch space during the call.
|
||||
// If some data is available but not len(p) bytes, Read conventionally
|
||||
// returns what is available instead of waiting for more.
|
||||
//
|
||||
// When Read encounters an error or end-of-file condition after
|
||||
// successfully reading n > 0 bytes, it returns the number of
|
||||
// bytes read. It may return the (non-nil) error from the same call
|
||||
// or return the error (and n == 0) from a subsequent call.
|
||||
// An instance of this general case is that a Reader returning
|
||||
// a non-zero number of bytes at the end of the input stream may
|
||||
// return either err == EOF or err == nil. The next Read should
|
||||
// return 0, EOF.
|
||||
//
|
||||
// Callers should always process the n > 0 bytes returned before
|
||||
// considering the error err. Doing so correctly handles I/O errors
|
||||
// that happen after reading some bytes and also both of the
|
||||
// allowed EOF behaviors.
|
||||
//
|
||||
// If len(p) == 0, Read should always return n == 0. It may return a
|
||||
// non-nil error if some error condition is known, such as EOF.
|
||||
//
|
||||
// Implementations of Read are discouraged from returning a
|
||||
// zero byte count with a nil error, except when len(p) == 0.
|
||||
// Callers should treat a return of 0 and nil as indicating that
|
||||
// nothing happened; in particular it does not indicate EOF.
|
||||
//
|
||||
// Implementations must not retain p.
|
||||
type Reader interface {
|
||||
Read(ctx context.Context, p []byte) (n int, err error)
|
||||
}
|
||||
|
||||
// Writer is the interface that wraps the basic Write method.
|
||||
//
|
||||
// Write writes len(p) bytes from p to the underlying data stream.
|
||||
// It returns the number of bytes written from p (0 <= n <= len(p))
|
||||
// and any error encountered that caused the write to stop early.
|
||||
// Write must return a non-nil error if it returns n < len(p).
|
||||
// Write must not modify the slice data, even temporarily.
|
||||
//
|
||||
// Implementations must not retain p.
|
||||
type Writer interface {
|
||||
Write(ctx context.Context, p []byte) (n int, err error)
|
||||
}
|
||||
|
||||
// Closer is the interface that wraps the basic Close method.
|
||||
//
|
||||
// The behavior of Close after the first call is undefined.
|
||||
// Specific implementations may document their own behavior.
|
||||
type Closer interface {
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Seeker is the interface that wraps the basic Seek method.
|
||||
//
|
||||
// Seek sets the offset for the next Read or Write to offset,
|
||||
// interpreted according to whence:
|
||||
// [SeekStart] means relative to the start of the file,
|
||||
// [SeekCurrent] means relative to the current offset, and
|
||||
// [SeekEnd] means relative to the end
|
||||
// (for example, offset = -2 specifies the penultimate byte of the file).
|
||||
// Seek returns the new offset relative to the start of the
|
||||
// file or an error, if any.
|
||||
//
|
||||
// Seeking to an offset before the start of the file is an error.
|
||||
// Seeking to any positive offset may be allowed, but if the new offset exceeds
|
||||
// the size of the underlying object the behavior of subsequent I/O operations
|
||||
// is implementation-dependent.
|
||||
type Seeker interface {
|
||||
Seek(offset int64, whence int) (int64, error)
|
||||
}
|
||||
|
||||
// ReadWriter is the interface that groups the basic Read and Write methods.
|
||||
type ReadWriter interface {
|
||||
Reader
|
||||
Writer
|
||||
}
|
||||
|
||||
// ReadCloser is the interface that groups the basic Read and Close methods.
|
||||
type ReadCloser interface {
|
||||
Reader
|
||||
Closer
|
||||
}
|
||||
|
||||
// WriteCloser is the interface that groups the basic Write and Close methods.
|
||||
type WriteCloser interface {
|
||||
Writer
|
||||
Closer
|
||||
}
|
||||
|
||||
// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods.
|
||||
type ReadWriteCloser interface {
|
||||
Reader
|
||||
Writer
|
||||
Closer
|
||||
}
|
||||
|
||||
// ReadSeeker is the interface that groups the basic Read and Seek methods.
|
||||
type ReadSeeker interface {
|
||||
Reader
|
||||
Seeker
|
||||
}
|
||||
|
||||
// ReadSeekCloser is the interface that groups the basic Read, Seek and Close
|
||||
// methods.
|
||||
type ReadSeekCloser interface {
|
||||
Reader
|
||||
Seeker
|
||||
Closer
|
||||
}
|
||||
|
||||
// WriteSeeker is the interface that groups the basic Write and Seek methods.
|
||||
type WriteSeeker interface {
|
||||
Writer
|
||||
Seeker
|
||||
}
|
||||
|
||||
// ReadWriteSeeker is the interface that groups the basic Read, Write and Seek methods.
|
||||
type ReadWriteSeeker interface {
|
||||
Reader
|
||||
Writer
|
||||
Seeker
|
||||
}
|
||||
|
||||
// ReaderFrom is the interface that wraps the ReadFrom method.
|
||||
//
|
||||
// ReadFrom reads data from r until EOF or error.
|
||||
// The return value n is the number of bytes read.
|
||||
// Any error except EOF encountered during the read is also returned.
|
||||
//
|
||||
// The [Copy] function uses [ReaderFrom] if available.
|
||||
type ReaderFrom interface {
|
||||
ReadFrom(ctx context.Context, r Reader) (n int64, err error)
|
||||
}
|
||||
|
||||
// WriterTo is the interface that wraps the WriteTo method.
|
||||
//
|
||||
// WriteTo writes data to w until there's no more data to write or
|
||||
// when an error occurs. The return value n is the number of bytes
|
||||
// written. Any error encountered during the write is also returned.
|
||||
//
|
||||
// The Copy function uses WriterTo if available.
|
||||
type WriterTo interface {
|
||||
WriteTo(ctx context.Context, w Writer) (n int64, err error)
|
||||
}
|
||||
|
||||
// ReaderAt is the interface that wraps the basic ReadAt method.
|
||||
//
|
||||
// ReadAt reads len(p) bytes into p starting at offset off in the
|
||||
// underlying input source. It returns the number of bytes
|
||||
// read (0 <= n <= len(p)) and any error encountered.
|
||||
//
|
||||
// When ReadAt returns n < len(p), it returns a non-nil error
|
||||
// explaining why more bytes were not returned. In this respect,
|
||||
// ReadAt is stricter than Read.
|
||||
//
|
||||
// Even if ReadAt returns n < len(p), it may use all of p as scratch
|
||||
// space during the call. If some data is available but not len(p) bytes,
|
||||
// ReadAt blocks until either all the data is available or an error occurs.
|
||||
// In this respect ReadAt is different from Read.
|
||||
//
|
||||
// If the n = len(p) bytes returned by ReadAt are at the end of the
|
||||
// input source, ReadAt may return either err == EOF or err == nil.
|
||||
//
|
||||
// If ReadAt is reading from an input source with a seek offset,
|
||||
// ReadAt should not affect nor be affected by the underlying
|
||||
// seek offset.
|
||||
//
|
||||
// Clients of ReadAt can execute parallel ReadAt calls on the
|
||||
// same input source.
|
||||
//
|
||||
// Implementations must not retain p.
|
||||
type ReaderAt interface {
|
||||
ReadAt(ctx context.Context, p []byte, off int64) (n int, err error)
|
||||
}
|
||||
|
||||
// WriterAt is the interface that wraps the basic WriteAt method.
|
||||
//
|
||||
// WriteAt writes len(p) bytes from p to the underlying data stream
|
||||
// at offset off. It returns the number of bytes written from p (0 <= n <= len(p))
|
||||
// and any error encountered that caused the write to stop early.
|
||||
// WriteAt must return a non-nil error if it returns n < len(p).
|
||||
//
|
||||
// If WriteAt is writing to a destination with a seek offset,
|
||||
// WriteAt should not affect nor be affected by the underlying
|
||||
// seek offset.
|
||||
//
|
||||
// Clients of WriteAt can execute parallel WriteAt calls on the same
|
||||
// destination if the ranges do not overlap.
|
||||
//
|
||||
// Implementations must not retain p.
|
||||
type WriterAt interface {
|
||||
WriteAt(ctx context.Context, p []byte, off int64) (n int, err error)
|
||||
}
|
||||
|
||||
// StringWriter is the interface that wraps the WriteString method.
|
||||
type StringWriter interface {
|
||||
WriteString(s string) (n int, err error)
|
||||
}
|
||||
|
||||
// WriteString writes the contents of the string s to w, which accepts a slice of bytes.
|
||||
// If w implements [StringWriter], [StringWriter.WriteString] is invoked directly.
|
||||
// Otherwise, [Writer.Write] is called exactly once.
|
||||
func WriteString(ctx context.Context, w Writer, s string) (n int, err error) {
|
||||
if sw, ok := w.(StringWriter); ok {
|
||||
return sw.WriteString(s)
|
||||
}
|
||||
return w.Write(ctx, []byte(s))
|
||||
}
|
||||
|
||||
// ReadAtLeast reads from r into buf until it has read at least min bytes.
|
||||
// It returns the number of bytes copied and an error if fewer bytes were read.
|
||||
// The error is EOF only if no bytes were read.
|
||||
// If an EOF happens after reading fewer than min bytes,
|
||||
// ReadAtLeast returns [ErrUnexpectedEOF].
|
||||
// If min is greater than the length of buf, ReadAtLeast returns [ErrShortBuffer].
|
||||
// On return, n >= min if and only if err == nil.
|
||||
// If r returns an error having read at least min bytes, the error is dropped.
|
||||
func ReadAtLeast(ctx context.Context, r Reader, buf []byte, min int) (n int, err error) {
|
||||
if len(buf) < min {
|
||||
return 0, ErrShortBuffer
|
||||
}
|
||||
for n < min && err == nil {
|
||||
var nn int
|
||||
nn, err = r.Read(ctx, buf[n:])
|
||||
n += nn
|
||||
}
|
||||
if n >= min {
|
||||
err = nil
|
||||
} else if n > 0 && err == EOF {
|
||||
err = ErrUnexpectedEOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReadFull reads exactly len(buf) bytes from r into buf.
|
||||
// It returns the number of bytes copied and an error if fewer bytes were read.
|
||||
// The error is EOF only if no bytes were read.
|
||||
// If an EOF happens after reading some but not all the bytes,
|
||||
// ReadFull returns [ErrUnexpectedEOF].
|
||||
// On return, n == len(buf) if and only if err == nil.
|
||||
// If r returns an error having read at least len(buf) bytes, the error is dropped.
|
||||
func ReadFull(ctx context.Context, r Reader, buf []byte) (n int, err error) {
|
||||
return ReadAtLeast(ctx, r, buf, len(buf))
|
||||
}
|
||||
|
||||
// CopyN copies n bytes (or until an error) from src to dst.
|
||||
// It returns the number of bytes copied and the earliest
|
||||
// error encountered while copying.
|
||||
// On return, written == n if and only if err == nil.
|
||||
//
|
||||
// If dst implements [ReaderFrom], the copy is implemented using it.
|
||||
func CopyN(ctx context.Context, dst Writer, src Reader, n int64) (written int64, err error) {
|
||||
written, err = Copy(ctx, dst, LimitReader(src, n))
|
||||
if written == n {
|
||||
return n, nil
|
||||
}
|
||||
if written < n && err == nil {
|
||||
// src stopped early; must have been EOF.
|
||||
err = EOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Copy copies from src to dst until either EOF is reached
|
||||
// on src or an error occurs. It returns the number of bytes
|
||||
// copied and the first error encountered while copying, if any.
|
||||
//
|
||||
// A successful Copy returns err == nil, not err == EOF.
|
||||
// Because Copy is defined to read from src until EOF, it does
|
||||
// not treat an EOF from Read as an error to be reported.
|
||||
//
|
||||
// If src implements [WriterTo],
|
||||
// the copy is implemented by calling src.WriteTo(dst).
|
||||
// Otherwise, if dst implements [ReaderFrom],
|
||||
// the copy is implemented by calling dst.ReadFrom(src).
|
||||
func Copy(ctx context.Context, dst Writer, src Reader) (written int64, err error) {
|
||||
return copyBuffer(ctx, dst, src, nil)
|
||||
}
|
||||
|
||||
// CopyBuffer is identical to Copy except that it stages through the
|
||||
// provided buffer (if one is required) rather than allocating a
|
||||
// temporary one. If buf is nil, one is allocated; otherwise if it has
|
||||
// zero length, CopyBuffer panics.
|
||||
//
|
||||
// If either src implements [WriterTo] or dst implements [ReaderFrom],
|
||||
// buf will not be used to perform the copy.
|
||||
func CopyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) {
|
||||
if buf != nil && len(buf) == 0 {
|
||||
panic("empty buffer in CopyBuffer")
|
||||
}
|
||||
return copyBuffer(ctx, dst, src, buf)
|
||||
}
|
||||
|
||||
// copyBuffer is the actual implementation of Copy and CopyBuffer.
|
||||
// if buf is nil, one is allocated.
|
||||
func copyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) {
|
||||
// If the reader has a WriteTo method, use it to do the copy.
|
||||
// Avoids an allocation and a copy.
|
||||
if wt, ok := src.(WriterTo); ok {
|
||||
return wt.WriteTo(ctx, dst)
|
||||
}
|
||||
// Similarly, if the writer has a ReadFrom method, use it to do the copy.
|
||||
if rt, ok := dst.(ReaderFrom); ok {
|
||||
return rt.ReadFrom(ctx, src)
|
||||
}
|
||||
if buf == nil {
|
||||
size := 32 * 1024
|
||||
if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
|
||||
if l.N < 1 {
|
||||
size = 1
|
||||
} else {
|
||||
size = int(l.N)
|
||||
}
|
||||
}
|
||||
buf = make([]byte, size)
|
||||
}
|
||||
for {
|
||||
nr, er := src.Read(ctx, buf)
|
||||
if nr > 0 {
|
||||
nw, ew := dst.Write(ctx, buf[0:nr])
|
||||
if nw < 0 || nr < nw {
|
||||
nw = 0
|
||||
if ew == nil {
|
||||
ew = errInvalidWrite
|
||||
}
|
||||
}
|
||||
written += int64(nw)
|
||||
if ew != nil {
|
||||
err = ew
|
||||
break
|
||||
}
|
||||
if nr != nw {
|
||||
err = ErrShortWrite
|
||||
break
|
||||
}
|
||||
}
|
||||
if er != nil {
|
||||
if er != EOF {
|
||||
err = er
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
return written, err
|
||||
}
|
||||
|
||||
// LimitReader returns a Reader that reads from r
|
||||
// but stops with EOF after n bytes.
|
||||
// The underlying implementation is a *LimitedReader.
|
||||
func LimitReader(r Reader, n int64) Reader { return &LimitedReader{r, n} }
|
||||
|
||||
// A LimitedReader reads from R but limits the amount of
|
||||
// data returned to just N bytes. Each call to Read
|
||||
// updates N to reflect the new amount remaining.
|
||||
// Read returns EOF when N <= 0 or when the underlying R returns EOF.
|
||||
type LimitedReader struct {
|
||||
R Reader // underlying reader
|
||||
N int64 // max bytes remaining
|
||||
}
|
||||
|
||||
func (l *LimitedReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
if l.N <= 0 {
|
||||
return 0, EOF
|
||||
}
|
||||
if int64(len(p)) > l.N {
|
||||
p = p[0:l.N]
|
||||
}
|
||||
n, err = l.R.Read(ctx, p)
|
||||
l.N -= int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
// NewSectionReader returns a [SectionReader] that reads from r
|
||||
// starting at offset off and stops with EOF after n bytes.
|
||||
func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader {
|
||||
var remaining int64
|
||||
const maxint64 = 1<<63 - 1
|
||||
if off <= maxint64-n {
|
||||
remaining = n + off
|
||||
} else {
|
||||
// Overflow, with no way to return error.
|
||||
// Assume we can read up to an offset of 1<<63 - 1.
|
||||
remaining = maxint64
|
||||
}
|
||||
return &SectionReader{r, off, off, remaining, n}
|
||||
}
|
||||
|
||||
// SectionReader implements Read, Seek, and ReadAt on a section
|
||||
// of an underlying [ReaderAt].
|
||||
type SectionReader struct {
|
||||
r ReaderAt // constant after creation
|
||||
base int64 // constant after creation
|
||||
off int64
|
||||
limit int64 // constant after creation
|
||||
n int64 // constant after creation
|
||||
}
|
||||
|
||||
func (s *SectionReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
if s.off >= s.limit {
|
||||
return 0, EOF
|
||||
}
|
||||
if max := s.limit - s.off; int64(len(p)) > max {
|
||||
p = p[0:max]
|
||||
}
|
||||
n, err = s.r.ReadAt(ctx, p, s.off)
|
||||
s.off += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
var errWhence = errors.New("Seek: invalid whence")
|
||||
var errOffset = errors.New("Seek: invalid offset")
|
||||
|
||||
func (s *SectionReader) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
default:
|
||||
return 0, errWhence
|
||||
case SeekStart:
|
||||
offset += s.base
|
||||
case SeekCurrent:
|
||||
offset += s.off
|
||||
case SeekEnd:
|
||||
offset += s.limit
|
||||
}
|
||||
if offset < s.base {
|
||||
return 0, errOffset
|
||||
}
|
||||
s.off = offset
|
||||
return offset - s.base, nil
|
||||
}
|
||||
|
||||
func (s *SectionReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
if off < 0 || off >= s.Size() {
|
||||
return 0, EOF
|
||||
}
|
||||
off += s.base
|
||||
if max := s.limit - off; int64(len(p)) > max {
|
||||
p = p[0:max]
|
||||
n, err = s.r.ReadAt(ctx, p, off)
|
||||
if err == nil {
|
||||
err = EOF
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return s.r.ReadAt(ctx, p, off)
|
||||
}
|
||||
|
||||
// Size returns the size of the section in bytes.
|
||||
func (s *SectionReader) Size() int64 { return s.limit - s.base }
|
||||
|
||||
// Outer returns the underlying [ReaderAt] and offsets for the section.
|
||||
//
|
||||
// The returned values are the same that were passed to [NewSectionReader]
|
||||
// when the [SectionReader] was created.
|
||||
func (s *SectionReader) Outer() (r ReaderAt, off int64, n int64) {
|
||||
return s.r, s.base, s.n
|
||||
}
|
||||
|
||||
// An OffsetWriter maps writes at offset base to offset base+off in the underlying writer.
|
||||
type OffsetWriter struct {
|
||||
w WriterAt
|
||||
base int64 // the original offset
|
||||
off int64 // the current offset
|
||||
}
|
||||
|
||||
// NewOffsetWriter returns an [OffsetWriter] that writes to w
|
||||
// starting at offset off.
|
||||
func NewOffsetWriter(w WriterAt, off int64) *OffsetWriter {
|
||||
return &OffsetWriter{w, off, off}
|
||||
}
|
||||
|
||||
func (o *OffsetWriter) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||
n, err = o.w.WriteAt(ctx, p, o.off)
|
||||
o.off += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (o *OffsetWriter) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
if off < 0 {
|
||||
return 0, errOffset
|
||||
}
|
||||
|
||||
off += o.base
|
||||
return o.w.WriteAt(ctx, p, off)
|
||||
}
|
||||
|
||||
func (o *OffsetWriter) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
default:
|
||||
return 0, errWhence
|
||||
case SeekStart:
|
||||
offset += o.base
|
||||
case SeekCurrent:
|
||||
offset += o.off
|
||||
}
|
||||
if offset < o.base {
|
||||
return 0, errOffset
|
||||
}
|
||||
o.off = offset
|
||||
return offset - o.base, nil
|
||||
}
|
||||
|
||||
// TeeReader returns a [Reader] that writes to w what it reads from r.
|
||||
// All reads from r performed through it are matched with
|
||||
// corresponding writes to w. There is no internal buffering -
|
||||
// the write must complete before the read completes.
|
||||
// Any error encountered while writing is reported as a read error.
|
||||
func TeeReader(r Reader, w Writer) Reader {
|
||||
return &teeReader{r, w}
|
||||
}
|
||||
|
||||
type teeReader struct {
|
||||
r Reader
|
||||
w Writer
|
||||
}
|
||||
|
||||
func (t *teeReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
n, err = t.r.Read(ctx, p)
|
||||
if n > 0 {
|
||||
if n, err := t.w.Write(ctx, p[:n]); err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Discard is a [Writer] on which all Write calls succeed
|
||||
// without doing anything.
|
||||
var Discard Writer = discard{}
|
||||
|
||||
type discard struct{}
|
||||
|
||||
// discard implements ReaderFrom as an optimization so Copy to
|
||||
// io.Discard can avoid doing unnecessary work.
|
||||
var _ ReaderFrom = discard{}
|
||||
|
||||
func (discard) Write(ctx context.Context, p []byte) (int, error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (discard) WriteString(ctx context.Context, s string) (int, error) {
|
||||
return len(s), nil
|
||||
}
|
||||
|
||||
var blackHolePool = sync.Pool{
|
||||
New: func() any {
|
||||
b := make([]byte, 8192)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
func (discard) ReadFrom(ctx context.Context, r Reader) (n int64, err error) {
|
||||
bufp := blackHolePool.Get().(*[]byte)
|
||||
readSize := 0
|
||||
for {
|
||||
readSize, err = r.Read(ctx, *bufp)
|
||||
n += int64(readSize)
|
||||
if err != nil {
|
||||
blackHolePool.Put(bufp)
|
||||
if err == EOF {
|
||||
return n, nil
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NopCloser returns a [ReadCloser] with a no-op Close method wrapping
|
||||
// the provided [Reader] r.
|
||||
// If r implements [WriterTo], the returned [ReadCloser] will implement [WriterTo]
|
||||
// by forwarding calls to r.
|
||||
func NopCloser(r Reader) ReadCloser {
|
||||
if _, ok := r.(WriterTo); ok {
|
||||
return nopCloserWriterTo{r}
|
||||
}
|
||||
return nopCloser{r}
|
||||
}
|
||||
|
||||
type nopCloser struct {
|
||||
Reader
|
||||
}
|
||||
|
||||
func (nopCloser) Close(ctx context.Context) error { return nil }
|
||||
|
||||
type nopCloserWriterTo struct {
|
||||
Reader
|
||||
}
|
||||
|
||||
func (nopCloserWriterTo) Close(ctx context.Context) error { return nil }
|
||||
|
||||
func (c nopCloserWriterTo) WriteTo(ctx context.Context, w Writer) (n int64, err error) {
|
||||
return c.Reader.(WriterTo).WriteTo(ctx, w)
|
||||
}
|
||||
|
||||
// ReadAll reads from r until an error or EOF and returns the data it read.
|
||||
// A successful call returns err == nil, not err == EOF. Because ReadAll is
|
||||
// defined to read from src until EOF, it does not treat an EOF from Read
|
||||
// as an error to be reported.
|
||||
func ReadAll(ctx context.Context, r Reader) ([]byte, error) {
|
||||
b := make([]byte, 0, 512)
|
||||
for {
|
||||
n, err := r.Read(ctx, b[len(b):cap(b)])
|
||||
b = b[:len(b)+n]
|
||||
if err != nil {
|
||||
if err == EOF {
|
||||
err = nil
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
if len(b) == cap(b) {
|
||||
// Add more capacity (let append pick how much).
|
||||
b = append(b, 0)[:len(b)]
|
||||
}
|
||||
}
|
||||
}
|
105
pkg/ctxio/reader.go
Normal file
105
pkg/ctxio/reader.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
type FileReader interface {
|
||||
Reader
|
||||
ReaderAt
|
||||
Closer
|
||||
}
|
||||
|
||||
type contextReader struct {
|
||||
ctx context.Context
|
||||
r Reader
|
||||
}
|
||||
|
||||
func (r *contextReader) Read(p []byte) (n int, err error) {
|
||||
if r.ctx.Err() != nil {
|
||||
return 0, r.ctx.Err()
|
||||
}
|
||||
|
||||
return r.r.Read(r.ctx, p)
|
||||
}
|
||||
|
||||
func IoReaderAt(ctx context.Context, r ReaderAt) io.ReaderAt {
|
||||
return &contextReaderAt{ctx: ctx, r: r}
|
||||
}
|
||||
|
||||
type contextReaderAt struct {
|
||||
ctx context.Context
|
||||
r ReaderAt
|
||||
}
|
||||
|
||||
func (c *contextReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
if c.ctx.Err() != nil {
|
||||
return 0, c.ctx.Err()
|
||||
}
|
||||
|
||||
return c.r.ReadAt(c.ctx, p, off)
|
||||
}
|
||||
|
||||
func IoReader(ctx context.Context, r Reader) io.Reader {
|
||||
return &contextReader{ctx: ctx, r: r}
|
||||
}
|
||||
|
||||
func WrapIoReader(r io.Reader) Reader {
|
||||
return &wrapReader{r: r}
|
||||
}
|
||||
|
||||
type wrapReader struct {
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
var _ Reader = (*wrapReader)(nil)
|
||||
|
||||
// Read implements Reader.
|
||||
func (c *wrapReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
if ctx.Err() != nil {
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
return c.r.Read(p)
|
||||
}
|
||||
|
||||
func WrapIoWriter(w io.Writer) Writer {
|
||||
return &wrapWriter{w: w}
|
||||
}
|
||||
|
||||
type wrapWriter struct {
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
var _ Writer = (*wrapWriter)(nil)
|
||||
|
||||
// Write implements Writer.
|
||||
func (c *wrapWriter) Write(ctx context.Context, p []byte) (n int, err error) {
|
||||
if ctx.Err() != nil {
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
return c.w.Write(p)
|
||||
}
|
||||
|
||||
func WrapIoReadCloser(r io.ReadCloser) ReadCloser {
|
||||
return &wrapReadCloser{r: r}
|
||||
}
|
||||
|
||||
type wrapReadCloser struct {
|
||||
r io.ReadCloser
|
||||
}
|
||||
|
||||
var _ Reader = (*wrapReadCloser)(nil)
|
||||
|
||||
// Read implements Reader.
|
||||
func (c *wrapReadCloser) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
if ctx.Err() != nil {
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
return c.r.Read(p)
|
||||
}
|
||||
|
||||
// Close implements ReadCloser.
|
||||
func (c *wrapReadCloser) Close(ctx context.Context) error {
|
||||
return c.r.Close()
|
||||
}
|
|
@ -1,27 +1,25 @@
|
|||
package ioutils
|
||||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
type ReaderReaderAtWrapper struct {
|
||||
mu sync.Mutex
|
||||
rat ctxio.ReaderAt
|
||||
rat ReaderAt
|
||||
offset int64
|
||||
}
|
||||
|
||||
func NewReaderReaderAtWrapper(rat ctxio.ReaderAt) *ReaderReaderAtWrapper {
|
||||
func NewReaderReaderAtWrapper(rat ReaderAt) *ReaderReaderAtWrapper {
|
||||
return &ReaderReaderAtWrapper{
|
||||
rat: rat,
|
||||
}
|
||||
}
|
||||
|
||||
var _ ctxio.Reader = (*ReaderReaderAtWrapper)(nil)
|
||||
var _ ctxio.ReaderAt = (*ReaderReaderAtWrapper)(nil)
|
||||
var _ ctxio.Closer = (*ReaderReaderAtWrapper)(nil)
|
||||
var _ Reader = (*ReaderReaderAtWrapper)(nil)
|
||||
var _ ReaderAt = (*ReaderReaderAtWrapper)(nil)
|
||||
var _ Closer = (*ReaderReaderAtWrapper)(nil)
|
||||
|
||||
// Read implements Reader.
|
||||
func (r *ReaderReaderAtWrapper) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
|
@ -39,7 +37,7 @@ func (r *ReaderReaderAtWrapper) ReadAt(ctx context.Context, p []byte, off int64)
|
|||
|
||||
// Close implements Closer.
|
||||
func (r *ReaderReaderAtWrapper) Close(ctx context.Context) (err error) {
|
||||
if c, ok := r.rat.(ctxio.Closer); ok {
|
||||
if c, ok := r.rat.(Closer); ok {
|
||||
err = c.Close(ctx)
|
||||
if err != nil {
|
||||
return err
|
|
@ -1,11 +1,9 @@
|
|||
package ioutils
|
||||
package ctxio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
type ioSeekerWrapper struct {
|
||||
|
@ -15,10 +13,10 @@ type ioSeekerWrapper struct {
|
|||
pos int64
|
||||
size int64
|
||||
|
||||
r ctxio.ReaderAt
|
||||
r ReaderAt
|
||||
}
|
||||
|
||||
func WrapIoReadSeeker(ctx context.Context, r ctxio.ReaderAt, size int64) io.ReadSeeker {
|
||||
func WrapIoReadSeeker(ctx context.Context, r ReaderAt, size int64) io.ReadSeeker {
|
||||
return &ioSeekerWrapper{
|
||||
ctx: ctx,
|
||||
r: r,
|
20
pkg/ctxio/teereader.go
Normal file
20
pkg/ctxio/teereader.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package ctxio
|
||||
|
||||
// func TeeReader(r Reader, w Writer) Reader {
|
||||
// return &teeReader{r, w}
|
||||
// }
|
||||
|
||||
// type teeReader struct {
|
||||
// r Reader
|
||||
// w Writer
|
||||
// }
|
||||
|
||||
// func (t *teeReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
// n, err = t.r.Read(ctx, p)
|
||||
// if n > 0 {
|
||||
// if n, err := t.w.Write(ctx, p[:n]); err != nil {
|
||||
// return n, err
|
||||
// }
|
||||
// }
|
||||
// return
|
||||
// }
|
|
@ -6,7 +6,7 @@ import (
|
|||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
)
|
||||
|
||||
// FSStat returns metadata about a file system
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"github.com/anacrolix/missinggo/v2/filecache"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
|
|
@ -3,8 +3,8 @@ package model
|
|||
import (
|
||||
"context"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
)
|
||||
|
||||
type FsElem interface {
|
||||
|
|
|
@ -3,7 +3,7 @@ package model
|
|||
import (
|
||||
"context"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
atorrent "github.com/anacrolix/torrent"
|
||||
)
|
||||
|
||||
|
|
|
@ -5,8 +5,8 @@ package model
|
|||
import (
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
torrent1 "github.com/anacrolix/torrent"
|
||||
)
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/pkg/uuid"
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"github.com/99designs/gqlgen/graphql"
|
||||
aih "github.com/anacrolix/torrent/types/infohash"
|
||||
)
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
)
|
||||
|
||||
// Torrents is the resolver for the torrents field.
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package resolver
|
||||
|
||||
import (
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/go-git/go-billy/v5"
|
||||
)
|
||||
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/anacrolix/missinggo/v2/filecache"
|
||||
echopprof "github.com/labstack/echo-contrib/pprof"
|
||||
"github.com/labstack/echo/v4"
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/99designs/gqlgen/graphql"
|
||||
"github.com/99designs/gqlgen/graphql/handler"
|
||||
"github.com/99designs/gqlgen/graphql/handler/extension"
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/billziss-gh/cgofuse/fuse"
|
||||
)
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ package fuse
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
)
|
||||
|
||||
type Handler struct{}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/billziss-gh/cgofuse/fuse"
|
||||
)
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -93,7 +93,7 @@ func newHTTPFile(ctx context.Context, f vfs.File, dirContent []os.FileInfo) *htt
|
|||
return &httpFile{
|
||||
f: f,
|
||||
dirContent: dirContent,
|
||||
ReadSeekCloser: ioutils.IoReadSeekCloserWrapper(ctx, f, f.Size()),
|
||||
ReadSeekCloser: ctxio.IoReadSeekCloserWrapper(ctx, f, f.Size()),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,8 +6,8 @@ import (
|
|||
|
||||
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
|
||||
nfshelper "git.kmsign.ru/royalcat/tstor/pkg/go-nfs/helpers"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/log"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
)
|
||||
|
||||
func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
|
||||
|
|
|
@ -3,7 +3,7 @@ package nfs
|
|||
// import (
|
||||
// "io/fs"
|
||||
|
||||
// "git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
// "git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
// nfsfs "github.com/smallfz/libnfs-go/fs"
|
||||
// )
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/go-git/go-billy/v5"
|
||||
)
|
||||
|
||||
|
@ -199,11 +199,6 @@ func (f *billyFile) Write(ctx context.Context, p []byte) (n int, err error) {
|
|||
return 0, billyErr(nil, vfs.ErrNotImplemented, f.log)
|
||||
}
|
||||
|
||||
// WriteAt implements ctxbilly.File.
|
||||
func (f *billyFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
return 0, billyErr(nil, vfs.ErrNotImplemented, f.log)
|
||||
}
|
||||
|
||||
// Lock implements billy.File.
|
||||
func (*billyFile) Lock() error {
|
||||
return nil // TODO
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
|
|
51
src/host/controller/sourceddir.go
Normal file
51
src/host/controller/sourceddir.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/lrstanley/go-ytdlp"
|
||||
)
|
||||
|
||||
type SourceUpdater struct {
|
||||
sources []VirtDirSource
|
||||
}
|
||||
|
||||
type SourcedDirSource string
|
||||
|
||||
const (
|
||||
SourcedDirYtDlp SourcedDirSource = "yt-dlp-playlist"
|
||||
)
|
||||
|
||||
type VirtDirSource interface {
|
||||
Source() SourcedDirSource
|
||||
}
|
||||
|
||||
var _ VirtDirSource = (*SourcedDirYtDlpPlaylist)(nil)
|
||||
|
||||
type SourcedDirYtDlpPlaylist struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
func (SourcedDirYtDlpPlaylist) Source() SourcedDirSource {
|
||||
return SourcedDirYtDlp
|
||||
}
|
||||
|
||||
type SDController struct {
|
||||
sources []VirtDirSource
|
||||
}
|
||||
|
||||
func (sd *SourcedDirYtDlpPlaylist) Update(ctx context.Context) error {
|
||||
_, err := ytdlp.Install(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dl := ytdlp.New().PrintJSON()
|
||||
|
||||
_, err = dl.Run(ctx, sd.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
19
src/host/storage.go
Normal file
19
src/host/storage.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package host
|
||||
|
||||
import (
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
)
|
||||
|
||||
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service) vfs.Filesystem {
|
||||
factories := map[string]vfs.FsFactory{
|
||||
".torrent": tsrv.NewTorrentFs,
|
||||
}
|
||||
|
||||
// add default torrent factory for root filesystem
|
||||
for k, v := range vfs.ArchiveFactories {
|
||||
factories[k] = v
|
||||
}
|
||||
|
||||
return vfs.NewResolveFS(sourceFS, factories)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package tkv
|
||||
|
||||
import (
|
||||
"path"
|
||||
|
@ -8,7 +8,7 @@ import (
|
|||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
|
||||
func New[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
|
||||
dir := path.Join(dbdir, name)
|
||||
store, err = kv.NewBadgerKV[K, V](dir)
|
||||
if err != nil {
|
|
@ -12,7 +12,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/anacrolix/torrent"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
|
@ -13,10 +13,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/royalcat/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/tkv"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -31,7 +32,7 @@ import (
|
|||
"github.com/royalcat/kv"
|
||||
)
|
||||
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent")
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/host/torrent")
|
||||
|
||||
type DirAquire struct {
|
||||
Name string
|
||||
|
@ -98,7 +99,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service,
|
|||
}
|
||||
client.AddDhtNodes(conf.DHTNodes)
|
||||
|
||||
s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
|
||||
s.dirsAquire, err = tkv.New[string, DirAquire](conf.MetadataFolder, "dir-acquire")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
|
@ -44,7 +44,7 @@ func (a byName) Len() int { return len(a) }
|
|||
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||
|
||||
type TotalTorrentStats struct {
|
||||
type GlobalTorrentStats struct {
|
||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||
UploadedBytes int64 `json:"uploadedBytes"`
|
||||
TimePassed float64 `json:"timePassed"`
|
||||
|
@ -117,7 +117,7 @@ func (s *Stats) Stats(path string) (*TorrentStats, error) {
|
|||
return s.stats(now, t, true), nil
|
||||
}
|
||||
|
||||
func (s *Stats) GlobalStats() *TotalTorrentStats {
|
||||
func (s *Stats) GlobalStats() *GlobalTorrentStats {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
|
@ -134,7 +134,7 @@ func (s *Stats) GlobalStats() *TotalTorrentStats {
|
|||
timePassed := now.Sub(s.gTime)
|
||||
s.gTime = now
|
||||
|
||||
return &TotalTorrentStats{
|
||||
return &GlobalTorrentStats{
|
||||
DownloadedBytes: totalDownload,
|
||||
UploadedBytes: totalUpload,
|
||||
TimePassed: timePassed.Seconds(),
|
86
src/host/torrent/stats_store.go
Normal file
86
src/host/torrent/stats_store.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent/types/infohash"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/dgraph-io/ristretto/z"
|
||||
)
|
||||
|
||||
type TorrentStat struct {
|
||||
Name string `json:"name"`
|
||||
Hash string `json:"hash"`
|
||||
DownloadedBytes int64 `json:"downloadedBytes"`
|
||||
UploadedBytes int64 `json:"uploadedBytes"`
|
||||
Peers int `json:"peers"`
|
||||
Seeders int `json:"seeders"`
|
||||
PieceChunks []*PieceChunk `json:"pieceChunks"`
|
||||
TotalPieces int `json:"totalPieces"`
|
||||
PieceSize int64 `json:"pieceSize"`
|
||||
}
|
||||
|
||||
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
||||
db, err := badger.OpenManaged(
|
||||
badger.
|
||||
DefaultOptions(path.Join(metaDir, "stats-history")).
|
||||
WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for n := range time.NewTimer(lifetime / 2).C {
|
||||
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
||||
}
|
||||
}()
|
||||
r := &statsStore{
|
||||
db: db,
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
type statsStore struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func (r *statsStore) AddStat(ih infohash.T, stat TorrentStat) error {
|
||||
data, err := json.Marshal(stat)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set(ih.Bytes(), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) {
|
||||
var stats GlobalTorrentStats
|
||||
stream := r.db.NewStream()
|
||||
stream.SinceTs = uint64(since.Unix())
|
||||
|
||||
var tstat TorrentStat
|
||||
stream.Send = func(buf *z.Buffer) error {
|
||||
err := json.Unmarshal(buf.Bytes(), &tstat)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stats.DownloadedBytes += tstat.DownloadedBytes
|
||||
stats.UploadedBytes += tstat.UploadedBytes
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := stream.Orchestrate(ctx)
|
||||
if err != nil {
|
||||
return stats, err
|
||||
}
|
||||
return stats, nil
|
||||
}
|
|
@ -11,10 +11,9 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"github.com/bodgit/sevenzip"
|
||||
"github.com/nwaples/rardecode/v2"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
var ArchiveFactories = map[string]FsFactory{
|
||||
|
@ -139,7 +138,7 @@ func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
|
|||
|
||||
for p, _ := range afs.files {
|
||||
if strings.HasPrefix(p, filename) {
|
||||
return NewDirInfo(path.Base(filename)), nil
|
||||
return newDirInfo(path.Base(filename)), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,7 +173,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi
|
|||
size: size,
|
||||
af: af,
|
||||
|
||||
buffer: ioutils.NewFileBuffer(nil),
|
||||
buffer: ctxio.NewFileBuffer(nil),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -189,7 +188,7 @@ type archiveFile struct {
|
|||
|
||||
offset int64
|
||||
readen int64
|
||||
buffer *ioutils.FileBuffer
|
||||
buffer *ctxio.FileBuffer
|
||||
}
|
||||
|
||||
// Name implements File.
|
||||
|
@ -351,7 +350,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
|
|||
var _ archiveLoader = RarLoader
|
||||
|
||||
func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
|
@ -370,7 +369,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
|
|||
|
||||
name := header.Name
|
||||
af := func(ctx context.Context) (io.ReadCloser, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
|
@ -7,8 +7,8 @@ import (
|
|||
"io"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/royalcat/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -46,7 +46,10 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewCtxBillyFile(info, bf), nil
|
||||
return &CtxBillyFile{
|
||||
info: info,
|
||||
file: bf,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReadDir implements Filesystem.
|
||||
|
@ -95,13 +98,6 @@ func (c *CtxBillyFs) Unlink(ctx context.Context, filename string) error {
|
|||
return fs.ErrInvalid
|
||||
}
|
||||
|
||||
func NewCtxBillyFile(info fs.FileInfo, bf ctxbilly.File) *CtxBillyFile {
|
||||
return &CtxBillyFile{
|
||||
info: info,
|
||||
file: bf,
|
||||
}
|
||||
}
|
||||
|
||||
var _ File = (*CtxBillyFile)(nil)
|
||||
|
||||
type CtxBillyFile struct {
|
|
@ -25,7 +25,7 @@ func (d *dirFile) Close(ctx context.Context) error {
|
|||
|
||||
// Info implements File.
|
||||
func (d *dirFile) Info() (fs.FileInfo, error) {
|
||||
return NewDirInfo(d.name), nil
|
||||
return newDirInfo(d.name), nil
|
||||
}
|
||||
|
||||
// IsDir implements File.
|
|
@ -24,6 +24,16 @@ func (d *DummyFs) Mode() fs.FileMode {
|
|||
return fs.ModeDir
|
||||
}
|
||||
|
||||
// Size implements Filesystem.
|
||||
func (d *DummyFs) Size() int64 {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Sys implements Filesystem.
|
||||
func (d *DummyFs) Sys() any {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// FsName implements Filesystem.
|
||||
func (d *DummyFs) FsName() string {
|
||||
return "dummyfs"
|
||||
|
@ -55,7 +65,7 @@ func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, erro
|
|||
|
||||
// Info implements Filesystem.
|
||||
func (d *DummyFs) Info() (fs.FileInfo, error) {
|
||||
return NewDirInfo(d.name), nil
|
||||
return newDirInfo(d.name), nil
|
||||
}
|
||||
|
||||
// IsDir implements Filesystem.
|
|
@ -7,7 +7,7 @@ import (
|
|||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
|
@ -24,7 +24,7 @@ type File interface {
|
|||
|
||||
var ErrNotImplemented = errors.New("not implemented")
|
||||
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/vfs")
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/vfs")
|
||||
|
||||
type Filesystem interface {
|
||||
// Open opens the named file for reading. If successful, methods on the
|
||||
|
@ -55,7 +55,7 @@ type fileInfo struct {
|
|||
var _ fs.FileInfo = &fileInfo{}
|
||||
var _ fs.DirEntry = &fileInfo{}
|
||||
|
||||
func NewDirInfo(name string) *fileInfo {
|
||||
func newDirInfo(name string) *fileInfo {
|
||||
return &fileInfo{
|
||||
name: path.Base(name),
|
||||
size: 0,
|
|
@ -29,7 +29,7 @@ func TestDirInfo(t *testing.T) {
|
|||
|
||||
require := require.New(t)
|
||||
|
||||
fi := NewDirInfo("abc/name")
|
||||
fi := newDirInfo("abc/name")
|
||||
|
||||
require.True(fi.IsDir())
|
||||
require.Equal("name", fi.Name())
|
|
@ -42,7 +42,7 @@ func (fs *MemoryFs) FsName() string {
|
|||
|
||||
// Info implements Filesystem.
|
||||
func (fs *MemoryFs) Info() (fs.FileInfo, error) {
|
||||
return NewDirInfo(fs.name), nil
|
||||
return newDirInfo(fs.name), nil
|
||||
}
|
||||
|
||||
// IsDir implements Filesystem.
|
|
@ -17,7 +17,7 @@ var _ Filesystem = (*OsFS)(nil)
|
|||
// Stat implements Filesystem.
|
||||
func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
||||
if path.Clean(filename) == Separator {
|
||||
return NewDirInfo(Separator), nil
|
||||
return newDirInfo(Separator), nil
|
||||
}
|
||||
|
||||
info, err := os.Stat(path.Join(fs.hostDir, filename))
|
||||
|
@ -48,7 +48,7 @@ func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
|
|||
|
||||
// Info implements Filesystem.
|
||||
func (fs *OsFS) Info() (fs.FileInfo, error) {
|
||||
return NewDirInfo(fs.Name()), nil
|
||||
return newDirInfo(fs.Name()), nil
|
||||
}
|
||||
|
||||
// IsDir implements Filesystem.
|
|
@ -5,7 +5,7 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -349,7 +349,7 @@ func ListDirFromFiles[F File](m map[string]F, name string) ([]fs.DirEntry, error
|
|||
if len(parts) == 1 {
|
||||
out = append(out, NewFileInfo(parts[0], f.Size()))
|
||||
} else {
|
||||
out = append(out, NewDirInfo(parts[0]))
|
||||
out = append(out, newDirInfo(parts[0]))
|
||||
}
|
||||
|
||||
}
|
|
@ -6,7 +6,7 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
3
src/host/vfs/sourced.go
Normal file
3
src/host/vfs/sourced.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package vfs
|
||||
|
||||
const sorcedDirExt = ".tsvd"
|
|
@ -5,8 +5,8 @@ import (
|
|||
"io"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -20,7 +20,7 @@ func TestSeekerWrapper(t *testing.T) {
|
|||
|
||||
mf := vfs.NewMemoryFile("text.txt", testData)
|
||||
|
||||
r := ioutils.IoReadSeekCloserWrapper(ctx, mf, mf.Size())
|
||||
r := ctxio.IoReadSeekCloserWrapper(ctx, mf, mf.Size())
|
||||
defer r.Close()
|
||||
|
||||
n, err := r.Seek(6, io.SeekStart)
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type UpdateTask interface{}
|
||||
|
||||
type Source interface {
|
||||
Name() string // unique name within source type
|
||||
SourceType() string
|
||||
Fetch(ctx context.Context, task UpdateTask, dir string) error
|
||||
}
|
||||
|
||||
var sourceTypesRegistry = map[string]reflect.Type{}
|
||||
|
||||
// func RegisterSource[T Source]() {
|
||||
// var s T
|
||||
// t := reflect.TypeOf(s)
|
||||
// if t.Kind() == reflect.Ptr {
|
||||
// RegisterSource[T]()
|
||||
// return
|
||||
// }
|
||||
|
||||
// sourceTypesRegistry[s.SourceType()] = t
|
||||
// }
|
||||
|
||||
type sourceType struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
func parseSource(data []byte) (Source, error) {
|
||||
var sourceType sourceType
|
||||
err := json.Unmarshal(data, &sourceType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
st, ok := sourceTypesRegistry[sourceType.Type]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("source type %s not registred", sourceType.Type)
|
||||
}
|
||||
|
||||
s := reflect.New(st).Interface().(Source)
|
||||
err = json.Unmarshal(data, &s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
|
@ -1,21 +0,0 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/ytdlp"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
)
|
||||
|
||||
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service, ytdlpsrv *ytdlp.Service) vfs.Filesystem {
|
||||
factories := map[string]vfs.FsFactory{
|
||||
".torrent": tsrv.NewTorrentFs,
|
||||
".ts-ytdlp": ytdlpsrv.BuildFS,
|
||||
}
|
||||
|
||||
// add default torrent factory for root filesystem
|
||||
for k, v := range vfs.ArchiveFactories {
|
||||
factories[k] = v
|
||||
}
|
||||
|
||||
return vfs.NewResolveFS(sourceFS, factories)
|
||||
}
|
|
@ -1,125 +0,0 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent/types/infohash"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
|
||||
db, err := badger.OpenManaged(
|
||||
badger.
|
||||
DefaultOptions(path.Join(metaDir, "stats-history")).
|
||||
WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for n := range time.NewTimer(lifetime / 2).C {
|
||||
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
|
||||
}
|
||||
}()
|
||||
return &statsStore{
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type statsStore struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func (r *statsStore) AddStat(ih infohash.T, t time.Time, stat TorrentStats) error {
|
||||
data, err := json.Marshal(stat)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ts := uint64(t.Unix())
|
||||
|
||||
txn := r.db.NewTransactionAt(ts, false)
|
||||
defer txn.Discard()
|
||||
|
||||
err = txn.Set(ih.Bytes(), data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return txn.CommitAt(ts, nil)
|
||||
}
|
||||
|
||||
func (r *statsStore) ReadTotalStatsHistory(ctx context.Context, since time.Time) ([]TotalTorrentStats, error) {
|
||||
stats := map[time.Time]TotalTorrentStats{}
|
||||
|
||||
err := r.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.AllVersions = true
|
||||
opts.SinceTs = uint64(since.Unix())
|
||||
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
// k := item.Key()
|
||||
var tstat TorrentStats
|
||||
err := item.Value(func(v []byte) error {
|
||||
return json.Unmarshal(v, &tstat)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t := time.Unix(int64(item.Version()), 0)
|
||||
|
||||
if stat, ok := stats[t]; !ok {
|
||||
stats[t] = TotalTorrentStats{
|
||||
DownloadedBytes: tstat.DownloadedBytes,
|
||||
UploadedBytes: stat.DownloadedBytes,
|
||||
}
|
||||
} else {
|
||||
stat.DownloadedBytes += tstat.DownloadedBytes
|
||||
stat.UploadedBytes += tstat.UploadedBytes
|
||||
stats[t] = stat
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return maps.Values(stats), err
|
||||
}
|
||||
|
||||
func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time, ih infohash.T) ([]TorrentStats, error) {
|
||||
var stats map[time.Time]TorrentStats
|
||||
|
||||
err := r.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.AllVersions = true
|
||||
opts.SinceTs = uint64(since.Unix())
|
||||
|
||||
it := txn.NewKeyIterator(ih.Bytes(), opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
var tstat TorrentStats
|
||||
err := item.Value(func(v []byte) error {
|
||||
return json.Unmarshal(v, &tstat)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t := time.Unix(int64(item.Version()), 0)
|
||||
|
||||
stats[t] = tstat
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return maps.Values(stats), err
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
package ytdlp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/go-git/go-billy/v5/osfs"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
func NewService(dataDir string) *Service {
|
||||
return &Service{
|
||||
dataDir: dataDir,
|
||||
sources: make(map[string]ytdlpSource, 0),
|
||||
}
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
mu sync.Mutex
|
||||
|
||||
dataDir string
|
||||
sources map[string]ytdlpSource
|
||||
}
|
||||
|
||||
func (c *Service) AddSource(s ytdlpSource) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.sources[s.Name()] = s
|
||||
}
|
||||
|
||||
func (c *Service) sourceDir(s ytdlpSource) string {
|
||||
return path.Join(c.dataDir, s.Name())
|
||||
}
|
||||
|
||||
func (c *Service) Update(ctx context.Context) error {
|
||||
for name, s := range c.sources {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
dir := c.sourceDir(s)
|
||||
err := s.Download(ctx, nil, dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch source %s: %w", name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Service) BuildFS(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
|
||||
data, err := ctxio.ReadAll(ctx, f)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read source file: %w", err)
|
||||
}
|
||||
|
||||
var s ytdlpSource
|
||||
err = json.Unmarshal(data, &s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.AddSource(s)
|
||||
|
||||
downloadFS := ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s)))
|
||||
|
||||
return newSourceFS(s.Name(), downloadFS, c, s), nil
|
||||
}
|
|
@ -1,69 +0,0 @@
|
|||
package ytdlp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/fs"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
)
|
||||
|
||||
type SourceFS struct {
|
||||
service *Service
|
||||
source ytdlpSource
|
||||
|
||||
fs ctxbilly.Filesystem
|
||||
|
||||
vfs.DefaultFS
|
||||
}
|
||||
|
||||
var _ vfs.Filesystem = (*SourceFS)(nil)
|
||||
|
||||
func newSourceFS(name string, fs ctxbilly.Filesystem, service *Service, source ytdlpSource) *SourceFS {
|
||||
return &SourceFS{
|
||||
fs: fs,
|
||||
service: service,
|
||||
source: source,
|
||||
DefaultFS: vfs.DefaultFS(name),
|
||||
}
|
||||
}
|
||||
|
||||
// Open implements vfs.Filesystem.
|
||||
func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error) {
|
||||
info, err := s.fs.Stat(ctx, filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f, err := s.fs.Open(ctx, filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return vfs.NewCtxBillyFile(info, f), nil
|
||||
}
|
||||
|
||||
// ReadDir implements vfs.Filesystem.
|
||||
func (s *SourceFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
|
||||
infos, err := s.fs.ReadDir(ctx, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries := make([]fs.DirEntry, 0, len(infos))
|
||||
for _, info := range infos {
|
||||
entries = append(entries, vfs.NewFileInfo(info.Name(), info.Size()))
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Stat implements vfs.Filesystem.
|
||||
func (s *SourceFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
||||
return s.fs.Stat(ctx, filename)
|
||||
}
|
||||
|
||||
// Unlink implements vfs.Filesystem.
|
||||
func (s *SourceFS) Unlink(ctx context.Context, filename string) error {
|
||||
return vfs.ErrNotImplemented
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
package ytdlp
|
||||
|
||||
import "io"
|
||||
|
||||
type TaskUpdater interface {
|
||||
Output() io.Writer
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
package ytdlp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ytdlp"
|
||||
"github.com/royalcat/ctxprogress"
|
||||
)
|
||||
|
||||
type ytdlpSource struct {
|
||||
Url string `json:"url"`
|
||||
}
|
||||
|
||||
var hasher = sha1.New()
|
||||
|
||||
func (s *ytdlpSource) Name() string {
|
||||
return string(hasher.Sum([]byte(s.Url)))
|
||||
}
|
||||
|
||||
func (s *ytdlpSource) Download(ctx context.Context, task TaskUpdater, dir string) error {
|
||||
client, err := ytdlp.New()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctxprogress.New(ctx)
|
||||
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 0, Total: 2})
|
||||
plst, err := client.Playlist(ctx, s.Url)
|
||||
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 1, Total: 2})
|
||||
ctxprogress.Range(ctx, plst, func(ctx context.Context, _ int, e ytdlp.PlaylistEntry) bool {
|
||||
err = client.Download(ctx, e.Url(), dir)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 2, Total: 2})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
package vfs
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
)
|
||||
|
||||
type DefaultFS string
|
||||
|
||||
// Info implements Filesystem.
|
||||
func (d DefaultFS) Info() (fs.FileInfo, error) {
|
||||
return NewDirInfo(string(d)), nil
|
||||
}
|
||||
|
||||
// IsDir implements Filesystem.
|
||||
func (d DefaultFS) IsDir() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Name implements Filesystem.
|
||||
func (d DefaultFS) Name() string {
|
||||
return string(d)
|
||||
}
|
||||
|
||||
// Type implements Filesystem.
|
||||
func (d *DefaultFS) Type() fs.FileMode {
|
||||
return fs.ModeDir
|
||||
}
|
Loading…
Reference in a new issue