From bd75492b02ecb9044a2a8c2a82a10fd6382d0bc0 Mon Sep 17 00:00:00 2001 From: royalcat Date: Sun, 2 Jun 2024 22:53:33 +0300 Subject: [PATCH] refactor --- .gqlgen.yml | 6 +- cmd/tstor/main.go | 19 +- go.mod | 11 +- go.sum | 23 +- pkg/ctxbilly/fs.go | 31 +- pkg/ctxbilly/mem.go | 9 + pkg/ctxbilly/uring.go | 359 ++++++++++ pkg/ctxio/copy.go | 89 --- pkg/ctxio/io.go | 663 ------------------ pkg/ctxio/reader.go | 105 --- pkg/ctxio/teereader.go | 20 - pkg/go-nfs/filesystem.go | 2 +- pkg/{ctxio => ioutils}/cachereader.go | 20 +- pkg/{ctxio => ioutils}/disk.go | 18 +- pkg/{ctxio => ioutils}/filebuffer.go | 10 +- pkg/{ctxio => ioutils}/readerat.go | 16 +- pkg/{ctxio => ioutils}/seeker.go | 8 +- src/delivery/api.go | 2 +- src/delivery/graphql/model/entry.go | 4 +- src/delivery/graphql/model/mappers.go | 2 +- src/delivery/graphql/model/models_gen.go | 4 +- .../graphql/resolver/mutation.resolvers.go | 2 +- .../graphql/resolver/query.resolvers.go | 2 +- src/delivery/graphql/resolver/resolver.go | 4 +- src/delivery/http.go | 4 +- src/delivery/router.go | 4 +- src/export/fuse/handler.go | 2 +- src/export/fuse/handler_nocgo.go | 2 +- src/export/fuse/mount.go | 2 +- src/export/fuse/mount_test.go | 2 +- src/export/httpfs/httpfs.go | 6 +- src/export/nfs/handler.go | 2 +- src/export/nfs/wrapper-v4.go | 2 +- src/export/nfs/wrapper.go | 7 +- src/export/webdav/fs.go | 2 +- src/export/webdav/fs_test.go | 2 +- src/export/webdav/handler.go | 2 +- src/export/webdav/http.go | 2 +- src/host/controller/sourceddir.go | 51 -- src/host/storage.go | 19 - src/host/vfs/sourced.go | 3 - src/iio/wrapper_test.go | 6 +- src/sources/source.go | 53 ++ src/sources/storage.go | 21 + src/{host => sources}/torrent/client.go | 0 src/{host => sources}/torrent/controller.go | 0 .../torrent/file_mappings.go | 0 src/{host => sources}/torrent/fileitem.go | 0 src/{host => sources}/torrent/fs.go | 2 +- src/{host => sources}/torrent/fs_test.go | 0 src/{host => sources}/torrent/id.go | 0 src/{host => sources}/torrent/infobytes.go | 0 .../tkv/new.go => sources/torrent/kv.go} | 4 +- .../torrent/piece_completion.go | 0 .../torrent/piece_storage.go | 0 src/{host => sources}/torrent/queue.go | 0 src/{host => sources}/torrent/service.go | 9 +- src/{host => sources}/torrent/setup.go | 0 src/{host => sources}/torrent/stats.go | 0 src/{host => sources}/torrent/stats_store.go | 0 src/{host => sources}/torrent/storage.go | 0 src/sources/ytdlp/controller.go | 73 ++ src/sources/ytdlp/fs.go | 69 ++ src/sources/ytdlp/task.go | 7 + src/sources/ytdlp/ytdlp.go | 43 ++ src/{host => }/vfs/archive.go | 13 +- src/{host => }/vfs/archive_test.go | 4 +- src/{host => }/vfs/ctxbillyfs.go | 12 +- src/vfs/default.go | 27 + src/{host => }/vfs/dir.go | 2 +- src/{host => }/vfs/dummy.go | 12 +- src/{host => }/vfs/fs.go | 6 +- src/{host => }/vfs/fs_test.go | 2 +- src/{host => }/vfs/log.go | 0 src/{host => }/vfs/memory.go | 2 +- src/{host => }/vfs/memory_test.go | 0 src/{host => }/vfs/os.go | 4 +- src/{host => }/vfs/os_test.go | 2 +- src/{host => }/vfs/resolver.go | 2 +- src/{host => }/vfs/resolver_test.go | 2 +- src/{host => }/vfs/utils.go | 0 81 files changed, 822 insertions(+), 1098 deletions(-) create mode 100644 pkg/ctxbilly/uring.go delete mode 100644 pkg/ctxio/copy.go delete mode 100644 pkg/ctxio/io.go delete mode 100644 pkg/ctxio/reader.go delete mode 100644 pkg/ctxio/teereader.go rename pkg/{ctxio => ioutils}/cachereader.go (74%) rename pkg/{ctxio => ioutils}/disk.go (73%) rename pkg/{ctxio => ioutils}/filebuffer.go (96%) rename pkg/{ctxio => ioutils}/readerat.go (68%) rename pkg/{ctxio => ioutils}/seeker.go (91%) delete mode 100644 src/host/controller/sourceddir.go delete mode 100644 src/host/storage.go delete mode 100644 src/host/vfs/sourced.go create mode 100644 src/sources/source.go create mode 100644 src/sources/storage.go rename src/{host => sources}/torrent/client.go (100%) rename src/{host => sources}/torrent/controller.go (100%) rename src/{host => sources}/torrent/file_mappings.go (100%) rename src/{host => sources}/torrent/fileitem.go (100%) rename src/{host => sources}/torrent/fs.go (99%) rename src/{host => sources}/torrent/fs_test.go (100%) rename src/{host => sources}/torrent/id.go (100%) rename src/{host => sources}/torrent/infobytes.go (100%) rename src/{host/tkv/new.go => sources/torrent/kv.go} (78%) rename src/{host => sources}/torrent/piece_completion.go (100%) rename src/{host => sources}/torrent/piece_storage.go (100%) rename src/{host => sources}/torrent/queue.go (100%) rename src/{host => sources}/torrent/service.go (97%) rename src/{host => sources}/torrent/setup.go (100%) rename src/{host => sources}/torrent/stats.go (100%) rename src/{host => sources}/torrent/stats_store.go (100%) rename src/{host => sources}/torrent/storage.go (100%) create mode 100644 src/sources/ytdlp/controller.go create mode 100644 src/sources/ytdlp/fs.go create mode 100644 src/sources/ytdlp/task.go create mode 100644 src/sources/ytdlp/ytdlp.go rename src/{host => }/vfs/archive.go (96%) rename src/{host => }/vfs/archive_test.go (96%) rename src/{host => }/vfs/ctxbillyfs.go (96%) create mode 100644 src/vfs/default.go rename src/{host => }/vfs/dir.go (96%) rename src/{host => }/vfs/dummy.go (90%) rename src/{host => }/vfs/fs.go (92%) rename src/{host => }/vfs/fs_test.go (96%) rename src/{host => }/vfs/log.go (100%) rename src/{host => }/vfs/memory.go (98%) rename src/{host => }/vfs/memory_test.go (100%) rename src/{host => }/vfs/os.go (97%) rename src/{host => }/vfs/os_test.go (97%) rename src/{host => }/vfs/resolver.go (99%) rename src/{host => }/vfs/resolver_test.go (99%) rename src/{host => }/vfs/utils.go (100%) diff --git a/.gqlgen.yml b/.gqlgen.yml index 3b25f62..98fae22 100644 --- a/.gqlgen.yml +++ b/.gqlgen.yml @@ -50,7 +50,7 @@ models: Path: type: string FS: - type: "git.kmsign.ru/royalcat/tstor/src/host/vfs.Filesystem" + type: "git.kmsign.ru/royalcat/tstor/src/vfs.Filesystem" TorrentFS: fields: entries: @@ -64,11 +64,11 @@ models: resolver: true extraFields: FS: - type: "*git.kmsign.ru/royalcat/tstor/src/host/vfs.ResolverFS" + type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ResolverFS" ArchiveFS: fields: entries: resolver: true extraFields: FS: - type: "*git.kmsign.ru/royalcat/tstor/src/host/vfs.ArchiveFS" + type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ArchiveFS" diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index 92a07ad..c842573 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -18,10 +18,11 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/delivery" - "git.kmsign.ru/royalcat/tstor/src/host" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/ytdlp" "git.kmsign.ru/royalcat/tstor/src/telemetry" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/go-git/go-billy/v5/osfs" "github.com/urfave/cli/v2" @@ -90,14 +91,16 @@ func run(configPath string) error { } sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS()) - srv, err := torrent.NewService(sourceFs, conf.TorrentClient) + tsrv, err := torrent.NewService(sourceFs, conf.TorrentClient) if err != nil { return fmt.Errorf("error creating service: %w", err) } - sfs := host.NewHostedFS( + ytdlpsrv := ytdlp.NewService(conf.SourceDir) + + sfs := sources.NewHostedFS( vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)), - srv, + tsrv, ytdlpsrv, ) sfs = vfs.WrapLogFS(sfs) @@ -174,7 +177,7 @@ func run(configPath string) error { go func() { logFilename := filepath.Join(conf.Log.Path, "logs") - err := delivery.New(nil, srv, sfs, logFilename, conf) + err := delivery.New(nil, tsrv, sfs, logFilename, conf) if err != nil { log.Error(ctx, "error initializing HTTP server", rlog.Error(err)) } @@ -184,5 +187,5 @@ func run(configPath string) error { signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigChan - return srv.Close(ctx) + return tsrv.Close(ctx) } diff --git a/go.mod b/go.mod index 79bb901..edabe37 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module git.kmsign.ru/royalcat/tstor -go 1.22.1 +go 1.22.3 require ( github.com/99designs/gqlgen v0.17.45 @@ -12,8 +12,8 @@ require ( github.com/anacrolix/torrent v1.55.0 github.com/billziss-gh/cgofuse v1.5.0 github.com/bodgit/sevenzip v1.5.1 + github.com/cyphar/filepath-securejoin v0.2.5 github.com/dgraph-io/badger/v4 v4.2.0 - github.com/dgraph-io/ristretto v0.1.1 github.com/dustin/go-humanize v1.0.1 github.com/gin-gonic/gin v1.9.1 github.com/go-git/go-billy/v5 v5.5.0 @@ -23,6 +23,7 @@ require ( github.com/grafana/pyroscope-go v1.1.1 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 github.com/knadh/koanf/parsers/yaml v0.1.0 github.com/knadh/koanf/providers/env v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 @@ -30,10 +31,10 @@ require ( github.com/knadh/koanf/v2 v2.1.1 github.com/labstack/echo-contrib v0.17.1 github.com/labstack/echo/v4 v4.12.0 - github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060 github.com/nwaples/rardecode/v2 v2.0.0-beta.2 github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 github.com/ravilushqa/otelgqlgen v0.15.0 + github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3 github.com/royalcat/kv v0.0.0-20240327213417-8cf5696b2389 github.com/rs/zerolog v1.32.0 @@ -58,7 +59,6 @@ require ( ) require ( - github.com/ProtonMail/go-crypto v1.0.0 // indirect github.com/RoaringBitmap/roaring v1.9.3 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect @@ -87,12 +87,11 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cloudflare/circl v1.3.8 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect - github.com/cyphar/filepath-securejoin v0.2.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect diff --git a/go.sum b/go.sum index 59a1cc5..d271264 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78= -github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= @@ -137,7 +135,6 @@ github.com/bradfitz/iter v0.0.0-20140124041915-454541ec3da2/go.mod h1:PyRFw1Lt2w github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo= github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8= github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og= -github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= @@ -154,9 +151,6 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= -github.com/cloudflare/circl v1.3.8 h1:j+V8jJt09PoeMFIu2uh5JUyEaIHTXVOHslFoLNAKqwI= -github.com/cloudflare/circl v1.3.8/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZFnBQS5QU= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= @@ -350,6 +344,8 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 h1:xrtfZokN++5kencK33hn2Kx3Uj8tGnjMEhdt6FMvHD0= +github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90/go.mod h1:LEzdaZarZ5aqROlLIwJ4P7h3+4o71008fSy6wpaEB+s= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -400,8 +396,6 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0 github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= -github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060 h1:UOZcZVKXvw5ZcQ/shW/7xonMJYib9n9FKyNs/TAYAKc= -github.com/lrstanley/go-ytdlp v0.0.0-20240504025846-c0493251b060/go.mod h1:75ujbafjqiJugIGw4K6o52/p8C0m/kt+DrYwgClXYT4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -534,6 +528,8 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkLUsgvVdYO/UOJkzDyPzESfrTqWfM= +github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI= github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3 h1:1Ow/NUAWFZLghFcdNuyHt5Avb+bEI11qG8ELr9/XmQQ= github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= github.com/royalcat/kv v0.0.0-20240327213417-8cf5696b2389 h1:7XbHzr1TOaxs5Y/i9GtTEOOSTzfQ4ESYqF38DVfPkFY= @@ -677,8 +673,6 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= @@ -745,10 +739,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= @@ -796,6 +788,7 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200413165638-669c56c373c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -805,8 +798,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -823,9 +814,7 @@ golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= @@ -840,9 +829,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= diff --git a/pkg/ctxbilly/fs.go b/pkg/ctxbilly/fs.go index e14835b..7fa3b73 100644 --- a/pkg/ctxbilly/fs.go +++ b/pkg/ctxbilly/fs.go @@ -5,7 +5,7 @@ import ( "io" "os" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" + "github.com/royalcat/ctxio" ) type Filesystem interface { @@ -36,16 +36,6 @@ type Filesystem interface { // UNC path if and only if the first path element is a UNC path. Join(elem ...string) string - // TempFile creates a new temporary file in the directory dir with a name - // beginning with prefix, opens the file for reading and writing, and - // returns the resulting *os.File. If dir is the empty string, TempFile - // uses the default directory for temporary files (see os.TempDir). - // Multiple programs calling TempFile simultaneously will not choose the - // same file. The caller can use f.Name() to find the pathname of the file. - // It is the caller's responsibility to remove the file when no longer - // needed. - TempFile(ctx context.Context, dir, prefix string) (File, error) - // ReadDir reads the directory named by d(irname and returns a list of // directory entries sorted by filename. ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) @@ -74,19 +64,38 @@ type Filesystem interface { // Root() string } +type TempFileFS interface { + // TempFile creates a new temporary file in the directory dir with a name + // beginning with prefix, opens the file for reading and writing, and + // returns the resulting *os.File. If dir is the empty string, TempFile + // uses the default directory for temporary files (see os.TempDir). + // Multiple programs calling TempFile simultaneously will not choose the + // same file. The caller can use f.Name() to find the pathname of the file. + // It is the caller's responsibility to remove the file when no longer + // needed. + TempFile(ctx context.Context, dir, prefix string) (File, error) +} + type File interface { // Name returns the name of the file as presented to Open. Name() string ctxio.Writer + ctxio.WriterAt ctxio.Reader ctxio.ReaderAt io.Seeker ctxio.Closer +} + +type LockFile interface { // Lock locks the file like e.g. flock. It protects against access from // other processes. Lock() error // Unlock unlocks the file. Unlock() error +} + +type TruncateFile interface { // Truncate the file. Truncate(ctx context.Context, size int64) error } diff --git a/pkg/ctxbilly/mem.go b/pkg/ctxbilly/mem.go index 934e18a..82b9cdc 100644 --- a/pkg/ctxbilly/mem.go +++ b/pkg/ctxbilly/mem.go @@ -164,3 +164,12 @@ func (m *wrapFile) Unlock() error { func (m *wrapFile) Write(ctx context.Context, p []byte) (n int, err error) { return m.File.Write(p) } + +// WriteAt implements File. +func (m *wrapFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { + _, err = m.File.Seek(off, 0) + if err != nil { + return 0, err + } + return m.File.Write(p) +} diff --git a/pkg/ctxbilly/uring.go b/pkg/ctxbilly/uring.go new file mode 100644 index 0000000..01b9d50 --- /dev/null +++ b/pkg/ctxbilly/uring.go @@ -0,0 +1,359 @@ +package ctxbilly + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + securejoin "github.com/cyphar/filepath-securejoin" + "github.com/iceber/iouring-go" +) + +func NewURingFS() (*UringFS, error) { + ur, err := iouring.New(64, iouring.WithAsync()) + if err != nil { + return nil, err + } + + return &UringFS{ + ur: ur, + }, nil +} + +var _ Filesystem = (*UringFS)(nil) + +const ( + defaultDirectoryMode = 0o755 + defaultCreateMode = 0o666 +) + +// UringFS is a fs implementation based on the OS filesystem which is bound to +// a base dir. +// Prefer this fs implementation over ChrootOS. +// +// Behaviours of note: +// 1. Read and write operations can only be directed to files which descends +// from the base dir. +// 2. Symlinks don't have their targets modified, and therefore can point +// to locations outside the base dir or to non-existent paths. +// 3. Readlink and Lstat ensures that the link file is located within the base +// dir, evaluating any symlinks that file or base dir may contain. +type UringFS struct { + ur *iouring.IOURing + baseDir string +} + +func newBoundOS(d string) *UringFS { + return &UringFS{baseDir: d} +} + +func (fs *UringFS) Create(ctx context.Context, filename string) (File, error) { + return fs.OpenFile(ctx, filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultCreateMode) +} + +func (fs *UringFS) OpenFile(ctx context.Context, filename string, flag int, perm os.FileMode) (File, error) { + fn, err := fs.abs(filename) + if err != nil { + return nil, err + } + + f, err := os.OpenFile(fn, flag, perm) + if err != nil { + return nil, err + } + + return newFile(fs.ur, f) +} + +func (fs *UringFS) ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) { + dir, err := fs.abs(path) + if err != nil { + return nil, err + } + + entries, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + infos := make([]os.FileInfo, 0, len(entries)) + for _, v := range entries { + info, err := v.Info() + if err != nil { + return nil, err + } + + infos = append(infos, info) + } + + return infos, nil +} + +func (fs *UringFS) Rename(ctx context.Context, from, to string) error { + f, err := fs.abs(from) + if err != nil { + return err + } + t, err := fs.abs(to) + if err != nil { + return err + } + + // MkdirAll for target name. + if err := fs.createDir(t); err != nil { + return err + } + + return os.Rename(f, t) +} + +func (fs *UringFS) MkdirAll(ctx context.Context, path string, perm os.FileMode) error { + dir, err := fs.abs(path) + if err != nil { + return err + } + return os.MkdirAll(dir, perm) +} + +func (fs *UringFS) Open(ctx context.Context, filename string) (File, error) { + return fs.OpenFile(ctx, filename, os.O_RDONLY, 0) +} + +func (fs *UringFS) Stat(ctx context.Context, filename string) (os.FileInfo, error) { + filename, err := fs.abs(filename) + if err != nil { + return nil, err + } + return os.Stat(filename) +} + +func (fs *UringFS) Remove(ctx context.Context, filename string) error { + fn, err := fs.abs(filename) + if err != nil { + return err + } + return os.Remove(fn) +} + +func (fs *UringFS) Join(elem ...string) string { + return filepath.Join(elem...) +} + +func (fs *UringFS) RemoveAll(path string) error { + dir, err := fs.abs(path) + if err != nil { + return err + } + return os.RemoveAll(dir) +} + +func (fs *UringFS) Symlink(ctx context.Context, target, link string) error { + ln, err := fs.abs(link) + if err != nil { + return err + } + // MkdirAll for containing dir. + if err := fs.createDir(ln); err != nil { + return err + } + return os.Symlink(target, ln) +} + +func (fs *UringFS) Lstat(ctx context.Context, filename string) (os.FileInfo, error) { + filename = filepath.Clean(filename) + if !filepath.IsAbs(filename) { + filename = filepath.Join(fs.baseDir, filename) + } + if ok, err := fs.insideBaseDirEval(filename); !ok { + return nil, err + } + return os.Lstat(filename) +} + +func (fs *UringFS) Readlink(ctx context.Context, link string) (string, error) { + if !filepath.IsAbs(link) { + link = filepath.Clean(filepath.Join(fs.baseDir, link)) + } + if ok, err := fs.insideBaseDirEval(link); !ok { + return "", err + } + return os.Readlink(link) +} + +// Chroot returns a new OS filesystem, with the base dir set to the +// result of joining the provided path with the underlying base dir. +// func (fs *UringFS) Chroot(path string) (Filesystem, error) { +// joined, err := securejoin.SecureJoin(fs.baseDir, path) +// if err != nil { +// return nil, err +// } +// return newBoundOS(joined), nil +// } + +// Root returns the current base dir of the billy.Filesystem. +// This is required in order for this implementation to be a drop-in +// replacement for other upstream implementations (e.g. memory and osfs). +func (fs *UringFS) Root() string { + return fs.baseDir +} + +func (fs *UringFS) createDir(fullpath string) error { + dir := filepath.Dir(fullpath) + if dir != "." { + if err := os.MkdirAll(dir, defaultDirectoryMode); err != nil { + return err + } + } + + return nil +} + +// abs transforms filename to an absolute path, taking into account the base dir. +// Relative paths won't be allowed to ascend the base dir, so `../file` will become +// `/working-dir/file`. +// +// Note that if filename is a symlink, the returned address will be the target of the +// symlink. +func (fs *UringFS) abs(filename string) (string, error) { + if filename == fs.baseDir { + filename = string(filepath.Separator) + } + + path, err := securejoin.SecureJoin(fs.baseDir, filename) + if err != nil { + return "", nil + } + + return path, nil +} + +// insideBaseDirEval checks whether filename is contained within +// a dir that is within the fs.baseDir, by first evaluating any symlinks +// that either filename or fs.baseDir may contain. +func (fs *UringFS) insideBaseDirEval(filename string) (bool, error) { + dir, err := filepath.EvalSymlinks(filepath.Dir(filename)) + if dir == "" || os.IsNotExist(err) { + dir = filepath.Dir(filename) + } + wd, err := filepath.EvalSymlinks(fs.baseDir) + if wd == "" || os.IsNotExist(err) { + wd = fs.baseDir + } + if filename != wd && dir != wd && !strings.HasPrefix(dir, wd+string(filepath.Separator)) { + return false, fmt.Errorf("path outside base dir") + } + return true, nil +} + +func newFile(fsur *iouring.IOURing, f *os.File) (*URingFile, error) { + ur, err := iouring.New(64, iouring.WithAttachWQ(fsur)) + if err != nil { + return nil, err + } + + return &URingFile{ + ur: ur, + f: f, + }, nil +} + +type URingFile struct { + ur *iouring.IOURing + f *os.File +} + +// Close implements File. +func (o *URingFile) Close(ctx context.Context) error { + return errors.Join(o.ur.UnregisterFile(o.f), o.Close(ctx)) +} + +// Name implements File. +func (o *URingFile) Name() string { + return o.f.Name() +} + +// Read implements File. +func (o *URingFile) Read(ctx context.Context, p []byte) (n int, err error) { + req, err := o.ur.Read(o.f, p, nil) + if err != nil { + return 0, err + } + defer req.Cancel() + + select { + case <-req.Done(): + return req.GetRes() + case <-ctx.Done(): + req.Cancel() + <-req.Done() + return req.GetRes() + } +} + +// ReadAt implements File. +func (o *URingFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + req, err := o.ur.Pread(o.f, p, uint64(off), nil) + if err != nil { + return 0, err + } + defer req.Cancel() + + select { + case <-req.Done(): + return req.GetRes() + case <-ctx.Done(): + req.Cancel() + <-req.Done() + return req.GetRes() + } +} + +// Write implements File. +func (o *URingFile) Write(ctx context.Context, p []byte) (n int, err error) { + req, err := o.ur.Write(o.f, p, nil) + if err != nil { + return 0, err + } + defer req.Cancel() + + select { + case <-req.Done(): + return req.GetRes() + case <-ctx.Done(): + req.Cancel() + <-req.Done() + return req.GetRes() + } +} + +// WriteAt implements File. +func (o *URingFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { + req, err := o.ur.Pwrite(o.f, p, uint64(off), nil) + if err != nil { + return 0, err + } + defer req.Cancel() + + select { + case <-req.Done(): + return req.GetRes() + case <-ctx.Done(): + req.Cancel() + <-req.Done() + return req.GetRes() + } +} + +// Seek implements File. +func (o *URingFile) Seek(offset int64, whence int) (int64, error) { + return o.f.Seek(offset, whence) +} + +// Truncate implements File. +func (o *URingFile) Truncate(ctx context.Context, size int64) error { + return o.f.Truncate(size) +} + +var _ File = (*URingFile)(nil) diff --git a/pkg/ctxio/copy.go b/pkg/ctxio/copy.go deleted file mode 100644 index 4c98a8e..0000000 --- a/pkg/ctxio/copy.go +++ /dev/null @@ -1,89 +0,0 @@ -package ctxio - -// // CopyN copies n bytes (or until an error) from src to dst. -// // It returns the number of bytes copied and the earliest -// // error encountered while copying. -// // On return, written == n if and only if err == nil. -// // -// // If dst implements [ReaderFrom], the copy is implemented using it. -// func CopyN(ctx context.Context, dst Writer, src Reader, n int64) (written int64, err error) { -// written, err = Copy(ctx, dst, LimitReader(src, n)) -// if written == n { -// return n, nil -// } -// if written < n && err == nil { -// // src stopped early; must have been EOF. -// err = io.EOF -// } - -// return -// } - -// // Copy copies from src to dst until either EOF is reached -// // on src or an error occurs. It returns the number of bytes -// // copied and the first error encountered while copying, if any. -// // -// // A successful Copy returns err == nil, not err == EOF. -// // Because Copy is defined to read from src until EOF, it does -// // not treat an EOF from Read as an error to be reported. -// // -// // If src implements [WriterTo], -// // the copy is implemented by calling src.WriteTo(dst). -// // Otherwise, if dst implements [ReaderFrom], -// // the copy is implemented by calling dst.ReadFrom(src). -// func Copy(ctx context.Context, dst Writer, src Reader) (written int64, err error) { -// return copyBuffer(ctx, dst, src, nil) -// } - -// // copyBuffer is the actual implementation of Copy and CopyBuffer. -// // if buf is nil, one is allocated. -// func copyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) { -// // If the reader has a WriteTo method, use it to do the copy. -// // Avoids an allocation and a copy. -// if wt, ok := src.(WriterTo); ok { -// return wt.WriteTo(dst) -// } -// // Similarly, if the writer has a ReadFrom method, use it to do the copy. -// if rt, ok := dst.(ReaderFrom); ok { -// return rt.ReadFrom(src) -// } -// if buf == nil { -// size := 32 * 1024 -// if l, ok := src.(*LimitedReader); ok && int64(size) > l.N { -// if l.N < 1 { -// size = 1 -// } else { -// size = int(l.N) -// } -// } -// buf = make([]byte, size) -// } -// for { -// nr, er := src.Read(ctx, buf) -// if nr > 0 { -// nw, ew := dst.Write(ctx, buf[0:nr]) -// if nw < 0 || nr < nw { -// nw = 0 -// if ew == nil { -// ew = errInvalidWrite -// } -// } -// written += int64(nw) -// if ew != nil { -// err = ew -// break -// } -// if nr != nw { -// err = io.ErrShortWrite -// break -// } -// } -// if er != nil { -// if er != io.EOF { -// err = er -// } -// break -// } -// } -// return written, err -// } diff --git a/pkg/ctxio/io.go b/pkg/ctxio/io.go deleted file mode 100644 index e0a7158..0000000 --- a/pkg/ctxio/io.go +++ /dev/null @@ -1,663 +0,0 @@ -package ctxio - -import ( - "context" - "errors" - "io" - "sync" -) - -// Seek whence values. -const ( - SeekStart = io.SeekStart // seek relative to the origin of the file - SeekCurrent = io.SeekCurrent // seek relative to the current offset - SeekEnd = io.SeekEnd // seek relative to the end -) - -// ErrShortWrite means that a write accepted fewer bytes than requested -// but failed to return an explicit error. -var ErrShortWrite = io.ErrShortWrite - -// errInvalidWrite means that a write returned an impossible count. -var errInvalidWrite = errors.New("invalid write result") - -// ErrShortBuffer means that a read required a longer buffer than was provided. -var ErrShortBuffer = io.ErrShortBuffer - -// EOF is the error returned by Read when no more input is available. -// (Read must return EOF itself, not an error wrapping EOF, -// because callers will test for EOF using ==.) -// Functions should return EOF only to signal a graceful end of input. -// If the EOF occurs unexpectedly in a structured data stream, -// the appropriate error is either [ErrUnexpectedEOF] or some other error -// giving more detail. -var EOF = io.EOF - -// ErrUnexpectedEOF means that EOF was encountered in the -// middle of reading a fixed-size block or data structure. -var ErrUnexpectedEOF = io.ErrUnexpectedEOF - -// ErrNoProgress is returned by some clients of a [Reader] when -// many calls to Read have failed to return any data or error, -// usually the sign of a broken [Reader] implementation. -var ErrNoProgress = io.ErrNoProgress - -// Reader is the interface that wraps the basic Read method. -// -// Read reads up to len(p) bytes into p. It returns the number of bytes -// read (0 <= n <= len(p)) and any error encountered. Even if Read -// returns n < len(p), it may use all of p as scratch space during the call. -// If some data is available but not len(p) bytes, Read conventionally -// returns what is available instead of waiting for more. -// -// When Read encounters an error or end-of-file condition after -// successfully reading n > 0 bytes, it returns the number of -// bytes read. It may return the (non-nil) error from the same call -// or return the error (and n == 0) from a subsequent call. -// An instance of this general case is that a Reader returning -// a non-zero number of bytes at the end of the input stream may -// return either err == EOF or err == nil. The next Read should -// return 0, EOF. -// -// Callers should always process the n > 0 bytes returned before -// considering the error err. Doing so correctly handles I/O errors -// that happen after reading some bytes and also both of the -// allowed EOF behaviors. -// -// If len(p) == 0, Read should always return n == 0. It may return a -// non-nil error if some error condition is known, such as EOF. -// -// Implementations of Read are discouraged from returning a -// zero byte count with a nil error, except when len(p) == 0. -// Callers should treat a return of 0 and nil as indicating that -// nothing happened; in particular it does not indicate EOF. -// -// Implementations must not retain p. -type Reader interface { - Read(ctx context.Context, p []byte) (n int, err error) -} - -// Writer is the interface that wraps the basic Write method. -// -// Write writes len(p) bytes from p to the underlying data stream. -// It returns the number of bytes written from p (0 <= n <= len(p)) -// and any error encountered that caused the write to stop early. -// Write must return a non-nil error if it returns n < len(p). -// Write must not modify the slice data, even temporarily. -// -// Implementations must not retain p. -type Writer interface { - Write(ctx context.Context, p []byte) (n int, err error) -} - -// Closer is the interface that wraps the basic Close method. -// -// The behavior of Close after the first call is undefined. -// Specific implementations may document their own behavior. -type Closer interface { - Close(ctx context.Context) error -} - -// Seeker is the interface that wraps the basic Seek method. -// -// Seek sets the offset for the next Read or Write to offset, -// interpreted according to whence: -// [SeekStart] means relative to the start of the file, -// [SeekCurrent] means relative to the current offset, and -// [SeekEnd] means relative to the end -// (for example, offset = -2 specifies the penultimate byte of the file). -// Seek returns the new offset relative to the start of the -// file or an error, if any. -// -// Seeking to an offset before the start of the file is an error. -// Seeking to any positive offset may be allowed, but if the new offset exceeds -// the size of the underlying object the behavior of subsequent I/O operations -// is implementation-dependent. -type Seeker interface { - Seek(offset int64, whence int) (int64, error) -} - -// ReadWriter is the interface that groups the basic Read and Write methods. -type ReadWriter interface { - Reader - Writer -} - -// ReadCloser is the interface that groups the basic Read and Close methods. -type ReadCloser interface { - Reader - Closer -} - -// WriteCloser is the interface that groups the basic Write and Close methods. -type WriteCloser interface { - Writer - Closer -} - -// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods. -type ReadWriteCloser interface { - Reader - Writer - Closer -} - -// ReadSeeker is the interface that groups the basic Read and Seek methods. -type ReadSeeker interface { - Reader - Seeker -} - -// ReadSeekCloser is the interface that groups the basic Read, Seek and Close -// methods. -type ReadSeekCloser interface { - Reader - Seeker - Closer -} - -// WriteSeeker is the interface that groups the basic Write and Seek methods. -type WriteSeeker interface { - Writer - Seeker -} - -// ReadWriteSeeker is the interface that groups the basic Read, Write and Seek methods. -type ReadWriteSeeker interface { - Reader - Writer - Seeker -} - -// ReaderFrom is the interface that wraps the ReadFrom method. -// -// ReadFrom reads data from r until EOF or error. -// The return value n is the number of bytes read. -// Any error except EOF encountered during the read is also returned. -// -// The [Copy] function uses [ReaderFrom] if available. -type ReaderFrom interface { - ReadFrom(ctx context.Context, r Reader) (n int64, err error) -} - -// WriterTo is the interface that wraps the WriteTo method. -// -// WriteTo writes data to w until there's no more data to write or -// when an error occurs. The return value n is the number of bytes -// written. Any error encountered during the write is also returned. -// -// The Copy function uses WriterTo if available. -type WriterTo interface { - WriteTo(ctx context.Context, w Writer) (n int64, err error) -} - -// ReaderAt is the interface that wraps the basic ReadAt method. -// -// ReadAt reads len(p) bytes into p starting at offset off in the -// underlying input source. It returns the number of bytes -// read (0 <= n <= len(p)) and any error encountered. -// -// When ReadAt returns n < len(p), it returns a non-nil error -// explaining why more bytes were not returned. In this respect, -// ReadAt is stricter than Read. -// -// Even if ReadAt returns n < len(p), it may use all of p as scratch -// space during the call. If some data is available but not len(p) bytes, -// ReadAt blocks until either all the data is available or an error occurs. -// In this respect ReadAt is different from Read. -// -// If the n = len(p) bytes returned by ReadAt are at the end of the -// input source, ReadAt may return either err == EOF or err == nil. -// -// If ReadAt is reading from an input source with a seek offset, -// ReadAt should not affect nor be affected by the underlying -// seek offset. -// -// Clients of ReadAt can execute parallel ReadAt calls on the -// same input source. -// -// Implementations must not retain p. -type ReaderAt interface { - ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) -} - -// WriterAt is the interface that wraps the basic WriteAt method. -// -// WriteAt writes len(p) bytes from p to the underlying data stream -// at offset off. It returns the number of bytes written from p (0 <= n <= len(p)) -// and any error encountered that caused the write to stop early. -// WriteAt must return a non-nil error if it returns n < len(p). -// -// If WriteAt is writing to a destination with a seek offset, -// WriteAt should not affect nor be affected by the underlying -// seek offset. -// -// Clients of WriteAt can execute parallel WriteAt calls on the same -// destination if the ranges do not overlap. -// -// Implementations must not retain p. -type WriterAt interface { - WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) -} - -// StringWriter is the interface that wraps the WriteString method. -type StringWriter interface { - WriteString(s string) (n int, err error) -} - -// WriteString writes the contents of the string s to w, which accepts a slice of bytes. -// If w implements [StringWriter], [StringWriter.WriteString] is invoked directly. -// Otherwise, [Writer.Write] is called exactly once. -func WriteString(ctx context.Context, w Writer, s string) (n int, err error) { - if sw, ok := w.(StringWriter); ok { - return sw.WriteString(s) - } - return w.Write(ctx, []byte(s)) -} - -// ReadAtLeast reads from r into buf until it has read at least min bytes. -// It returns the number of bytes copied and an error if fewer bytes were read. -// The error is EOF only if no bytes were read. -// If an EOF happens after reading fewer than min bytes, -// ReadAtLeast returns [ErrUnexpectedEOF]. -// If min is greater than the length of buf, ReadAtLeast returns [ErrShortBuffer]. -// On return, n >= min if and only if err == nil. -// If r returns an error having read at least min bytes, the error is dropped. -func ReadAtLeast(ctx context.Context, r Reader, buf []byte, min int) (n int, err error) { - if len(buf) < min { - return 0, ErrShortBuffer - } - for n < min && err == nil { - var nn int - nn, err = r.Read(ctx, buf[n:]) - n += nn - } - if n >= min { - err = nil - } else if n > 0 && err == EOF { - err = ErrUnexpectedEOF - } - return -} - -// ReadFull reads exactly len(buf) bytes from r into buf. -// It returns the number of bytes copied and an error if fewer bytes were read. -// The error is EOF only if no bytes were read. -// If an EOF happens after reading some but not all the bytes, -// ReadFull returns [ErrUnexpectedEOF]. -// On return, n == len(buf) if and only if err == nil. -// If r returns an error having read at least len(buf) bytes, the error is dropped. -func ReadFull(ctx context.Context, r Reader, buf []byte) (n int, err error) { - return ReadAtLeast(ctx, r, buf, len(buf)) -} - -// CopyN copies n bytes (or until an error) from src to dst. -// It returns the number of bytes copied and the earliest -// error encountered while copying. -// On return, written == n if and only if err == nil. -// -// If dst implements [ReaderFrom], the copy is implemented using it. -func CopyN(ctx context.Context, dst Writer, src Reader, n int64) (written int64, err error) { - written, err = Copy(ctx, dst, LimitReader(src, n)) - if written == n { - return n, nil - } - if written < n && err == nil { - // src stopped early; must have been EOF. - err = EOF - } - return -} - -// Copy copies from src to dst until either EOF is reached -// on src or an error occurs. It returns the number of bytes -// copied and the first error encountered while copying, if any. -// -// A successful Copy returns err == nil, not err == EOF. -// Because Copy is defined to read from src until EOF, it does -// not treat an EOF from Read as an error to be reported. -// -// If src implements [WriterTo], -// the copy is implemented by calling src.WriteTo(dst). -// Otherwise, if dst implements [ReaderFrom], -// the copy is implemented by calling dst.ReadFrom(src). -func Copy(ctx context.Context, dst Writer, src Reader) (written int64, err error) { - return copyBuffer(ctx, dst, src, nil) -} - -// CopyBuffer is identical to Copy except that it stages through the -// provided buffer (if one is required) rather than allocating a -// temporary one. If buf is nil, one is allocated; otherwise if it has -// zero length, CopyBuffer panics. -// -// If either src implements [WriterTo] or dst implements [ReaderFrom], -// buf will not be used to perform the copy. -func CopyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) { - if buf != nil && len(buf) == 0 { - panic("empty buffer in CopyBuffer") - } - return copyBuffer(ctx, dst, src, buf) -} - -// copyBuffer is the actual implementation of Copy and CopyBuffer. -// if buf is nil, one is allocated. -func copyBuffer(ctx context.Context, dst Writer, src Reader, buf []byte) (written int64, err error) { - // If the reader has a WriteTo method, use it to do the copy. - // Avoids an allocation and a copy. - if wt, ok := src.(WriterTo); ok { - return wt.WriteTo(ctx, dst) - } - // Similarly, if the writer has a ReadFrom method, use it to do the copy. - if rt, ok := dst.(ReaderFrom); ok { - return rt.ReadFrom(ctx, src) - } - if buf == nil { - size := 32 * 1024 - if l, ok := src.(*LimitedReader); ok && int64(size) > l.N { - if l.N < 1 { - size = 1 - } else { - size = int(l.N) - } - } - buf = make([]byte, size) - } - for { - nr, er := src.Read(ctx, buf) - if nr > 0 { - nw, ew := dst.Write(ctx, buf[0:nr]) - if nw < 0 || nr < nw { - nw = 0 - if ew == nil { - ew = errInvalidWrite - } - } - written += int64(nw) - if ew != nil { - err = ew - break - } - if nr != nw { - err = ErrShortWrite - break - } - } - if er != nil { - if er != EOF { - err = er - } - break - } - } - return written, err -} - -// LimitReader returns a Reader that reads from r -// but stops with EOF after n bytes. -// The underlying implementation is a *LimitedReader. -func LimitReader(r Reader, n int64) Reader { return &LimitedReader{r, n} } - -// A LimitedReader reads from R but limits the amount of -// data returned to just N bytes. Each call to Read -// updates N to reflect the new amount remaining. -// Read returns EOF when N <= 0 or when the underlying R returns EOF. -type LimitedReader struct { - R Reader // underlying reader - N int64 // max bytes remaining -} - -func (l *LimitedReader) Read(ctx context.Context, p []byte) (n int, err error) { - if l.N <= 0 { - return 0, EOF - } - if int64(len(p)) > l.N { - p = p[0:l.N] - } - n, err = l.R.Read(ctx, p) - l.N -= int64(n) - return -} - -// NewSectionReader returns a [SectionReader] that reads from r -// starting at offset off and stops with EOF after n bytes. -func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader { - var remaining int64 - const maxint64 = 1<<63 - 1 - if off <= maxint64-n { - remaining = n + off - } else { - // Overflow, with no way to return error. - // Assume we can read up to an offset of 1<<63 - 1. - remaining = maxint64 - } - return &SectionReader{r, off, off, remaining, n} -} - -// SectionReader implements Read, Seek, and ReadAt on a section -// of an underlying [ReaderAt]. -type SectionReader struct { - r ReaderAt // constant after creation - base int64 // constant after creation - off int64 - limit int64 // constant after creation - n int64 // constant after creation -} - -func (s *SectionReader) Read(ctx context.Context, p []byte) (n int, err error) { - if s.off >= s.limit { - return 0, EOF - } - if max := s.limit - s.off; int64(len(p)) > max { - p = p[0:max] - } - n, err = s.r.ReadAt(ctx, p, s.off) - s.off += int64(n) - return -} - -var errWhence = errors.New("Seek: invalid whence") -var errOffset = errors.New("Seek: invalid offset") - -func (s *SectionReader) Seek(offset int64, whence int) (int64, error) { - switch whence { - default: - return 0, errWhence - case SeekStart: - offset += s.base - case SeekCurrent: - offset += s.off - case SeekEnd: - offset += s.limit - } - if offset < s.base { - return 0, errOffset - } - s.off = offset - return offset - s.base, nil -} - -func (s *SectionReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { - if off < 0 || off >= s.Size() { - return 0, EOF - } - off += s.base - if max := s.limit - off; int64(len(p)) > max { - p = p[0:max] - n, err = s.r.ReadAt(ctx, p, off) - if err == nil { - err = EOF - } - return n, err - } - return s.r.ReadAt(ctx, p, off) -} - -// Size returns the size of the section in bytes. -func (s *SectionReader) Size() int64 { return s.limit - s.base } - -// Outer returns the underlying [ReaderAt] and offsets for the section. -// -// The returned values are the same that were passed to [NewSectionReader] -// when the [SectionReader] was created. -func (s *SectionReader) Outer() (r ReaderAt, off int64, n int64) { - return s.r, s.base, s.n -} - -// An OffsetWriter maps writes at offset base to offset base+off in the underlying writer. -type OffsetWriter struct { - w WriterAt - base int64 // the original offset - off int64 // the current offset -} - -// NewOffsetWriter returns an [OffsetWriter] that writes to w -// starting at offset off. -func NewOffsetWriter(w WriterAt, off int64) *OffsetWriter { - return &OffsetWriter{w, off, off} -} - -func (o *OffsetWriter) Write(ctx context.Context, p []byte) (n int, err error) { - n, err = o.w.WriteAt(ctx, p, o.off) - o.off += int64(n) - return -} - -func (o *OffsetWriter) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { - if off < 0 { - return 0, errOffset - } - - off += o.base - return o.w.WriteAt(ctx, p, off) -} - -func (o *OffsetWriter) Seek(offset int64, whence int) (int64, error) { - switch whence { - default: - return 0, errWhence - case SeekStart: - offset += o.base - case SeekCurrent: - offset += o.off - } - if offset < o.base { - return 0, errOffset - } - o.off = offset - return offset - o.base, nil -} - -// TeeReader returns a [Reader] that writes to w what it reads from r. -// All reads from r performed through it are matched with -// corresponding writes to w. There is no internal buffering - -// the write must complete before the read completes. -// Any error encountered while writing is reported as a read error. -func TeeReader(r Reader, w Writer) Reader { - return &teeReader{r, w} -} - -type teeReader struct { - r Reader - w Writer -} - -func (t *teeReader) Read(ctx context.Context, p []byte) (n int, err error) { - n, err = t.r.Read(ctx, p) - if n > 0 { - if n, err := t.w.Write(ctx, p[:n]); err != nil { - return n, err - } - } - return -} - -// Discard is a [Writer] on which all Write calls succeed -// without doing anything. -var Discard Writer = discard{} - -type discard struct{} - -// discard implements ReaderFrom as an optimization so Copy to -// io.Discard can avoid doing unnecessary work. -var _ ReaderFrom = discard{} - -func (discard) Write(ctx context.Context, p []byte) (int, error) { - return len(p), nil -} - -func (discard) WriteString(ctx context.Context, s string) (int, error) { - return len(s), nil -} - -var blackHolePool = sync.Pool{ - New: func() any { - b := make([]byte, 8192) - return &b - }, -} - -func (discard) ReadFrom(ctx context.Context, r Reader) (n int64, err error) { - bufp := blackHolePool.Get().(*[]byte) - readSize := 0 - for { - readSize, err = r.Read(ctx, *bufp) - n += int64(readSize) - if err != nil { - blackHolePool.Put(bufp) - if err == EOF { - return n, nil - } - return - } - } -} - -// NopCloser returns a [ReadCloser] with a no-op Close method wrapping -// the provided [Reader] r. -// If r implements [WriterTo], the returned [ReadCloser] will implement [WriterTo] -// by forwarding calls to r. -func NopCloser(r Reader) ReadCloser { - if _, ok := r.(WriterTo); ok { - return nopCloserWriterTo{r} - } - return nopCloser{r} -} - -type nopCloser struct { - Reader -} - -func (nopCloser) Close(ctx context.Context) error { return nil } - -type nopCloserWriterTo struct { - Reader -} - -func (nopCloserWriterTo) Close(ctx context.Context) error { return nil } - -func (c nopCloserWriterTo) WriteTo(ctx context.Context, w Writer) (n int64, err error) { - return c.Reader.(WriterTo).WriteTo(ctx, w) -} - -// ReadAll reads from r until an error or EOF and returns the data it read. -// A successful call returns err == nil, not err == EOF. Because ReadAll is -// defined to read from src until EOF, it does not treat an EOF from Read -// as an error to be reported. -func ReadAll(ctx context.Context, r Reader) ([]byte, error) { - b := make([]byte, 0, 512) - for { - n, err := r.Read(ctx, b[len(b):cap(b)]) - b = b[:len(b)+n] - if err != nil { - if err == EOF { - err = nil - } - return b, err - } - - if len(b) == cap(b) { - // Add more capacity (let append pick how much). - b = append(b, 0)[:len(b)] - } - } -} diff --git a/pkg/ctxio/reader.go b/pkg/ctxio/reader.go deleted file mode 100644 index 3ec11b6..0000000 --- a/pkg/ctxio/reader.go +++ /dev/null @@ -1,105 +0,0 @@ -package ctxio - -import ( - "context" - "io" -) - -type FileReader interface { - Reader - ReaderAt - Closer -} - -type contextReader struct { - ctx context.Context - r Reader -} - -func (r *contextReader) Read(p []byte) (n int, err error) { - if r.ctx.Err() != nil { - return 0, r.ctx.Err() - } - - return r.r.Read(r.ctx, p) -} - -func IoReaderAt(ctx context.Context, r ReaderAt) io.ReaderAt { - return &contextReaderAt{ctx: ctx, r: r} -} - -type contextReaderAt struct { - ctx context.Context - r ReaderAt -} - -func (c *contextReaderAt) ReadAt(p []byte, off int64) (n int, err error) { - if c.ctx.Err() != nil { - return 0, c.ctx.Err() - } - - return c.r.ReadAt(c.ctx, p, off) -} - -func IoReader(ctx context.Context, r Reader) io.Reader { - return &contextReader{ctx: ctx, r: r} -} - -func WrapIoReader(r io.Reader) Reader { - return &wrapReader{r: r} -} - -type wrapReader struct { - r io.Reader -} - -var _ Reader = (*wrapReader)(nil) - -// Read implements Reader. -func (c *wrapReader) Read(ctx context.Context, p []byte) (n int, err error) { - if ctx.Err() != nil { - return 0, ctx.Err() - } - return c.r.Read(p) -} - -func WrapIoWriter(w io.Writer) Writer { - return &wrapWriter{w: w} -} - -type wrapWriter struct { - w io.Writer -} - -var _ Writer = (*wrapWriter)(nil) - -// Write implements Writer. -func (c *wrapWriter) Write(ctx context.Context, p []byte) (n int, err error) { - if ctx.Err() != nil { - return 0, ctx.Err() - } - return c.w.Write(p) -} - -func WrapIoReadCloser(r io.ReadCloser) ReadCloser { - return &wrapReadCloser{r: r} -} - -type wrapReadCloser struct { - r io.ReadCloser -} - -var _ Reader = (*wrapReadCloser)(nil) - -// Read implements Reader. -func (c *wrapReadCloser) Read(ctx context.Context, p []byte) (n int, err error) { - if ctx.Err() != nil { - return 0, ctx.Err() - } - return c.r.Read(p) -} - -// Close implements ReadCloser. -func (c *wrapReadCloser) Close(ctx context.Context) error { - return c.r.Close() -} diff --git a/pkg/ctxio/teereader.go b/pkg/ctxio/teereader.go deleted file mode 100644 index 999a670..0000000 --- a/pkg/ctxio/teereader.go +++ /dev/null @@ -1,20 +0,0 @@ -package ctxio - -// func TeeReader(r Reader, w Writer) Reader { -// return &teeReader{r, w} -// } - -// type teeReader struct { -// r Reader -// w Writer -// } - -// func (t *teeReader) Read(ctx context.Context, p []byte) (n int, err error) { -// n, err = t.r.Read(ctx, p) -// if n > 0 { -// if n, err := t.w.Write(ctx, p[:n]); err != nil { -// return n, err -// } -// } -// return -// } diff --git a/pkg/go-nfs/filesystem.go b/pkg/go-nfs/filesystem.go index 3a06cc8..7aa66c6 100644 --- a/pkg/go-nfs/filesystem.go +++ b/pkg/go-nfs/filesystem.go @@ -6,7 +6,7 @@ import ( "os" "time" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" + "github.com/royalcat/ctxio" ) // FSStat returns metadata about a file system diff --git a/pkg/ctxio/cachereader.go b/pkg/ioutils/cachereader.go similarity index 74% rename from pkg/ctxio/cachereader.go rename to pkg/ioutils/cachereader.go index 14e70f9..93862ba 100644 --- a/pkg/ctxio/cachereader.go +++ b/pkg/ioutils/cachereader.go @@ -1,26 +1,34 @@ -package ctxio +package ioutils import ( "context" "errors" "io" "sync" + + "github.com/royalcat/ctxio" ) +type FileReader interface { + ctxio.ReaderAt + ctxio.Reader + ctxio.Closer +} + type CacheReader struct { m sync.Mutex fo int64 fr *FileBuffer to int64 - tr Reader + tr ctxio.Reader } var _ FileReader = (*CacheReader)(nil) -func NewCacheReader(r Reader) (FileReader, error) { +func NewCacheReader(r ctxio.Reader) (FileReader, error) { fr := NewFileBuffer(nil) - tr := TeeReader(r, fr) + tr := ctxio.TeeReader(r, fr) return &CacheReader{fr: fr, tr: tr}, nil } @@ -30,7 +38,7 @@ func (dtr *CacheReader) ReadAt(ctx context.Context, p []byte, off int64) (int, e tb := off + int64(len(p)) if tb > dtr.fo { - w, err := CopyN(ctx, Discard, dtr.tr, tb-dtr.fo) + w, err := ctxio.CopyN(ctx, ctxio.Discard, dtr.tr, tb-dtr.fo) dtr.to += w if err != nil && err != io.EOF { return 0, err @@ -55,7 +63,7 @@ func (dtr *CacheReader) Close(ctx context.Context) error { frcloser := dtr.fr.Close(ctx) var closeerr error - if rc, ok := dtr.tr.(ReadCloser); ok { + if rc, ok := dtr.tr.(ctxio.ReadCloser); ok { closeerr = rc.Close(ctx) } diff --git a/pkg/ctxio/disk.go b/pkg/ioutils/disk.go similarity index 73% rename from pkg/ctxio/disk.go rename to pkg/ioutils/disk.go index cddd6a9..bfc40fd 100644 --- a/pkg/ctxio/disk.go +++ b/pkg/ioutils/disk.go @@ -1,10 +1,12 @@ -package ctxio +package ioutils import ( "context" "io" "os" "sync" + + "github.com/royalcat/ctxio" ) type DiskCacheReader struct { @@ -13,14 +15,14 @@ type DiskCacheReader struct { fo int64 fr *os.File to int64 - tr Reader + tr ctxio.Reader } -var _ ReaderAt = (*DiskCacheReader)(nil) -var _ Reader = (*DiskCacheReader)(nil) -var _ Closer = (*DiskCacheReader)(nil) +var _ ctxio.ReaderAt = (*DiskCacheReader)(nil) +var _ ctxio.Reader = (*DiskCacheReader)(nil) +var _ ctxio.Closer = (*DiskCacheReader)(nil) -func NewDiskCacheReader(r Reader) (*DiskCacheReader, error) { +func NewDiskCacheReader(r ctxio.Reader) (*DiskCacheReader, error) { tempDir, err := os.MkdirTemp("/tmp", "tstor") if err != nil { return nil, err @@ -30,7 +32,7 @@ func NewDiskCacheReader(r Reader) (*DiskCacheReader, error) { return nil, err } - tr := TeeReader(r, WrapIoWriter(fr)) + tr := ctxio.TeeReader(r, ctxio.WrapIoWriter(fr)) return &DiskCacheReader{fr: fr, tr: tr}, nil } @@ -40,7 +42,7 @@ func (dtr *DiskCacheReader) ReadAt(ctx context.Context, p []byte, off int64) (in tb := off + int64(len(p)) if tb > dtr.fo { - w, err := CopyN(ctx, Discard, dtr.tr, tb-dtr.fo) + w, err := ctxio.CopyN(ctx, ctxio.Discard, dtr.tr, tb-dtr.fo) dtr.to += w if err != nil && err != io.EOF { return 0, err diff --git a/pkg/ctxio/filebuffer.go b/pkg/ioutils/filebuffer.go similarity index 96% rename from pkg/ctxio/filebuffer.go rename to pkg/ioutils/filebuffer.go index 23cb88e..0866d88 100644 --- a/pkg/ctxio/filebuffer.go +++ b/pkg/ioutils/filebuffer.go @@ -1,4 +1,4 @@ -package ctxio +package ioutils import ( "bytes" @@ -6,6 +6,8 @@ import ( "errors" "io" "os" + + "github.com/royalcat/ctxio" ) // FileBuffer implements interfaces implemented by files. @@ -20,7 +22,7 @@ type FileBuffer struct { } var _ FileReader = (*FileBuffer)(nil) -var _ Writer = (*FileBuffer)(nil) +var _ ctxio.Writer = (*FileBuffer)(nil) // NewFileBuffer returns a new populated Buffer func NewFileBuffer(b []byte) *FileBuffer { @@ -30,8 +32,8 @@ func NewFileBuffer(b []byte) *FileBuffer { // NewFileBufferFromReader is a convenience method that returns a new populated Buffer // whose contents are sourced from a supplied reader by loading it entirely // into memory. -func NewFileBufferFromReader(ctx context.Context, reader Reader) (*FileBuffer, error) { - data, err := ReadAll(ctx, reader) +func NewFileBufferFromReader(ctx context.Context, reader ctxio.Reader) (*FileBuffer, error) { + data, err := ctxio.ReadAll(ctx, reader) if err != nil { return nil, err } diff --git a/pkg/ctxio/readerat.go b/pkg/ioutils/readerat.go similarity index 68% rename from pkg/ctxio/readerat.go rename to pkg/ioutils/readerat.go index af2d156..20af177 100644 --- a/pkg/ctxio/readerat.go +++ b/pkg/ioutils/readerat.go @@ -1,25 +1,27 @@ -package ctxio +package ioutils import ( "context" "sync" + + "github.com/royalcat/ctxio" ) type ReaderReaderAtWrapper struct { mu sync.Mutex - rat ReaderAt + rat ctxio.ReaderAt offset int64 } -func NewReaderReaderAtWrapper(rat ReaderAt) *ReaderReaderAtWrapper { +func NewReaderReaderAtWrapper(rat ctxio.ReaderAt) *ReaderReaderAtWrapper { return &ReaderReaderAtWrapper{ rat: rat, } } -var _ Reader = (*ReaderReaderAtWrapper)(nil) -var _ ReaderAt = (*ReaderReaderAtWrapper)(nil) -var _ Closer = (*ReaderReaderAtWrapper)(nil) +var _ ctxio.Reader = (*ReaderReaderAtWrapper)(nil) +var _ ctxio.ReaderAt = (*ReaderReaderAtWrapper)(nil) +var _ ctxio.Closer = (*ReaderReaderAtWrapper)(nil) // Read implements Reader. func (r *ReaderReaderAtWrapper) Read(ctx context.Context, p []byte) (n int, err error) { @@ -37,7 +39,7 @@ func (r *ReaderReaderAtWrapper) ReadAt(ctx context.Context, p []byte, off int64) // Close implements Closer. func (r *ReaderReaderAtWrapper) Close(ctx context.Context) (err error) { - if c, ok := r.rat.(Closer); ok { + if c, ok := r.rat.(ctxio.Closer); ok { err = c.Close(ctx) if err != nil { return err diff --git a/pkg/ctxio/seeker.go b/pkg/ioutils/seeker.go similarity index 91% rename from pkg/ctxio/seeker.go rename to pkg/ioutils/seeker.go index 9a696c9..71c0c96 100644 --- a/pkg/ctxio/seeker.go +++ b/pkg/ioutils/seeker.go @@ -1,9 +1,11 @@ -package ctxio +package ioutils import ( "context" "io" "sync" + + "github.com/royalcat/ctxio" ) type ioSeekerWrapper struct { @@ -13,10 +15,10 @@ type ioSeekerWrapper struct { pos int64 size int64 - r ReaderAt + r ctxio.ReaderAt } -func WrapIoReadSeeker(ctx context.Context, r ReaderAt, size int64) io.ReadSeeker { +func WrapIoReadSeeker(ctx context.Context, r ctxio.ReaderAt, size int64) io.ReadSeeker { return &ioSeekerWrapper{ ctx: ctx, r: r, diff --git a/src/delivery/api.go b/src/delivery/api.go index 71732b3..feab526 100644 --- a/src/delivery/api.go +++ b/src/delivery/api.go @@ -7,7 +7,7 @@ import ( "net/http" "os" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" "github.com/anacrolix/missinggo/v2/filecache" "github.com/gin-gonic/gin" ) diff --git a/src/delivery/graphql/model/entry.go b/src/delivery/graphql/model/entry.go index 2317c05..4d46797 100644 --- a/src/delivery/graphql/model/entry.go +++ b/src/delivery/graphql/model/entry.go @@ -3,8 +3,8 @@ package model import ( "context" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/vfs" ) type FsElem interface { diff --git a/src/delivery/graphql/model/mappers.go b/src/delivery/graphql/model/mappers.go index cc414ad..93d2651 100644 --- a/src/delivery/graphql/model/mappers.go +++ b/src/delivery/graphql/model/mappers.go @@ -3,7 +3,7 @@ package model import ( "context" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" atorrent "github.com/anacrolix/torrent" ) diff --git a/src/delivery/graphql/model/models_gen.go b/src/delivery/graphql/model/models_gen.go index 6d6a543..7a2aeb2 100644 --- a/src/delivery/graphql/model/models_gen.go +++ b/src/delivery/graphql/model/models_gen.go @@ -5,8 +5,8 @@ package model import ( "time" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/vfs" torrent1 "github.com/anacrolix/torrent" ) diff --git a/src/delivery/graphql/resolver/mutation.resolvers.go b/src/delivery/graphql/resolver/mutation.resolvers.go index 76cf728..d59d80d 100644 --- a/src/delivery/graphql/resolver/mutation.resolvers.go +++ b/src/delivery/graphql/resolver/mutation.resolvers.go @@ -14,7 +14,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/uuid" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" "github.com/99designs/gqlgen/graphql" aih "github.com/anacrolix/torrent/types/infohash" ) diff --git a/src/delivery/graphql/resolver/query.resolvers.go b/src/delivery/graphql/resolver/query.resolvers.go index e869e1e..8d8d01f 100644 --- a/src/delivery/graphql/resolver/query.resolvers.go +++ b/src/delivery/graphql/resolver/query.resolvers.go @@ -11,7 +11,7 @@ import ( graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" ) // Torrents is the resolver for the torrents field. diff --git a/src/delivery/graphql/resolver/resolver.go b/src/delivery/graphql/resolver/resolver.go index 8a9a3e1..e0a237e 100644 --- a/src/delivery/graphql/resolver/resolver.go +++ b/src/delivery/graphql/resolver/resolver.go @@ -1,8 +1,8 @@ package resolver import ( - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/go-git/go-billy/v5" ) diff --git a/src/delivery/http.go b/src/delivery/http.go index 931ab2f..51745db 100644 --- a/src/delivery/http.go +++ b/src/delivery/http.go @@ -7,8 +7,8 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/anacrolix/missinggo/v2/filecache" echopprof "github.com/labstack/echo-contrib/pprof" "github.com/labstack/echo/v4" diff --git a/src/delivery/router.go b/src/delivery/router.go index f996973..07961b4 100644 --- a/src/delivery/router.go +++ b/src/delivery/router.go @@ -8,8 +8,8 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/rlog" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver" - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler/extension" diff --git a/src/export/fuse/handler.go b/src/export/fuse/handler.go index 83c635f..4b9f087 100644 --- a/src/export/fuse/handler.go +++ b/src/export/fuse/handler.go @@ -8,7 +8,7 @@ import ( "path/filepath" "runtime" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/billziss-gh/cgofuse/fuse" ) diff --git a/src/export/fuse/handler_nocgo.go b/src/export/fuse/handler_nocgo.go index 4e0e92e..7f0bd4f 100644 --- a/src/export/fuse/handler_nocgo.go +++ b/src/export/fuse/handler_nocgo.go @@ -5,7 +5,7 @@ package fuse import ( "fmt" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" ) type Handler struct{} diff --git a/src/export/fuse/mount.go b/src/export/fuse/mount.go index 9248b70..579f045 100644 --- a/src/export/fuse/mount.go +++ b/src/export/fuse/mount.go @@ -11,7 +11,7 @@ import ( "os" "sync" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/billziss-gh/cgofuse/fuse" ) diff --git a/src/export/fuse/mount_test.go b/src/export/fuse/mount_test.go index 8280bb1..6b78a81 100644 --- a/src/export/fuse/mount_test.go +++ b/src/export/fuse/mount_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" ) diff --git a/src/export/httpfs/httpfs.go b/src/export/httpfs/httpfs.go index 9a03a23..3eaf4b7 100644 --- a/src/export/httpfs/httpfs.go +++ b/src/export/httpfs/httpfs.go @@ -8,8 +8,8 @@ import ( "os" "sync" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/pkg/ioutils" + "git.kmsign.ru/royalcat/tstor/src/vfs" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -93,7 +93,7 @@ func newHTTPFile(ctx context.Context, f vfs.File, dirContent []os.FileInfo) *htt return &httpFile{ f: f, dirContent: dirContent, - ReadSeekCloser: ctxio.IoReadSeekCloserWrapper(ctx, f, f.Size()), + ReadSeekCloser: ioutils.IoReadSeekCloserWrapper(ctx, f, f.Size()), } } diff --git a/src/export/nfs/handler.go b/src/export/nfs/handler.go index 18ef6bd..332487c 100644 --- a/src/export/nfs/handler.go +++ b/src/export/nfs/handler.go @@ -6,8 +6,8 @@ import ( nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs" nfshelper "git.kmsign.ru/royalcat/tstor/pkg/go-nfs/helpers" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/log" + "git.kmsign.ru/royalcat/tstor/src/vfs" ) func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) { diff --git a/src/export/nfs/wrapper-v4.go b/src/export/nfs/wrapper-v4.go index e1e7bf3..c7b741c 100644 --- a/src/export/nfs/wrapper-v4.go +++ b/src/export/nfs/wrapper-v4.go @@ -3,7 +3,7 @@ package nfs // import ( // "io/fs" -// "git.kmsign.ru/royalcat/tstor/src/host/vfs" +// "git.kmsign.ru/royalcat/tstor/src/vfs" // nfsfs "github.com/smallfz/libnfs-go/fs" // ) diff --git a/src/export/nfs/wrapper.go b/src/export/nfs/wrapper.go index 753a5f9..eaf2784 100644 --- a/src/export/nfs/wrapper.go +++ b/src/export/nfs/wrapper.go @@ -11,7 +11,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/go-git/go-billy/v5" ) @@ -199,6 +199,11 @@ func (f *billyFile) Write(ctx context.Context, p []byte) (n int, err error) { return 0, billyErr(nil, vfs.ErrNotImplemented, f.log) } +// WriteAt implements ctxbilly.File. +func (f *billyFile) WriteAt(ctx context.Context, p []byte, off int64) (n int, err error) { + return 0, billyErr(nil, vfs.ErrNotImplemented, f.log) +} + // Lock implements billy.File. func (*billyFile) Lock() error { return nil // TODO diff --git a/src/export/webdav/fs.go b/src/export/webdav/fs.go index 6309bad..e5da3fe 100644 --- a/src/export/webdav/fs.go +++ b/src/export/webdav/fs.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "golang.org/x/net/webdav" ) diff --git a/src/export/webdav/fs_test.go b/src/export/webdav/fs_test.go index a43a50e..55eb4aa 100644 --- a/src/export/webdav/fs_test.go +++ b/src/export/webdav/fs_test.go @@ -6,7 +6,7 @@ import ( "os" "testing" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" "golang.org/x/net/webdav" ) diff --git a/src/export/webdav/handler.go b/src/export/webdav/handler.go index c44528b..f93da4a 100644 --- a/src/export/webdav/handler.go +++ b/src/export/webdav/handler.go @@ -4,7 +4,7 @@ import ( "log/slog" "net/http" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "golang.org/x/net/webdav" ) diff --git a/src/export/webdav/http.go b/src/export/webdav/http.go index f88336b..c3d9e83 100644 --- a/src/export/webdav/http.go +++ b/src/export/webdav/http.go @@ -5,7 +5,7 @@ import ( "log/slog" "net/http" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "golang.org/x/net/webdav" ) diff --git a/src/host/controller/sourceddir.go b/src/host/controller/sourceddir.go deleted file mode 100644 index f317c84..0000000 --- a/src/host/controller/sourceddir.go +++ /dev/null @@ -1,51 +0,0 @@ -package controller - -import ( - "context" - - "github.com/lrstanley/go-ytdlp" -) - -type SourceUpdater struct { - sources []VirtDirSource -} - -type SourcedDirSource string - -const ( - SourcedDirYtDlp SourcedDirSource = "yt-dlp-playlist" -) - -type VirtDirSource interface { - Source() SourcedDirSource -} - -var _ VirtDirSource = (*SourcedDirYtDlpPlaylist)(nil) - -type SourcedDirYtDlpPlaylist struct { - URL string `json:"url"` -} - -func (SourcedDirYtDlpPlaylist) Source() SourcedDirSource { - return SourcedDirYtDlp -} - -type SDController struct { - sources []VirtDirSource -} - -func (sd *SourcedDirYtDlpPlaylist) Update(ctx context.Context) error { - _, err := ytdlp.Install(ctx, nil) - if err != nil { - return err - } - - dl := ytdlp.New().PrintJSON() - - _, err = dl.Run(ctx, sd.URL) - if err != nil { - return err - } - - return nil -} diff --git a/src/host/storage.go b/src/host/storage.go deleted file mode 100644 index d61388a..0000000 --- a/src/host/storage.go +++ /dev/null @@ -1,19 +0,0 @@ -package host - -import ( - "git.kmsign.ru/royalcat/tstor/src/host/torrent" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" -) - -func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service) vfs.Filesystem { - factories := map[string]vfs.FsFactory{ - ".torrent": tsrv.NewTorrentFs, - } - - // add default torrent factory for root filesystem - for k, v := range vfs.ArchiveFactories { - factories[k] = v - } - - return vfs.NewResolveFS(sourceFS, factories) -} diff --git a/src/host/vfs/sourced.go b/src/host/vfs/sourced.go deleted file mode 100644 index aaec80a..0000000 --- a/src/host/vfs/sourced.go +++ /dev/null @@ -1,3 +0,0 @@ -package vfs - -const sorcedDirExt = ".tsvd" diff --git a/src/iio/wrapper_test.go b/src/iio/wrapper_test.go index e53471f..b661034 100644 --- a/src/iio/wrapper_test.go +++ b/src/iio/wrapper_test.go @@ -5,8 +5,8 @@ import ( "io" "testing" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/pkg/ioutils" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" ) @@ -20,7 +20,7 @@ func TestSeekerWrapper(t *testing.T) { mf := vfs.NewMemoryFile("text.txt", testData) - r := ctxio.IoReadSeekCloserWrapper(ctx, mf, mf.Size()) + r := ioutils.IoReadSeekCloserWrapper(ctx, mf, mf.Size()) defer r.Close() n, err := r.Seek(6, io.SeekStart) diff --git a/src/sources/source.go b/src/sources/source.go new file mode 100644 index 0000000..1629e2c --- /dev/null +++ b/src/sources/source.go @@ -0,0 +1,53 @@ +package sources + +import ( + "context" + "encoding/json" + "fmt" + "reflect" +) + +type UpdateTask interface{} + +type Source interface { + Name() string // unique name within source type + SourceType() string + Fetch(ctx context.Context, task UpdateTask, dir string) error +} + +var sourceTypesRegistry = map[string]reflect.Type{} + +// func RegisterSource[T Source]() { +// var s T +// t := reflect.TypeOf(s) +// if t.Kind() == reflect.Ptr { +// RegisterSource[T]() +// return +// } + +// sourceTypesRegistry[s.SourceType()] = t +// } + +type sourceType struct { + Type string `json:"type"` +} + +func parseSource(data []byte) (Source, error) { + var sourceType sourceType + err := json.Unmarshal(data, &sourceType) + if err != nil { + return nil, err + } + + st, ok := sourceTypesRegistry[sourceType.Type] + if !ok { + return nil, fmt.Errorf("source type %s not registred", sourceType.Type) + } + + s := reflect.New(st).Interface().(Source) + err = json.Unmarshal(data, &s) + if err != nil { + return nil, err + } + return s, nil +} diff --git a/src/sources/storage.go b/src/sources/storage.go new file mode 100644 index 0000000..86d56a7 --- /dev/null +++ b/src/sources/storage.go @@ -0,0 +1,21 @@ +package sources + +import ( + "git.kmsign.ru/royalcat/tstor/src/sources/torrent" + "git.kmsign.ru/royalcat/tstor/src/sources/ytdlp" + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service, ytdlpsrv *ytdlp.Service) vfs.Filesystem { + factories := map[string]vfs.FsFactory{ + ".torrent": tsrv.NewTorrentFs, + ".ts-ytdlp": ytdlpsrv.BuildFS, + } + + // add default torrent factory for root filesystem + for k, v := range vfs.ArchiveFactories { + factories[k] = v + } + + return vfs.NewResolveFS(sourceFS, factories) +} diff --git a/src/host/torrent/client.go b/src/sources/torrent/client.go similarity index 100% rename from src/host/torrent/client.go rename to src/sources/torrent/client.go diff --git a/src/host/torrent/controller.go b/src/sources/torrent/controller.go similarity index 100% rename from src/host/torrent/controller.go rename to src/sources/torrent/controller.go diff --git a/src/host/torrent/file_mappings.go b/src/sources/torrent/file_mappings.go similarity index 100% rename from src/host/torrent/file_mappings.go rename to src/sources/torrent/file_mappings.go diff --git a/src/host/torrent/fileitem.go b/src/sources/torrent/fileitem.go similarity index 100% rename from src/host/torrent/fileitem.go rename to src/sources/torrent/fileitem.go diff --git a/src/host/torrent/fs.go b/src/sources/torrent/fs.go similarity index 99% rename from src/host/torrent/fs.go rename to src/sources/torrent/fs.go index 8635feb..14b634e 100644 --- a/src/host/torrent/fs.go +++ b/src/sources/torrent/fs.go @@ -12,7 +12,7 @@ import ( "sync/atomic" "time" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/anacrolix/torrent" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git a/src/host/torrent/fs_test.go b/src/sources/torrent/fs_test.go similarity index 100% rename from src/host/torrent/fs_test.go rename to src/sources/torrent/fs_test.go diff --git a/src/host/torrent/id.go b/src/sources/torrent/id.go similarity index 100% rename from src/host/torrent/id.go rename to src/sources/torrent/id.go diff --git a/src/host/torrent/infobytes.go b/src/sources/torrent/infobytes.go similarity index 100% rename from src/host/torrent/infobytes.go rename to src/sources/torrent/infobytes.go diff --git a/src/host/tkv/new.go b/src/sources/torrent/kv.go similarity index 78% rename from src/host/tkv/new.go rename to src/sources/torrent/kv.go index 3f740ea..f50feca 100644 --- a/src/host/tkv/new.go +++ b/src/sources/torrent/kv.go @@ -1,4 +1,4 @@ -package tkv +package torrent import ( "path" @@ -8,7 +8,7 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func New[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) { +func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) { dir := path.Join(dbdir, name) store, err = kv.NewBadgerKV[K, V](dir) if err != nil { diff --git a/src/host/torrent/piece_completion.go b/src/sources/torrent/piece_completion.go similarity index 100% rename from src/host/torrent/piece_completion.go rename to src/sources/torrent/piece_completion.go diff --git a/src/host/torrent/piece_storage.go b/src/sources/torrent/piece_storage.go similarity index 100% rename from src/host/torrent/piece_storage.go rename to src/sources/torrent/piece_storage.go diff --git a/src/host/torrent/queue.go b/src/sources/torrent/queue.go similarity index 100% rename from src/host/torrent/queue.go rename to src/sources/torrent/queue.go diff --git a/src/host/torrent/service.go b/src/sources/torrent/service.go similarity index 97% rename from src/host/torrent/service.go rename to src/sources/torrent/service.go index bab9867..d21eace 100644 --- a/src/host/torrent/service.go +++ b/src/sources/torrent/service.go @@ -13,11 +13,10 @@ import ( "sync" "time" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" - "git.kmsign.ru/royalcat/tstor/src/host/tkv" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -32,7 +31,7 @@ import ( "github.com/royalcat/kv" ) -var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/host/torrent") +var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent") type DirAquire struct { Name string @@ -99,7 +98,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, } client.AddDhtNodes(conf.DHTNodes) - s.dirsAquire, err = tkv.New[string, DirAquire](conf.MetadataFolder, "dir-acquire") + s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire") if err != nil { return nil, err } diff --git a/src/host/torrent/setup.go b/src/sources/torrent/setup.go similarity index 100% rename from src/host/torrent/setup.go rename to src/sources/torrent/setup.go diff --git a/src/host/torrent/stats.go b/src/sources/torrent/stats.go similarity index 100% rename from src/host/torrent/stats.go rename to src/sources/torrent/stats.go diff --git a/src/host/torrent/stats_store.go b/src/sources/torrent/stats_store.go similarity index 100% rename from src/host/torrent/stats_store.go rename to src/sources/torrent/stats_store.go diff --git a/src/host/torrent/storage.go b/src/sources/torrent/storage.go similarity index 100% rename from src/host/torrent/storage.go rename to src/sources/torrent/storage.go diff --git a/src/sources/ytdlp/controller.go b/src/sources/ytdlp/controller.go new file mode 100644 index 0000000..03233dc --- /dev/null +++ b/src/sources/ytdlp/controller.go @@ -0,0 +1,73 @@ +package ytdlp + +import ( + "context" + "encoding/json" + "fmt" + "path" + "sync" + + "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/go-git/go-billy/v5/osfs" + "github.com/royalcat/ctxio" +) + +func NewService(dataDir string) *Service { + return &Service{ + dataDir: dataDir, + sources: make(map[string]ytdlpSource, 0), + } +} + +type Service struct { + mu sync.Mutex + + dataDir string + sources map[string]ytdlpSource +} + +func (c *Service) AddSource(s ytdlpSource) { + c.mu.Lock() + defer c.mu.Unlock() + + c.sources[s.Name()] = s +} + +func (c *Service) sourceDir(s ytdlpSource) string { + return path.Join(c.dataDir, s.Name()) +} + +func (c *Service) Update(ctx context.Context) error { + for name, s := range c.sources { + if ctx.Err() != nil { + return ctx.Err() + } + + dir := c.sourceDir(s) + err := s.Download(ctx, nil, dir) + if err != nil { + return fmt.Errorf("failed to fetch source %s: %w", name, err) + } + } + return nil +} + +func (c *Service) BuildFS(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { + data, err := ctxio.ReadAll(ctx, f) + if err != nil { + return nil, fmt.Errorf("failed to read source file: %w", err) + } + + var s ytdlpSource + err = json.Unmarshal(data, &s) + if err != nil { + return nil, err + } + + c.AddSource(s) + + downloadFS := ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s))) + + return newSourceFS(s.Name(), downloadFS, c, s), nil +} diff --git a/src/sources/ytdlp/fs.go b/src/sources/ytdlp/fs.go new file mode 100644 index 0000000..8add3a2 --- /dev/null +++ b/src/sources/ytdlp/fs.go @@ -0,0 +1,69 @@ +package ytdlp + +import ( + "context" + "io/fs" + + "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +type SourceFS struct { + service *Service + source ytdlpSource + + fs ctxbilly.Filesystem + + vfs.DefaultFS +} + +var _ vfs.Filesystem = (*SourceFS)(nil) + +func newSourceFS(name string, fs ctxbilly.Filesystem, service *Service, source ytdlpSource) *SourceFS { + return &SourceFS{ + fs: fs, + service: service, + source: source, + DefaultFS: vfs.DefaultFS(name), + } +} + +// Open implements vfs.Filesystem. +func (s *SourceFS) Open(ctx context.Context, filename string) (vfs.File, error) { + info, err := s.fs.Stat(ctx, filename) + if err != nil { + return nil, err + } + + f, err := s.fs.Open(ctx, filename) + if err != nil { + return nil, err + } + + return vfs.NewCtxBillyFile(info, f), nil +} + +// ReadDir implements vfs.Filesystem. +func (s *SourceFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + infos, err := s.fs.ReadDir(ctx, path) + if err != nil { + return nil, err + } + + entries := make([]fs.DirEntry, 0, len(infos)) + for _, info := range infos { + entries = append(entries, vfs.NewFileInfo(info.Name(), info.Size())) + } + + return entries, nil +} + +// Stat implements vfs.Filesystem. +func (s *SourceFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + return s.fs.Stat(ctx, filename) +} + +// Unlink implements vfs.Filesystem. +func (s *SourceFS) Unlink(ctx context.Context, filename string) error { + return vfs.ErrNotImplemented +} diff --git a/src/sources/ytdlp/task.go b/src/sources/ytdlp/task.go new file mode 100644 index 0000000..d36b7f9 --- /dev/null +++ b/src/sources/ytdlp/task.go @@ -0,0 +1,7 @@ +package ytdlp + +import "io" + +type TaskUpdater interface { + Output() io.Writer +} diff --git a/src/sources/ytdlp/ytdlp.go b/src/sources/ytdlp/ytdlp.go new file mode 100644 index 0000000..916aa30 --- /dev/null +++ b/src/sources/ytdlp/ytdlp.go @@ -0,0 +1,43 @@ +package ytdlp + +import ( + "context" + "crypto/sha1" + + "git.kmsign.ru/royalcat/tstor/pkg/ytdlp" + "github.com/royalcat/ctxprogress" +) + +type ytdlpSource struct { + Url string `json:"url"` +} + +var hasher = sha1.New() + +func (s *ytdlpSource) Name() string { + return string(hasher.Sum([]byte(s.Url))) +} + +func (s *ytdlpSource) Download(ctx context.Context, task TaskUpdater, dir string) error { + client, err := ytdlp.New() + if err != nil { + return err + } + ctxprogress.New(ctx) + ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 0, Total: 2}) + plst, err := client.Playlist(ctx, s.Url) + ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 1, Total: 2}) + ctxprogress.Range(ctx, plst, func(ctx context.Context, _ int, e ytdlp.PlaylistEntry) bool { + err = client.Download(ctx, e.Url(), dir) + if err != nil { + return false + } + return true + }) + ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 2, Total: 2}) + if err != nil { + return err + } + + return nil +} diff --git a/src/host/vfs/archive.go b/src/vfs/archive.go similarity index 96% rename from src/host/vfs/archive.go rename to src/vfs/archive.go index 807f3e9..376ad7f 100644 --- a/src/host/vfs/archive.go +++ b/src/vfs/archive.go @@ -11,9 +11,10 @@ import ( "sync" "time" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" + "git.kmsign.ru/royalcat/tstor/pkg/ioutils" "github.com/bodgit/sevenzip" "github.com/nwaples/rardecode/v2" + "github.com/royalcat/ctxio" ) var ArchiveFactories = map[string]FsFactory{ @@ -138,7 +139,7 @@ func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e for p, _ := range afs.files { if strings.HasPrefix(p, filename) { - return newDirInfo(path.Base(filename)), nil + return NewDirInfo(path.Base(filename)), nil } } @@ -173,7 +174,7 @@ func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archi size: size, af: af, - buffer: ctxio.NewFileBuffer(nil), + buffer: ioutils.NewFileBuffer(nil), } } @@ -188,7 +189,7 @@ type archiveFile struct { offset int64 readen int64 - buffer *ctxio.FileBuffer + buffer *ioutils.FileBuffer } // Name implements File. @@ -350,7 +351,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) ( var _ archiveLoader = RarLoader func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) { - reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size) + reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) r, err := rardecode.NewReader(reader) if err != nil { @@ -369,7 +370,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s name := header.Name af := func(ctx context.Context) (io.ReadCloser, error) { - reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size) + reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) r, err := rardecode.NewReader(reader) if err != nil { return nil, err diff --git a/src/host/vfs/archive_test.go b/src/vfs/archive_test.go similarity index 96% rename from src/host/vfs/archive_test.go rename to src/vfs/archive_test.go index 443abe2..c6c1ee3 100644 --- a/src/host/vfs/archive_test.go +++ b/src/vfs/archive_test.go @@ -7,8 +7,8 @@ import ( "io" "testing" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/royalcat/ctxio" "github.com/stretchr/testify/require" ) diff --git a/src/host/vfs/ctxbillyfs.go b/src/vfs/ctxbillyfs.go similarity index 96% rename from src/host/vfs/ctxbillyfs.go rename to src/vfs/ctxbillyfs.go index e040731..f8b61a4 100644 --- a/src/host/vfs/ctxbillyfs.go +++ b/src/vfs/ctxbillyfs.go @@ -46,10 +46,7 @@ func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) { if err != nil { return nil, err } - return &CtxBillyFile{ - info: info, - file: bf, - }, nil + return NewCtxBillyFile(info, bf), nil } // ReadDir implements Filesystem. @@ -98,6 +95,13 @@ func (c *CtxBillyFs) Unlink(ctx context.Context, filename string) error { return fs.ErrInvalid } +func NewCtxBillyFile(info fs.FileInfo, bf ctxbilly.File) *CtxBillyFile { + return &CtxBillyFile{ + info: info, + file: bf, + } +} + var _ File = (*CtxBillyFile)(nil) type CtxBillyFile struct { diff --git a/src/vfs/default.go b/src/vfs/default.go new file mode 100644 index 0000000..e6581bf --- /dev/null +++ b/src/vfs/default.go @@ -0,0 +1,27 @@ +package vfs + +import ( + "io/fs" +) + +type DefaultFS string + +// Info implements Filesystem. +func (d DefaultFS) Info() (fs.FileInfo, error) { + return NewDirInfo(string(d)), nil +} + +// IsDir implements Filesystem. +func (d DefaultFS) IsDir() bool { + return true +} + +// Name implements Filesystem. +func (d DefaultFS) Name() string { + return string(d) +} + +// Type implements Filesystem. +func (d *DefaultFS) Type() fs.FileMode { + return fs.ModeDir +} diff --git a/src/host/vfs/dir.go b/src/vfs/dir.go similarity index 96% rename from src/host/vfs/dir.go rename to src/vfs/dir.go index 8d11eb2..c11864a 100644 --- a/src/host/vfs/dir.go +++ b/src/vfs/dir.go @@ -25,7 +25,7 @@ func (d *dirFile) Close(ctx context.Context) error { // Info implements File. func (d *dirFile) Info() (fs.FileInfo, error) { - return newDirInfo(d.name), nil + return NewDirInfo(d.name), nil } // IsDir implements File. diff --git a/src/host/vfs/dummy.go b/src/vfs/dummy.go similarity index 90% rename from src/host/vfs/dummy.go rename to src/vfs/dummy.go index e46918c..8f83993 100644 --- a/src/host/vfs/dummy.go +++ b/src/vfs/dummy.go @@ -24,16 +24,6 @@ func (d *DummyFs) Mode() fs.FileMode { return fs.ModeDir } -// Size implements Filesystem. -func (d *DummyFs) Size() int64 { - panic("unimplemented") -} - -// Sys implements Filesystem. -func (d *DummyFs) Sys() any { - panic("unimplemented") -} - // FsName implements Filesystem. func (d *DummyFs) FsName() string { return "dummyfs" @@ -65,7 +55,7 @@ func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, erro // Info implements Filesystem. func (d *DummyFs) Info() (fs.FileInfo, error) { - return newDirInfo(d.name), nil + return NewDirInfo(d.name), nil } // IsDir implements Filesystem. diff --git a/src/host/vfs/fs.go b/src/vfs/fs.go similarity index 92% rename from src/host/vfs/fs.go rename to src/vfs/fs.go index 7df9395..88cc267 100644 --- a/src/host/vfs/fs.go +++ b/src/vfs/fs.go @@ -7,7 +7,7 @@ import ( "path" "time" - "git.kmsign.ru/royalcat/tstor/pkg/ctxio" + "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" ) @@ -24,7 +24,7 @@ type File interface { var ErrNotImplemented = errors.New("not implemented") -var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/vfs") +var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/vfs") type Filesystem interface { // Open opens the named file for reading. If successful, methods on the @@ -55,7 +55,7 @@ type fileInfo struct { var _ fs.FileInfo = &fileInfo{} var _ fs.DirEntry = &fileInfo{} -func newDirInfo(name string) *fileInfo { +func NewDirInfo(name string) *fileInfo { return &fileInfo{ name: path.Base(name), size: 0, diff --git a/src/host/vfs/fs_test.go b/src/vfs/fs_test.go similarity index 96% rename from src/host/vfs/fs_test.go rename to src/vfs/fs_test.go index 5c14345..9b79de0 100644 --- a/src/host/vfs/fs_test.go +++ b/src/vfs/fs_test.go @@ -29,7 +29,7 @@ func TestDirInfo(t *testing.T) { require := require.New(t) - fi := newDirInfo("abc/name") + fi := NewDirInfo("abc/name") require.True(fi.IsDir()) require.Equal("name", fi.Name()) diff --git a/src/host/vfs/log.go b/src/vfs/log.go similarity index 100% rename from src/host/vfs/log.go rename to src/vfs/log.go diff --git a/src/host/vfs/memory.go b/src/vfs/memory.go similarity index 98% rename from src/host/vfs/memory.go rename to src/vfs/memory.go index 826f468..ce0d63e 100644 --- a/src/host/vfs/memory.go +++ b/src/vfs/memory.go @@ -42,7 +42,7 @@ func (fs *MemoryFs) FsName() string { // Info implements Filesystem. func (fs *MemoryFs) Info() (fs.FileInfo, error) { - return newDirInfo(fs.name), nil + return NewDirInfo(fs.name), nil } // IsDir implements Filesystem. diff --git a/src/host/vfs/memory_test.go b/src/vfs/memory_test.go similarity index 100% rename from src/host/vfs/memory_test.go rename to src/vfs/memory_test.go diff --git a/src/host/vfs/os.go b/src/vfs/os.go similarity index 97% rename from src/host/vfs/os.go rename to src/vfs/os.go index 1eeaede..7244096 100644 --- a/src/host/vfs/os.go +++ b/src/vfs/os.go @@ -17,7 +17,7 @@ var _ Filesystem = (*OsFS)(nil) // Stat implements Filesystem. func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { if path.Clean(filename) == Separator { - return newDirInfo(Separator), nil + return NewDirInfo(Separator), nil } info, err := os.Stat(path.Join(fs.hostDir, filename)) @@ -48,7 +48,7 @@ func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) { // Info implements Filesystem. func (fs *OsFS) Info() (fs.FileInfo, error) { - return newDirInfo(fs.Name()), nil + return NewDirInfo(fs.Name()), nil } // IsDir implements Filesystem. diff --git a/src/host/vfs/os_test.go b/src/vfs/os_test.go similarity index 97% rename from src/host/vfs/os_test.go rename to src/vfs/os_test.go index 06f26e3..2768aab 100644 --- a/src/host/vfs/os_test.go +++ b/src/vfs/os_test.go @@ -5,7 +5,7 @@ import ( "os" "testing" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" ) diff --git a/src/host/vfs/resolver.go b/src/vfs/resolver.go similarity index 99% rename from src/host/vfs/resolver.go rename to src/vfs/resolver.go index 190ea1b..bfd35dc 100644 --- a/src/host/vfs/resolver.go +++ b/src/vfs/resolver.go @@ -349,7 +349,7 @@ func ListDirFromFiles[F File](m map[string]F, name string) ([]fs.DirEntry, error if len(parts) == 1 { out = append(out, NewFileInfo(parts[0], f.Size())) } else { - out = append(out, newDirInfo(parts[0])) + out = append(out, NewDirInfo(parts[0])) } } diff --git a/src/host/vfs/resolver_test.go b/src/vfs/resolver_test.go similarity index 99% rename from src/host/vfs/resolver_test.go rename to src/vfs/resolver_test.go index 4b927fd..5b9c141 100644 --- a/src/host/vfs/resolver_test.go +++ b/src/vfs/resolver_test.go @@ -6,7 +6,7 @@ import ( "context" "testing" - "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" ) diff --git a/src/host/vfs/utils.go b/src/vfs/utils.go similarity index 100% rename from src/host/vfs/utils.go rename to src/vfs/utils.go