From 832f5b9710ffca509e7828af2ff4f8ebce51a953 Mon Sep 17 00:00:00 2001 From: royalcat Date: Mon, 20 Jan 2025 05:18:15 +0300 Subject: [PATCH] wip --- .gqlgen.yml | 8 +- cmd/tstor/main.go | 75 +-- daemons/archive/archive.go | 123 +++++ {src/vfs => daemons/archive}/archive_cache.go | 7 +- {src/vfs => daemons/archive}/archive_test.go | 5 +- daemons/archive/daemon.go | 44 ++ daemons/archive/fs.go | 158 +++++++ daemons/archive/rar.go | 69 +++ daemons/archive/sevenzip.go | 59 +++ daemons/archive/zip.go | 63 +++ daemons/qbittorrent/client.go | 2 +- daemons/qbittorrent/config.go | 66 +++ daemons/qbittorrent/daemon.go | 47 +- daemons/qbittorrent/go.mod | 3 + daemons/qbittorrent/plugin/main.go | 10 + graphql/subscription.graphql | 5 - graphql/types/fs.graphql | 10 +- src/config/default.go | 47 -- src/config/load.go | 18 +- src/config/model.go | 13 +- src/daemon/daemon.go | 17 + src/daemon/hostedfs.go | 33 ++ src/daemon/plugin.go | 46 ++ src/daemons/storage.go | 29 -- src/delivery/graphql/generated.go | 297 +----------- src/delivery/graphql/model/entry.go | 15 +- src/delivery/graphql/model/mappers.go | 48 -- src/delivery/graphql/model/models_gen.go | 22 - src/delivery/graphql/resolver/fs.resolvers.go | 21 - src/delivery/http.go | 14 +- src/vfs/archive.go | 440 ------------------ src/vfs/resolver_test.go | 167 ++++--- ui/lib/api/schema.graphql | 5 - 33 files changed, 900 insertions(+), 1086 deletions(-) create mode 100644 daemons/archive/archive.go rename {src/vfs => daemons/archive}/archive_cache.go (96%) rename {src/vfs => daemons/archive}/archive_test.go (95%) create mode 100644 daemons/archive/daemon.go create mode 100644 daemons/archive/fs.go create mode 100644 daemons/archive/rar.go create mode 100644 daemons/archive/sevenzip.go create mode 100644 daemons/archive/zip.go create mode 100644 daemons/qbittorrent/config.go create mode 100644 daemons/qbittorrent/go.mod create mode 100644 daemons/qbittorrent/plugin/main.go create mode 100644 src/daemon/daemon.go create mode 
100644 src/daemon/hostedfs.go create mode 100644 src/daemon/plugin.go delete mode 100644 src/daemons/storage.go delete mode 100644 src/vfs/archive.go diff --git a/.gqlgen.yml b/.gqlgen.yml index 591c55a..c7fd5d1 100644 --- a/.gqlgen.yml +++ b/.gqlgen.yml @@ -34,10 +34,10 @@ models: extraFields: FS: type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ResolverFS" - ArchiveFS: - extraFields: - FS: - type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ArchiveFS" + # ArchiveFS: + # extraFields: + # FS: + # type: "*git.kmsign.ru/royalcat/tstor/src/vfs.ArchiveFS" TorrentOps: extraFields: InfoHash: diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index c25da4d..42133c9 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "path/filepath" "net" nethttp "net/http" @@ -13,17 +14,13 @@ import ( "os/signal" "syscall" - "git.kmsign.ru/royalcat/tstor/daemons/qbittorrent" - "git.kmsign.ru/royalcat/tstor/daemons/ytdlp" - "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" wnfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" - "git.kmsign.ru/royalcat/tstor/src/daemons" + "git.kmsign.ru/royalcat/tstor/src/daemon" "git.kmsign.ru/royalcat/tstor/src/delivery" "git.kmsign.ru/royalcat/tstor/src/telemetry" "git.kmsign.ru/royalcat/tstor/src/vfs" - "github.com/go-git/go-billy/v5/osfs" "github.com/urfave/cli/v2" _ "git.kmsign.ru/royalcat/tstor/pkg/rlog" @@ -64,7 +61,7 @@ func main() { } func run(configPath string) error { - conf, err := config.Load(configPath) + conf, koanf, err := config.Load(configPath) if err != nil { return fmt.Errorf("error loading configuration: %w", err) } @@ -79,6 +76,22 @@ func run(configPath string) error { log := rlog.Component("run") + daemons := []daemon.Daemon{} + + plugins, err := os.ReadDir(conf.DaemonsPluginsDir) + for _, v := range plugins { + if v.IsDir() { + continue + } + path := filepath.Join(conf.DaemonsPluginsDir, v.Name()) + dm, err := 
daemon.LoadFromPlugin(ctx, path, koanf) + if err != nil { + log.Error(ctx, "error registering plugin daemon", rlog.Error(err)) + } + + daemons = append(daemons, dm) + } + // TODO make optional // err = syscall.Setpriority(syscall.PRIO_PGRP, 0, 19) // if err != nil { @@ -89,31 +102,13 @@ func run(configPath string) error { return fmt.Errorf("error creating data folder: %w", err) } - sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS()) - // tsrv, err := torrent.NewDaemon(sourceFs, conf.Sources.TorrentClient) - // if err != nil { - // return fmt.Errorf("error creating service: %w", err) - // } + // sourceFs := vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(osfs.New(conf.SourceDir, osfs.WithBoundOS()))) - err = os.MkdirAll("./ytdlp", 0744) + hostedfs, err := daemon.NewHostedFS(vfs.NewOsFs(conf.SourceDir), daemons) if err != nil { - return err + return fmt.Errorf("error creating hosted filesystem: %w", err) } - ytdlpsrv, err := ytdlp.NewService("./ytdlp") - if err != nil { - return err - } - - qtdaemon, err := qbittorrent.NewDaemon(conf.Sources.QBittorrent) - if err != nil { - return fmt.Errorf("error creating qbittorrent daemon: %w", err) - } - - sfs := daemons.NewHostedFS( - vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)), - qtdaemon, ytdlpsrv, - ) - sfs, err = vfs.WrapLogFS(sfs) + hostedfs, err = vfs.WrapLogFS(hostedfs) if err != nil { return err } @@ -130,7 +125,7 @@ func run(configPath string) error { if conf.Mounts.Fuse.Enabled { mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path) - err := mh.Mount(sfs) + err := mh.Mount(hostedfs) if err != nil { return fmt.Errorf("mount fuse error: %w", err) } @@ -139,7 +134,7 @@ func run(configPath string) error { if conf.Mounts.WebDAV.Enabled { go func() { - if err := webdav.NewWebDAVServer(sfs, conf.Mounts.WebDAV.Port, conf.Mounts.WebDAV.User, conf.Mounts.WebDAV.Pass); err != nil { + if err := webdav.NewWebDAVServer(hostedfs, conf.Mounts.WebDAV.Port, conf.Mounts.WebDAV.User, 
conf.Mounts.WebDAV.Pass); err != nil { log.Error(ctx, "error starting webDAV", rlog.Error(err)) } @@ -148,7 +143,7 @@ func run(configPath string) error { } if conf.Mounts.HttpFs.Enabled { go func() { - httpfs := httpfs.NewHTTPFS(sfs) + httpfs := httpfs.NewHTTPFS(hostedfs) addr := fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port) err = nethttp.ListenAndServe(addr, nethttp.FileServer(httpfs)) if err != nil { @@ -177,7 +172,7 @@ func run(configPath string) error { return } log.Info(ctx, "starting NFS server", slog.String("address", listener.Addr().String())) - handler, err := nfs.NewNFSv3Handler(sfs, conf.Mounts.NFS) + handler, err := nfs.NewNFSv3Handler(hostedfs, conf.Mounts.NFS) if err != nil { log.Error(ctx, "failed to create NFS handler", rlog.Error(err)) return @@ -199,7 +194,7 @@ func run(configPath string) error { }() go func() { - err := delivery.Run(qtdaemon, sfs, conf) + err := delivery.Run(conf.WebUi, hostedfs, daemons) if err != nil { log.Error(ctx, "error initializing HTTP server", rlog.Error(err)) } @@ -209,8 +204,14 @@ func run(configPath string) error { signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigChan - return errors.Join( - // tsrv.Close(ctx), - qtdaemon.Close(ctx), - ) + errs := []error{} + + for _, dm := range daemons { + err := dm.Close(ctx) + if err != nil { + errs = append(errs, err) + } + } + + return errors.Join(errs...) 
} diff --git a/daemons/archive/archive.go b/daemons/archive/archive.go new file mode 100644 index 0000000..d9f8460 --- /dev/null +++ b/daemons/archive/archive.go @@ -0,0 +1,123 @@ +package archive + +import ( + "context" + "io/fs" + "strings" + "time" + + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +var ArchiveFactories = map[string]vfs.FsFactory{ + ".zip": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) { + stat, err := f.Info() + if err != nil { + return nil, err + } + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), ZipLoader) + }, + ".rar": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) { + stat, err := f.Info() + if err != nil { + return nil, err + } + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), RarLoader) + }, + ".7z": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) { + stat, err := f.Info() + if err != nil { + return nil, err + } + return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), SevenZipLoader) + }, +} + +type archiveLoader func(ctx context.Context, archivePath string, r vfs.File, size int64) (map[string]fileEntry, error) + +var _ vfs.Filesystem = &ArchiveFS{} + +type fileEntry struct { + fs.FileInfo + open func(ctx context.Context) (vfs.File, error) +} + +type ArchiveFS struct { + name string + size int64 + files map[string]fileEntry +} + +// Rename implements Filesystem. +func (a *ArchiveFS) Rename(ctx context.Context, oldpath string, newpath string) error { + return vfs.ErrNotImplemented +} + +// ModTime implements Filesystem. +func (a *ArchiveFS) ModTime() time.Time { + return time.Time{} +} + +// Mode implements Filesystem. +func (a *ArchiveFS) Mode() fs.FileMode { + return fs.ModeDir +} + +// Size implements Filesystem. +func (a *ArchiveFS) Size() int64 { + return int64(a.size) +} + +// Sys implements Filesystem. +func (a *ArchiveFS) Sys() any { + return nil +} + +// FsName implements Filesystem. 
+func (a *ArchiveFS) FsName() string { + return "archivefs" +} + +func NewArchive(ctx context.Context, archivePath, name string, f vfs.File, size int64, loader archiveLoader) (*ArchiveFS, error) { + archiveFiles, err := loader(ctx, archivePath, f, size) + if err != nil { + return nil, err + } + + // TODO make optional + singleDir := true + for k := range archiveFiles { + if !strings.HasPrefix(k, "/"+name+"/") { + singleDir = false + break + } + } + + files := make(map[string]fileEntry, len(archiveFiles)) + for k, v := range archiveFiles { + // TODO make optional + if strings.Contains(k, "/__MACOSX/") { + continue + } + + if singleDir { + k, _ = strings.CutPrefix(k, "/"+name) + } + + files[k] = v + } + + // FIXME configurable + files["/.forcegallery"] = fileEntry{ + FileInfo: vfs.NewFileInfo("/.forcegallery", 0, time.Time{}), + open: func(ctx context.Context) (vfs.File, error) { + return vfs.NewMemoryFile(".forcegallery", []byte{}), nil + }, + } + + return &ArchiveFS{ + name: name, + size: size, + files: files, + }, nil +} diff --git a/src/vfs/archive_cache.go b/daemons/archive/archive_cache.go similarity index 96% rename from src/vfs/archive_cache.go rename to daemons/archive/archive_cache.go index fd3bde0..00f1500 100644 --- a/src/vfs/archive_cache.go +++ b/daemons/archive/archive_cache.go @@ -1,4 +1,4 @@ -package vfs +package archive import ( "context" @@ -7,6 +7,7 @@ import ( "io" "sync" + "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/hashicorp/golang-lru/arc/v2" "github.com/royalcat/ctxio" "go.opentelemetry.io/otel/attribute" @@ -19,8 +20,10 @@ const blockSize = 1024 * 16 // 16KB const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage const defaultBlockCount = cacheSize / blockSize +type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error) + type archiveFileIndex struct { - archiveHash Hash + archiveHash vfs.Hash filename string } diff --git a/src/vfs/archive_test.go b/daemons/archive/archive_test.go similarity index 95% 
rename from src/vfs/archive_test.go rename to daemons/archive/archive_test.go index fcac485..cce26e5 100644 --- a/src/vfs/archive_test.go +++ b/daemons/archive/archive_test.go @@ -1,4 +1,4 @@ -package vfs_test +package archive_test import ( "archive/zip" @@ -8,6 +8,7 @@ import ( "io/fs" "testing" + "git.kmsign.ru/royalcat/tstor/daemons/archive" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/stretchr/testify/require" ) @@ -48,7 +49,7 @@ func TestZipFilesystem(t *testing.T) { ctx := context.Background() // TODO add single dir collapse test - zfs, err := vfs.NewArchive(ctx, "test", "test", zReader, size, vfs.ZipLoader) + zfs, err := archive.NewArchive(ctx, "test", "test", zReader, size, archive.ZipLoader) require.NoError(err) files, err := zfs.ReadDir(ctx, "/path/to/test/file") diff --git a/daemons/archive/daemon.go b/daemons/archive/daemon.go new file mode 100644 index 0000000..e916209 --- /dev/null +++ b/daemons/archive/daemon.go @@ -0,0 +1,44 @@ +package archive + +import ( + "context" + + "git.kmsign.ru/royalcat/tstor/src/daemon" + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/knadh/koanf/v2" + "go.opentelemetry.io/otel" +) + +const DaemonName string = "archive" + +var _ daemon.DaemonConstructor = NewDaemon + +func NewDaemon(ctx context.Context, koanf *koanf.Koanf) (daemon.Daemon, error) { + return &Daemon{}, nil +} + +var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/daemons/archive") + +var _ daemon.Daemon = (*Daemon)(nil) + +type Daemon struct{} + +// Name implements daemon.Daemon. +func (d *Daemon) Name() string { + return DaemonName +} + +// Extensions implements daemon.Daemon. +func (d *Daemon) Extensions() []string { + return []string{".zip", ".rar", ".7z"} +} + +// GetFS implements daemon.Daemon. +func (d *Daemon) GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) { + panic("unimplemented") +} + +// Close implements daemon.Daemon. 
+func (d *Daemon) Close(ctx context.Context) error { + panic("unimplemented") +} diff --git a/daemons/archive/fs.go b/daemons/archive/fs.go new file mode 100644 index 0000000..6758964 --- /dev/null +++ b/daemons/archive/fs.go @@ -0,0 +1,158 @@ +package archive + +import ( + "context" + "io" + "io/fs" + "path" + "strings" + "sync" + "time" + + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +// Unlink implements Filesystem. +func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error { + return vfs.ErrNotImplemented +} + +func (a *ArchiveFS) Open(ctx context.Context, filename string) (vfs.File, error) { + if filename == vfs.Separator { + return vfs.NewDirFile(filename), nil + } + + f, ok := a.files[filename] + if ok { + return f.open(ctx) + } + + for p := range a.files { + if strings.HasPrefix(p, filename) { + return vfs.NewDirFile(filename), nil + } + } + + return nil, vfs.ErrNotExist +} + +func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + infos := make(map[string]fs.FileInfo, len(a.files)) + for k, v := range a.files { + infos[k] = v + } + + return vfs.ListDirFromInfo(infos, path) +} + +// Stat implements Filesystem. +func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + if entry, ok := afs.files[filename]; ok { + return entry, nil + } + + for p, _ := range afs.files { + if strings.HasPrefix(p, filename) { + return vfs.NewDirInfo(path.Base(filename), time.Time{}), nil + } + } + + return nil, vfs.ErrNotExist +} + +// Info implements Filesystem. +func (a *ArchiveFS) Info() (fs.FileInfo, error) { + return a, nil +} + +// IsDir implements Filesystem. +func (a *ArchiveFS) IsDir() bool { + return true +} + +// Name implements Filesystem. +func (a *ArchiveFS) Name() string { + return a.name +} + +// Type implements Filesystem. 
+func (a *ArchiveFS) Type() fs.FileMode { + return fs.ModeDir +} + +var _ vfs.File = (*archiveFile)(nil) + +func newArchiveFile(name string, size int64, rr *randomReaderFromLinear) *archiveFile { + return &archiveFile{ + name: name, + size: size, + rr: rr, + } +} + +type archiveFile struct { + name string + size int64 + + m sync.Mutex + offset int64 + + rr *randomReaderFromLinear +} + +// Seek implements File. +func (d *archiveFile) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + d.offset = offset + + case io.SeekCurrent: + d.offset += offset + case io.SeekEnd: + d.offset = d.size + offset + } + return d.offset, nil +} + +// Name implements File. +func (d *archiveFile) Name() string { + return d.name +} + +// Type implements File. +func (d *archiveFile) Type() fs.FileMode { + return vfs.ModeFileRO +} + +func (d *archiveFile) Info() (fs.FileInfo, error) { + return vfs.NewFileInfo(d.name, d.size, time.Time{}), nil +} + +func (d *archiveFile) Size() int64 { + return d.size +} + +func (d *archiveFile) IsDir() bool { + return false +} + +func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { + ctx, span := tracer.Start(ctx, "archive.File.Read") + defer span.End() + + n, err = d.rr.ReadAt(ctx, p, d.offset) + d.offset += int64(n) + return n, err +} + +func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + d.m.Lock() + defer d.m.Unlock() + + return d.rr.ReadAt(ctx, p, off) +} + +func (d *archiveFile) Close(ctx context.Context) error { + // FIXME close should do nothing as archive fs currently reuse the same file instances + return nil +} diff --git a/daemons/archive/rar.go b/daemons/archive/rar.go new file mode 100644 index 0000000..79ba4f7 --- /dev/null +++ b/daemons/archive/rar.go @@ -0,0 +1,69 @@ +package archive + +import ( + "context" + "fmt" + "io" + + "git.kmsign.ru/royalcat/tstor/pkg/ioutils" + "git.kmsign.ru/royalcat/tstor/src/vfs" + 
"github.com/nwaples/rardecode/v2" + "github.com/royalcat/ctxio" +) + +var _ archiveLoader = RarLoader + +func RarLoader(ctx context.Context, archivePath string, f vfs.File, size int64) (map[string]fileEntry, error) { + hash, err := vfs.FileHash(ctx, f) + if err != nil { + return nil, err + } + + reader := ioutils.WrapIoReadSeeker(ctx, f, size) + + r, err := rardecode.NewReader(reader) + if err != nil { + return nil, err + } + + out := make(map[string]fileEntry) + for { + header, err := r.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + name := header.Name + af := func(ctx context.Context) (ctxio.ReadCloser, error) { + reader := ioutils.WrapIoReadSeeker(ctx, f, size) + r, err := rardecode.NewReader(reader) + if err != nil { + return nil, err + } + + for header, err := r.Next(); err != io.EOF; header, err = r.Next() { + if err != nil { + return nil, err + } + if header.Name == name { + return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil + } + } + return nil, fmt.Errorf("file with name '%s' not found", name) + } + + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af) + + out[vfs.AbsPath(header.Name)] = fileEntry{ + FileInfo: vfs.NewFileInfo(header.Name, header.UnPackedSize, header.ModificationTime), + open: func(ctx context.Context) (vfs.File, error) { + return newArchiveFile(header.Name, header.UnPackedSize, rr), nil + }, + } + } + + return out, nil +} diff --git a/daemons/archive/sevenzip.go b/daemons/archive/sevenzip.go new file mode 100644 index 0000000..4351a04 --- /dev/null +++ b/daemons/archive/sevenzip.go @@ -0,0 +1,59 @@ +package archive + +import ( + "context" + + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/bodgit/sevenzip" + "github.com/royalcat/ctxio" +) + +var _ archiveLoader = SevenZipLoader + +func SevenZipLoader(ctx context.Context, archivePath string, ctxreader vfs.File, size int64) (map[string]fileEntry, error) { + hash, err := 
vfs.FileHash(ctx, ctxreader) + if err != nil { + return nil, err + } + + reader := ctxio.IoReaderAt(ctx, ctxreader) + r, err := sevenzip.NewReader(reader, size) + if err != nil { + return nil, err + } + + out := make(map[string]fileEntry) + for i, f := range r.File { + if f.FileInfo().IsDir() { + continue + } + + af := func(ctx context.Context) (ctxio.ReadCloser, error) { + reader := ctxio.IoReaderAt(ctx, ctxreader) + zr, err := sevenzip.NewReader(reader, size) + if err != nil { + return nil, err + } + + rc, err := zr.File[i].Open() + if err != nil { + return nil, err + } + + return ctxio.WrapIoReadCloser(rc), nil + } + + info := f.FileInfo() + + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af) + + out[vfs.AbsPath(f.Name)] = fileEntry{ + FileInfo: f.FileInfo(), + open: func(ctx context.Context) (vfs.File, error) { + return newArchiveFile(info.Name(), info.Size(), rr), nil + }, + } + } + + return out, nil +} diff --git a/daemons/archive/zip.go b/daemons/archive/zip.go new file mode 100644 index 0000000..3e02e69 --- /dev/null +++ b/daemons/archive/zip.go @@ -0,0 +1,63 @@ +package archive + +import ( + "archive/zip" + "context" + "fmt" + + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/royalcat/ctxio" +) + +var _ archiveLoader = ZipLoader + +func ZipLoader(ctx context.Context, archivePath string, f vfs.File, size int64) (map[string]fileEntry, error) { + hash, err := vfs.FileHash(ctx, f) + if err != nil { + return nil, err + } + + reader := ctxio.IoReaderAt(ctx, f) + zr, err := zip.NewReader(reader, size) + if err != nil { + return nil, err + } + + out := make(map[string]fileEntry) + for i := range zr.File { + zipFile := zr.File[i] + if zipFile.FileInfo().IsDir() { + continue + } + + i := i + af := func(ctx context.Context) (ctxio.ReadCloser, error) { + reader := ctxio.IoReaderAt(ctx, f) + + zr, err := zip.NewReader(reader, size) + if err != nil { + return nil, fmt.Errorf("failed to create zip reader: %w", 
err) + } + + rc, err := zr.File[i].Open() + if err != nil { + return nil, fmt.Errorf("failed to open file in zip archive: %w", err) + } + + return ctxio.WrapIoReadCloser(rc), nil + } + + info := zipFile.FileInfo() + + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af) + + out[vfs.AbsPath(zipFile.Name)] = fileEntry{ + FileInfo: info, + open: func(ctx context.Context) (vfs.File, error) { + return newArchiveFile(info.Name(), info.Size(), rr), nil + }, + } + } + + return out, nil +} diff --git a/daemons/qbittorrent/client.go b/daemons/qbittorrent/client.go index f86b294..e171b32 100644 --- a/daemons/qbittorrent/client.go +++ b/daemons/qbittorrent/client.go @@ -7,9 +7,9 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" - "github.com/creativecreature/sturdyc" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/royalcat/btrgo/btrsync" + "github.com/viccon/sturdyc" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" ) diff --git a/daemons/qbittorrent/config.go b/daemons/qbittorrent/config.go new file mode 100644 index 0000000..cd5c4ab --- /dev/null +++ b/daemons/qbittorrent/config.go @@ -0,0 +1,66 @@ +package qbittorrent + +import ( + "github.com/knadh/koanf/providers/structs" + "github.com/knadh/koanf/v2" +) + +type Config struct { + DataFolder string `koanf:"data_folder,omitempty"` + MetadataFolder string `koanf:"metadata_folder,omitempty"` +} + +var defaultConfig = Config{ + DataFolder: "./qbittorrent/data", + MetadataFolder: "./qbittorrent/metadata", +} + +func loadConfig(koanf *koanf.Koanf) (Config, error) { + if err := koanf.Load(structs.Provider(defaultConfig, "koanf"), nil); err != nil { + return Config{}, err + } + + var config Config + if err := koanf.Unmarshal("", &config); err != nil { + return Config{}, err + } + + return config, nil +} + +// var defaultRoutes = []Route{ +// { +// Name: "multimedia", +// Torrents: []Torrent{ +// { +// MagnetURI: 
"magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056&dn=Cosmos+Laundromat&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fcosmos-laundromat.torrent", +// }, +// { +// MagnetURI: "magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent", +// }, +// { +// MagnetURI: "magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=Sintel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fsintel.torrent", +// }, +// { +// MagnetURI: 
"magnet:?xt=urn:btih:209c8226b299b308beaf2b9cd3fb49212dbd13ec&dn=Tears+of+Steel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Ftears-of-steel.torrent", +// }, +// { +// MagnetURI: "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent", +// }, +// }, +// }, +// } +// var defaultServers = []Server{ +// { +// Name: "server", +// Path: "server", +// Trackers: []string{ +// "wss://tracker.btorrent.xyz", +// "wss://tracker.openwebtorrent.com", +// "http://p4p.arenabg.com:1337/announce", +// "udp://tracker.opentrackr.org:1337/announce", +// "udp://open.tracker.cl:1337/announce", +// "http://openbittorrent.com:80/announce", +// }, +// }, +// } diff --git a/daemons/qbittorrent/daemon.go b/daemons/qbittorrent/daemon.go index 0f6aafb..6863811 100644 --- a/daemons/qbittorrent/daemon.go +++ b/daemons/qbittorrent/daemon.go @@ -14,7 +14,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" "git.kmsign.ru/royalcat/tstor/pkg/rlog" - "git.kmsign.ru/royalcat/tstor/src/config" + "git.kmsign.ru/royalcat/tstor/src/daemon" "git.kmsign.ru/royalcat/tstor/src/logwrap" "git.kmsign.ru/royalcat/tstor/src/vfs" 
"github.com/anacrolix/torrent/metainfo" @@ -22,6 +22,7 @@ import ( infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" mapset "github.com/deckarep/golang-set/v2" "github.com/iceber/iouring-go" + "github.com/knadh/koanf/v2" "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" ) @@ -52,12 +53,20 @@ WebUI\LocalHostAuth=false WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)" ` -func NewDaemon(conf config.QBittorrent) (*Daemon, error) { - ctx := context.Background() - log := rlog.Component("qbittorrent") +const DaemonName = "qbittorrent" - binPath := conf.MetadataFolder + "/qbittorrent-nox" - err := downloadLatestQbitRelease(ctx, binPath) +var _ daemon.DaemonConstructor = NewDaemon + +func NewDaemon(ctx context.Context, koanf *koanf.Koanf) (daemon.Daemon, error) { + log := rlog.Component(DaemonName) + + config, err := loadConfig(koanf) + if err != nil { + return nil, err + } + + binPath := config.MetadataFolder + "/qbittorrent-nox" + err = downloadLatestQbitRelease(ctx, binPath) if err != nil { return nil, err } @@ -66,26 +75,26 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) { outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog()) errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog()) - _, err = os.Stat(conf.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf") + _, err = os.Stat(config.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf") if errors.Is(err, os.ErrNotExist) { - err = os.MkdirAll(conf.MetadataFolder+"/profile/qBittorrent/config", 0744) + err = os.MkdirAll(config.MetadataFolder+"/profile/qBittorrent/config", 0744) if err != nil { return nil, err } - err = os.WriteFile(conf.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644) + err = os.WriteFile(config.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644) if 
err != nil { return nil, err } } - err = os.MkdirAll(conf.DataFolder, 0744) + err = os.MkdirAll(config.DataFolder, 0744) if err != nil { return nil, err } const port = 25436 - proc, err := runQBittorrent(binPath, conf.MetadataFolder+"/profile", port, outLog, errLog) + proc, err := runQBittorrent(binPath, config.MetadataFolder+"/profile", port, outLog, errLog) if err != nil { return nil, err } @@ -109,7 +118,7 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) { time.Sleep(time.Second) } - dataDir, err := filepath.Abs(conf.DataFolder) + dataDir, err := filepath.Abs(config.DataFolder) if err != nil { return nil, err } @@ -129,15 +138,23 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) { return &Daemon{ qb: qb, proc: proc, - dataDir: conf.DataFolder, + dataDir: config.DataFolder, ur: ur, sourceFiles: make(map[string]string), registeredTorrents: mapset.NewSet[string](), client: wrapClient(qb), - log: rlog.Component("qbittorrent"), + log: rlog.Component(DaemonName), }, nil } +func (d *Daemon) Name() string { + return DaemonName +} + +func (d *Daemon) Extensions() []string { + return []string{".torrent"} +} + func (d *Daemon) Close(ctx context.Context) error { err := d.proc.Signal(os.Interrupt) if err != nil { @@ -156,7 +173,7 @@ func torrentDataPath(dataDir string, ih string) (string, error) { return filepath.Abs(path.Join(dataDir, ih)) } -func (fs *Daemon) GetTorrentFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) { +func (fs *Daemon) GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) { ctx, span := trace.Start(ctx, "GetTorrentFS") defer span.End() diff --git a/daemons/qbittorrent/go.mod b/daemons/qbittorrent/go.mod new file mode 100644 index 0000000..bce977f --- /dev/null +++ b/daemons/qbittorrent/go.mod @@ -0,0 +1,3 @@ +module github.com/royalcat/tstor/daemons/qbittorrent + +go 1.23.5 diff --git a/daemons/qbittorrent/plugin/main.go b/daemons/qbittorrent/plugin/main.go new file mode 
100644 index 0000000..1b175a2 --- /dev/null +++ b/daemons/qbittorrent/plugin/main.go @@ -0,0 +1,10 @@ +package main + +import "git.kmsign.ru/royalcat/tstor/daemons/qbittorrent" + +func main() { +} + +const DaemonName = qbittorrent.DaemonName + +var NewDaemon = qbittorrent.NewDaemon diff --git a/graphql/subscription.graphql b/graphql/subscription.graphql index eb724c8..ba957fb 100644 --- a/graphql/subscription.graphql +++ b/graphql/subscription.graphql @@ -1,8 +1,3 @@ type Subscription { taskProgress(taskID: ID!): Progress } - -interface Progress { - current: Int! - total: Int! -} diff --git a/graphql/types/fs.graphql b/graphql/types/fs.graphql index 66b6ffe..45dde96 100644 --- a/graphql/types/fs.graphql +++ b/graphql/types/fs.graphql @@ -27,9 +27,9 @@ type ResolverFS implements Dir & FsEntry { entries: [FsEntry!]! @resolver } -type ArchiveFS implements Dir & FsEntry { - name: String! - entries: [FsEntry!]! @resolver +# type ArchiveFS implements Dir & FsEntry { +# name: String! +# entries: [FsEntry!]! @resolver - size: Int! -} +# size: Int! 
+# } diff --git a/src/config/default.go b/src/config/default.go index 36de950..1095596 100644 --- a/src/config/default.go +++ b/src/config/default.go @@ -6,16 +6,6 @@ var defaultConfig = Settings{ Port: 4444, IP: "0.0.0.0", }, - Sources: Sources{ - QBittorrent: QBittorrent{ - DataFolder: "./qbittorrent/data", - MetadataFolder: "./qbittorrent/metadata", - }, - TorrentClient: TorrentClient{ - DataFolder: "./torrent/data", - MetadataFolder: "./torrent/metadata", - }, - }, Mounts: Mounts{ HttpFs: HttpFs{ Enabled: true, @@ -43,40 +33,3 @@ var defaultConfig = Settings{ MaxSize: 50, }, } - -var defaultRoutes = []Route{ - { - Name: "multimedia", - Torrents: []Torrent{ - { - MagnetURI: "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056&dn=Cosmos+Laundromat&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fcosmos-laundromat.torrent", - }, - { - MagnetURI: "magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent", - }, - { - MagnetURI: 
"magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=Sintel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fsintel.torrent", - }, - { - MagnetURI: "magnet:?xt=urn:btih:209c8226b299b308beaf2b9cd3fb49212dbd13ec&dn=Tears+of+Steel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Ftears-of-steel.torrent", - }, - { - MagnetURI: "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent", - }, - }, - }, -} -var defaultServers = []Server{ - { - Name: "server", - Path: "server", - Trackers: []string{ - "wss://tracker.btorrent.xyz", - "wss://tracker.openwebtorrent.com", - "http://p4p.arenabg.com:1337/announce", - "udp://tracker.opentrackr.org:1337/announce", - "udp://open.tracker.cl:1337/announce", - "http://openbittorrent.com:80/announce", - }, - }, -} 
diff --git a/src/config/load.go b/src/config/load.go index 53f1156..f6bfbda 100644 --- a/src/config/load.go +++ b/src/config/load.go @@ -15,22 +15,22 @@ var k = koanf.New(".") var Config = defaultConfig -func Load(path string) (*Settings, error) { +func Load(path string) (*Settings, *koanf.Koanf, error) { err := k.Load(structs.Provider(defaultConfig, "koanf"), nil) if err != nil { - return nil, err + return nil, nil, err } if path != "" { _, err := os.Stat(path) if err != nil && !os.IsNotExist(err) { // its ok if file doesnt exist - return nil, err + return nil, nil, err } err = k.Load(file.Provider(path), yaml.Parser()) if err != nil { - return nil, err + return nil, nil, err } } @@ -39,23 +39,23 @@ func Load(path string) (*Settings, error) { strings.TrimPrefix(s, "TSTOR_")), "_", ".", -1) }), nil) if err != nil { - return nil, err + return nil, nil, err } data, err := k.Marshal(yaml.Parser()) if err != nil { - return nil, err + return nil, nil, err } err = os.WriteFile(path, data, os.ModePerm) if err != nil { - return nil, err + return nil, nil, err } conf := Settings{} err = k.Unmarshal("", &conf) if err != nil { - return nil, err + return nil, nil, err } - return &conf, nil + return &conf, k, nil } diff --git a/src/config/model.go b/src/config/model.go index c41d042..c0a88e4 100644 --- a/src/config/model.go +++ b/src/config/model.go @@ -4,7 +4,8 @@ package config type Settings struct { WebUi WebUi `koanf:"webUi"` - Sources Sources `koanf:"sources"` + DaemonsPluginsDir string `koanf:"daemons_plugins_dir"` + Daemons any `koanf:"daemons"` Mounts Mounts `koanf:"mounts"` Log Log `koanf:"log"` @@ -14,11 +15,6 @@ type Settings struct { OtelHttp string `koanf:"otel_http"` } -type Sources struct { - TorrentClient TorrentClient `koanf:"torrent"` - QBittorrent QBittorrent `koanf:"qbittorrent"` -} - type WebUi struct { Port int `koanf:"port"` IP string `koanf:"ip"` @@ -32,11 +28,6 @@ type Log struct { Path string `koanf:"path"` } -type QBittorrent struct { - DataFolder 
string `koanf:"data_folder,omitempty"` - MetadataFolder string `koanf:"metadata_folder,omitempty"` -} - type TorrentClient struct { // ReadTimeout int `koanf:"read_timeout,omitempty"` // AddTimeout int `koanf:"add_timeout,omitempty"` diff --git a/src/daemon/daemon.go b/src/daemon/daemon.go new file mode 100644 index 0000000..7e06ce4 --- /dev/null +++ b/src/daemon/daemon.go @@ -0,0 +1,17 @@ +package daemon + +import ( + "context" + + "git.kmsign.ru/royalcat/tstor/src/vfs" + "github.com/knadh/koanf/v2" +) + +type DaemonConstructor func(context.Context, *koanf.Koanf) (Daemon, error) + +type Daemon interface { + Name() string + Extensions() []string + GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) + Close(ctx context.Context) error +} diff --git a/src/daemon/hostedfs.go b/src/daemon/hostedfs.go new file mode 100644 index 0000000..dc80163 --- /dev/null +++ b/src/daemon/hostedfs.go @@ -0,0 +1,33 @@ +package daemon + +import ( + "git.kmsign.ru/royalcat/tstor/src/vfs" +) + +func NewHostedFS(sourceFS vfs.Filesystem, daemons []Daemon) (vfs.Filesystem, error) { + factories := map[string]vfs.FsFactory{} + + // factories := map[string]vfs.FsFactory{ + // ".torrent": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) { + // tfs, err := tsrv.GetTorrentFS(ctx, sourcePath, f) + // if err != nil { + // return nil, err + // } + // return vfs.NewResolveFS(tfs, vfs.ArchiveFactories), nil + // }, + // ".ts-ytdlp": ytdlpsrv.BuildFS, + // } + + // // add default torrent factory for root filesystem + // for k, v := range vfs.ArchiveFactories { + // factories[k] = v + // } + + for _, daemon := range daemons { + for _, ext := range daemon.Extensions() { + factories[ext] = daemon.GetFS + } + } + + return vfs.NewResolveFS(sourceFS, factories), nil +} diff --git a/src/daemon/plugin.go b/src/daemon/plugin.go new file mode 100644 index 0000000..3769fae --- /dev/null +++ b/src/daemon/plugin.go @@ -0,0 +1,46 @@ +package daemon + 
+import ( + "context" + "fmt" + "plugin" + + "github.com/knadh/koanf/v2" +) + +const ( + SymDaemonName = "DaemonKey" + SymNewDaemon = "NewDaemonKey" +) + +func LoadFromPlugin(ctx context.Context, path string, rootKoanf *koanf.Koanf) (Daemon, error) { + p, err := plugin.Open(path) + if err != nil { + return nil, fmt.Errorf("error opening plugin: %w", err) + } + symName, err := p.Lookup(SymDaemonName) + if err != nil { + return nil, fmt.Errorf("error looking up DaemonName symbol: %w", err) + } + daemonName, ok := symName.(*string) + if !ok { + return nil, fmt.Errorf("DaemonName symbol is not a string") + } + symNewDaemon, err := p.Lookup(SymNewDaemon) + if err != nil { + return nil, fmt.Errorf("error looking up NewDaemon symbol: %w", err) + } + newDaemon, ok := symNewDaemon.(*DaemonConstructor) + if !ok { + return nil, fmt.Errorf("NewDaemon symbol is not a DaemonConstructor") + } + + name := *daemonName + + daemon, err := (*newDaemon)(ctx, rootKoanf.Cut("daemons."+name)) + if err != nil { + return nil, fmt.Errorf("error creating daemon: %w", err) + } + + return daemon, err +} diff --git a/src/daemons/storage.go b/src/daemons/storage.go deleted file mode 100644 index b67dc7f..0000000 --- a/src/daemons/storage.go +++ /dev/null @@ -1,29 +0,0 @@ -package daemons - -import ( - "context" - - "git.kmsign.ru/royalcat/tstor/daemons/qbittorrent" - "git.kmsign.ru/royalcat/tstor/daemons/ytdlp" - "git.kmsign.ru/royalcat/tstor/src/vfs" -) - -func NewHostedFS(sourceFS vfs.Filesystem, tsrv *qbittorrent.Daemon, ytdlpsrv *ytdlp.Daemon) vfs.Filesystem { - factories := map[string]vfs.FsFactory{ - ".torrent": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) { - tfs, err := tsrv.GetTorrentFS(ctx, sourcePath, f) - if err != nil { - return nil, err - } - return vfs.NewResolveFS(tfs, vfs.ArchiveFactories), nil - }, - ".ts-ytdlp": ytdlpsrv.BuildFS, - } - - // add default torrent factory for root filesystem - for k, v := range vfs.ArchiveFactories { - 
factories[k] = v - } - - return vfs.NewResolveFS(sourceFS, factories) -} diff --git a/src/delivery/graphql/generated.go b/src/delivery/graphql/generated.go index 6d8296a..fe91a39 100644 --- a/src/delivery/graphql/generated.go +++ b/src/delivery/graphql/generated.go @@ -40,7 +40,6 @@ type Config struct { } type ResolverRoot interface { - ArchiveFS() ArchiveFSResolver Mutation() MutationResolver QBitTorrentDaemonMutation() QBitTorrentDaemonMutationResolver QBitTorrentDaemonQuery() QBitTorrentDaemonQueryResolver @@ -58,12 +57,6 @@ type DirectiveRoot struct { } type ComplexityRoot struct { - ArchiveFS struct { - Entries func(childComplexity int) int - Name func(childComplexity int) int - Size func(childComplexity int) int - } - Mutation struct { DedupeStorage func(childComplexity int) int QbitTorrentDaemon func(childComplexity int) int @@ -129,9 +122,6 @@ type ComplexityRoot struct { } } -type ArchiveFSResolver interface { - Entries(ctx context.Context, obj *model.ArchiveFs) ([]model.FsEntry, error) -} type MutationResolver interface { QbitTorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonMutation, error) UploadFile(ctx context.Context, dir string, file graphql.Upload) (bool, error) @@ -180,27 +170,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in _ = ec switch typeName + "." 
+ field { - case "ArchiveFS.entries": - if e.complexity.ArchiveFS.Entries == nil { - break - } - - return e.complexity.ArchiveFS.Entries(childComplexity), true - - case "ArchiveFS.name": - if e.complexity.ArchiveFS.Name == nil { - break - } - - return e.complexity.ArchiveFS.Name(childComplexity), true - - case "ArchiveFS.size": - if e.complexity.ArchiveFS.Size == nil { - break - } - - return e.complexity.ArchiveFS.Size(childComplexity), true - case "Mutation.dedupeStorage": if e.complexity.Mutation.DedupeStorage == nil { break @@ -568,11 +537,6 @@ type Schema { {Name: "../../../graphql/subscription.graphql", Input: `type Subscription { taskProgress(taskID: ID!): Progress } - -interface Progress { - current: Int! - total: Int! -} `, BuiltIn: false}, {Name: "../../../graphql/sources/qbittorrent_mutation.graphql", Input: `type QBitTorrentDaemonMutation { cleanup(run: Boolean!): QBitCleanupResponse! @resolver @@ -664,11 +628,16 @@ type ResolverFS implements Dir & FsEntry { entries: [FsEntry!]! @resolver } -type ArchiveFS implements Dir & FsEntry { - name: String! - entries: [FsEntry!]! @resolver +# type ArchiveFS implements Dir & FsEntry { +# name: String! +# entries: [FsEntry!]! @resolver - size: Int! +# size: Int! +# } +`, BuiltIn: false}, + {Name: "../../../graphql/types/progress.graphql", Input: `interface Progress { + current: Int! + total: Int! 
} `, BuiltIn: false}, } @@ -1001,160 +970,6 @@ func (ec *executionContext) field___Type_fields_argsIncludeDeprecated( // region **************************** field.gotpl ***************************** -func (ec *executionContext) _ArchiveFS_name(ctx context.Context, field graphql.CollectedField, obj *model.ArchiveFs) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_ArchiveFS_name(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Name, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_ArchiveFS_name(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "ArchiveFS", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") - }, - } - return fc, nil -} - -func (ec *executionContext) _ArchiveFS_entries(ctx context.Context, field graphql.CollectedField, obj *model.ArchiveFs) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_ArchiveFS_entries(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, 
func(rctx context.Context) (interface{}, error) { - directive0 := func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return ec.resolvers.ArchiveFS().Entries(rctx, obj) - } - - directive1 := func(ctx context.Context) (interface{}, error) { - if ec.directives.Resolver == nil { - var zeroVal []model.FsEntry - return zeroVal, errors.New("directive resolver is not implemented") - } - return ec.directives.Resolver(ctx, obj, directive0) - } - - tmp, err := directive1(rctx) - if err != nil { - return nil, graphql.ErrorOnPath(ctx, err) - } - if tmp == nil { - return nil, nil - } - if data, ok := tmp.([]model.FsEntry); ok { - return data, nil - } - return nil, fmt.Errorf(`unexpected type %T from directive, should be []git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model.FsEntry`, tmp) - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.([]model.FsEntry) - fc.Result = res - return ec.marshalNFsEntry2ᚕgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐFsEntryᚄ(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_ArchiveFS_entries(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "ArchiveFS", - Field: field, - IsMethod: true, - IsResolver: true, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("FieldContext.Child cannot be called on type INTERFACE") - }, - } - return fc, nil -} - -func (ec *executionContext) _ArchiveFS_size(ctx context.Context, field graphql.CollectedField, obj *model.ArchiveFs) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_ArchiveFS_size(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - 
if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Size, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int64) - fc.Result = res - return ec.marshalNInt2int64(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_ArchiveFS_size(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "ArchiveFS", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type Int does not have child fields") - }, - } - return fc, nil -} - func (ec *executionContext) _Mutation_qbitTorrentDaemon(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Mutation_qbitTorrentDaemon(ctx, field) if err != nil { @@ -4944,13 +4759,6 @@ func (ec *executionContext) _Dir(ctx context.Context, sel ast.SelectionSet, obj return graphql.Null } return ec._ResolverFS(ctx, sel, obj) - case model.ArchiveFs: - return ec._ArchiveFS(ctx, sel, &obj) - case *model.ArchiveFs: - if obj == nil { - return graphql.Null - } - return ec._ArchiveFS(ctx, sel, obj) default: panic(fmt.Errorf("unexpected type %T", obj)) } @@ -4997,13 +4805,6 @@ func (ec *executionContext) _FsEntry(ctx context.Context, sel ast.SelectionSet, return graphql.Null } return ec._ResolverFS(ctx, sel, obj) - case model.ArchiveFs: - return ec._ArchiveFS(ctx, sel, &obj) - case *model.ArchiveFs: - if obj == nil { - return graphql.Null - } - return ec._ArchiveFS(ctx, sel, obj) case model.Dir: if obj == nil 
{ return graphql.Null @@ -5032,86 +4833,6 @@ func (ec *executionContext) _Progress(ctx context.Context, sel ast.SelectionSet, // region **************************** object.gotpl **************************** -var archiveFSImplementors = []string{"ArchiveFS", "Dir", "FsEntry"} - -func (ec *executionContext) _ArchiveFS(ctx context.Context, sel ast.SelectionSet, obj *model.ArchiveFs) graphql.Marshaler { - fields := graphql.CollectFields(ec.OperationContext, sel, archiveFSImplementors) - - out := graphql.NewFieldSet(fields) - deferred := make(map[string]*graphql.FieldSet) - for i, field := range fields { - switch field.Name { - case "__typename": - out.Values[i] = graphql.MarshalString("ArchiveFS") - case "name": - out.Values[i] = ec._ArchiveFS_name(ctx, field, obj) - if out.Values[i] == graphql.Null { - atomic.AddUint32(&out.Invalids, 1) - } - case "entries": - field := field - - innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._ArchiveFS_entries(ctx, field, obj) - if res == graphql.Null { - atomic.AddUint32(&fs.Invalids, 1) - } - return res - } - - if field.Deferrable != nil { - dfs, ok := deferred[field.Deferrable.Label] - di := 0 - if ok { - dfs.AddField(field) - di = len(dfs.Values) - 1 - } else { - dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) - deferred[field.Deferrable.Label] = dfs - } - dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { - return innerFunc(ctx, dfs) - }) - - // don't run the out.Concurrently() call below - out.Values[i] = graphql.Null - continue - } - - out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) - case "size": - out.Values[i] = ec._ArchiveFS_size(ctx, field, obj) - if out.Values[i] == graphql.Null { - atomic.AddUint32(&out.Invalids, 1) - } - default: - panic("unknown field " + strconv.Quote(field.Name)) - } - } - 
out.Dispatch(ctx) - if out.Invalids > 0 { - return graphql.Null - } - - atomic.AddInt32(&ec.deferred, int32(len(deferred))) - - for label, dfs := range deferred { - ec.processDeferredGroup(graphql.DeferredGroup{ - Label: label, - Path: graphql.GetPath(ctx), - FieldSet: dfs, - Context: ctx, - }) - } - - return out -} - var mutationImplementors = []string{"Mutation"} func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { diff --git a/src/delivery/graphql/model/entry.go b/src/delivery/graphql/model/entry.go index b4686f8..4e7bb68 100644 --- a/src/delivery/graphql/model/entry.go +++ b/src/delivery/graphql/model/entry.go @@ -13,19 +13,20 @@ type FsElem interface { func FillFsEntry(ctx context.Context, e FsElem, fs vfs.Filesystem, path string) (FsEntry, error) { switch e.(type) { - case *vfs.ArchiveFS: - e := e.(*vfs.ArchiveFS) - return ArchiveFs{ - Name: e.Name(), - Size: e.Size(), - FS: e, - }, nil case *vfs.ResolverFS: e := e.(*vfs.ResolverFS) return ResolverFs{ Name: e.Name(), FS: e, }, nil + + // case *vfs.ArchiveFS: + // e := e.(*vfs.ArchiveFS) + // return ArchiveFs{ + // Name: e.Name(), + // Size: e.Size(), + // FS: e, + // }, nil // case *torrent.TorrentFS: // e := e.(*torrent.TorrentFS) // torrent, err := MapTorrent(ctx, e.Torrent) diff --git a/src/delivery/graphql/model/mappers.go b/src/delivery/graphql/model/mappers.go index 1770772..42d98f8 100644 --- a/src/delivery/graphql/model/mappers.go +++ b/src/delivery/graphql/model/mappers.go @@ -7,51 +7,3 @@ func Apply[I any, O any](in []I, f func(I) O) []O { } return out } - -// func MapPeerSource(source atorrent.PeerSource) string { -// switch source { -// case atorrent.PeerSourceDirect: -// return "Direct" -// case atorrent.PeerSourceUtHolepunch: -// return "Ut Holepunch" -// case atorrent.PeerSourceDhtAnnouncePeer: -// return "DHT Announce" -// case atorrent.PeerSourceDhtGetPeers: -// return "DHT" -// case atorrent.PeerSourceIncoming: -// return "Incoming" -// case 
atorrent.PeerSourceTracker: -// return "Tracker" -// case atorrent.PeerSourcePex: -// return "PEX" -// default: -// return "Unknown" -// } -// } - -// func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) { -// prio, err := t.Priority(ctx) -// if err != nil { -// return nil, err -// } - -// return &Torrent{ -// Infohash: t.InfoHash(), -// Name: t.Name(), -// BytesCompleted: t.BytesCompleted(), -// BytesMissing: t.BytesMissing(), -// Priority: prio, -// T: t, -// }, nil -// } - -// func MapTorrentStats(s torrent.TorrentStats) *TorrentStats { -// return &TorrentStats{ -// Timestamp: s.Timestamp, -// DownloadedBytes: uint(s.DownloadedBytes), -// UploadedBytes: uint(s.UploadedBytes), -// TotalPeers: uint(s.TotalPeers), -// ActivePeers: uint(s.ActivePeers), -// ConnectedSeeders: uint(s.ConnectedSeeders), -// } -// } diff --git a/src/delivery/graphql/model/models_gen.go b/src/delivery/graphql/model/models_gen.go index a4c9b32..7118904 100644 --- a/src/delivery/graphql/model/models_gen.go +++ b/src/delivery/graphql/model/models_gen.go @@ -33,28 +33,6 @@ type Progress interface { GetTotal() int64 } -type ArchiveFs struct { - Name string `json:"name"` - Entries []FsEntry `json:"entries"` - Size int64 `json:"size"` - FS *vfs.ArchiveFS `json:"-"` -} - -func (ArchiveFs) IsDir() {} -func (this ArchiveFs) GetName() string { return this.Name } -func (this ArchiveFs) GetEntries() []FsEntry { - if this.Entries == nil { - return nil - } - interfaceSlice := make([]FsEntry, 0, len(this.Entries)) - for _, concrete := range this.Entries { - interfaceSlice = append(interfaceSlice, concrete) - } - return interfaceSlice -} - -func (ArchiveFs) IsFsEntry() {} - type BooleanFilter struct { Eq *bool `json:"eq,omitempty"` } diff --git a/src/delivery/graphql/resolver/fs.resolvers.go b/src/delivery/graphql/resolver/fs.resolvers.go index d00625c..106f2c2 100644 --- a/src/delivery/graphql/resolver/fs.resolvers.go +++ b/src/delivery/graphql/resolver/fs.resolvers.go @@ -11,23 
+11,6 @@ import ( "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" ) -// Entries is the resolver for the entries field. -func (r *archiveFSResolver) Entries(ctx context.Context, obj *model.ArchiveFs) ([]model.FsEntry, error) { - entries, err := obj.FS.ReadDir(ctx, ".") - if err != nil { - return nil, err - } - out := []model.FsEntry{} - for _, e := range entries { - entry, err := model.FillFsEntry(ctx, e, obj.FS, ".") - if err != nil { - return nil, err - } - out = append(out, entry) - } - return out, nil -} - // Entries is the resolver for the entries field. func (r *resolverFSResolver) Entries(ctx context.Context, obj *model.ResolverFs) ([]model.FsEntry, error) { entries, err := obj.FS.ReadDir(ctx, ".") @@ -62,15 +45,11 @@ func (r *simpleDirResolver) Entries(ctx context.Context, obj *model.SimpleDir) ( return out, nil } -// ArchiveFS returns graph.ArchiveFSResolver implementation. -func (r *Resolver) ArchiveFS() graph.ArchiveFSResolver { return &archiveFSResolver{r} } - // ResolverFS returns graph.ResolverFSResolver implementation. func (r *Resolver) ResolverFS() graph.ResolverFSResolver { return &resolverFSResolver{r} } // SimpleDir returns graph.SimpleDirResolver implementation. 
func (r *Resolver) SimpleDir() graph.SimpleDirResolver { return &simpleDirResolver{r} } -type archiveFSResolver struct{ *Resolver } type resolverFSResolver struct{ *Resolver } type simpleDirResolver struct{ *Resolver } diff --git a/src/delivery/http.go b/src/delivery/http.go index 6915c9d..2ee3adc 100644 --- a/src/delivery/http.go +++ b/src/delivery/http.go @@ -8,6 +8,7 @@ import ( "git.kmsign.ru/royalcat/tstor/daemons/qbittorrent" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" + "git.kmsign.ru/royalcat/tstor/src/daemon" "git.kmsign.ru/royalcat/tstor/src/vfs" echopprof "github.com/labstack/echo-contrib/pprof" "github.com/labstack/echo/v4" @@ -15,7 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -func Run(qbitdaemon *qbittorrent.Daemon, vfs vfs.Filesystem, cfg *config.Settings) error { +func Run(cfg config.WebUi, vfs vfs.Filesystem, daemons []daemon.Daemon) error { log := slog.With() r := echo.New() @@ -26,14 +27,21 @@ func Run(qbitdaemon *qbittorrent.Daemon, vfs vfs.Filesystem, cfg *config.Setting // Logger(), ) + var qbitdaemon *qbittorrent.Daemon + for _, dm := range daemons { + if dm.Name() == qbittorrent.DaemonName { + qbitdaemon = dm.(*qbittorrent.Daemon) + } + } + echopprof.Register(r) r.Any("/graphql", echo.WrapHandler((GraphQLHandler(qbitdaemon, vfs)))) r.GET("/metrics", echo.WrapHandler(promhttp.Handler())) - log.Info("starting webserver", "host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)) + log.Info("starting webserver", "host", fmt.Sprintf("%s:%d", cfg.IP, cfg.Port)) - return r.Start((fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port))) + return r.Start((fmt.Sprintf("%s:%d", cfg.IP, cfg.Port))) } func Logger() echo.MiddlewareFunc { diff --git a/src/vfs/archive.go b/src/vfs/archive.go deleted file mode 100644 index 56e49b5..0000000 --- a/src/vfs/archive.go +++ /dev/null @@ -1,440 +0,0 @@ -package vfs - -import ( - "archive/zip" - "context" - "fmt" - "io" - "io/fs" - "path" - 
"strings" - "sync" - "time" - - "git.kmsign.ru/royalcat/tstor/pkg/ioutils" - "github.com/bodgit/sevenzip" - "github.com/nwaples/rardecode/v2" - "github.com/royalcat/ctxio" -) - -var ArchiveFactories = map[string]FsFactory{ - ".zip": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { - stat, err := f.Info() - if err != nil { - return nil, err - } - return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), ZipLoader) - }, - ".rar": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { - stat, err := f.Info() - if err != nil { - return nil, err - } - return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), RarLoader) - }, - ".7z": func(ctx context.Context, sourcePath string, f File) (Filesystem, error) { - stat, err := f.Info() - if err != nil { - return nil, err - } - return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), SevenZipLoader) - }, -} - -type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error) - -var _ Filesystem = &ArchiveFS{} - -type fileEntry struct { - fs.FileInfo - open func(ctx context.Context) (File, error) -} - -type ArchiveFS struct { - name string - size int64 - files map[string]fileEntry -} - -// Rename implements Filesystem. -func (a *ArchiveFS) Rename(ctx context.Context, oldpath string, newpath string) error { - return ErrNotImplemented -} - -// ModTime implements Filesystem. -func (a *ArchiveFS) ModTime() time.Time { - return time.Time{} -} - -// Mode implements Filesystem. -func (a *ArchiveFS) Mode() fs.FileMode { - return fs.ModeDir -} - -// Size implements Filesystem. -func (a *ArchiveFS) Size() int64 { - return int64(a.size) -} - -// Sys implements Filesystem. -func (a *ArchiveFS) Sys() any { - return nil -} - -// FsName implements Filesystem. 
-func (a *ArchiveFS) FsName() string { - return "archivefs" -} - -func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) { - archiveFiles, err := loader(ctx, archivePath, f, size) - if err != nil { - return nil, err - } - - // TODO make optional - singleDir := true - for k := range archiveFiles { - if !strings.HasPrefix(k, "/"+name+"/") { - singleDir = false - break - } - } - - files := make(map[string]fileEntry, len(archiveFiles)) - for k, v := range archiveFiles { - // TODO make optional - if strings.Contains(k, "/__MACOSX/") { - continue - } - - if singleDir { - k, _ = strings.CutPrefix(k, "/"+name) - } - - files[k] = v - } - - // FIXME configurable - files["/.forcegallery"] = fileEntry{ - FileInfo: NewFileInfo("/.forcegallery", 0, time.Time{}), - open: func(ctx context.Context) (File, error) { - return NewMemoryFile(".forcegallery", []byte{}), nil - }, - } - - return &ArchiveFS{ - name: name, - size: size, - files: files, - }, nil -} - -// Unlink implements Filesystem. -func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error { - return ErrNotImplemented -} - -func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) { - if filename == Separator { - return NewDirFile(filename), nil - } - - f, ok := a.files[filename] - if ok { - return f.open(ctx) - } - - for p := range a.files { - if strings.HasPrefix(p, filename) { - return NewDirFile(filename), nil - } - } - - return nil, ErrNotExist -} - -func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { - infos := make(map[string]fs.FileInfo, len(a.files)) - for k, v := range a.files { - infos[k] = v - } - - return ListDirFromInfo(infos, path) -} - -// Stat implements Filesystem. 
-func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { - if entry, ok := afs.files[filename]; ok { - return entry, nil - } - - for p, _ := range afs.files { - if strings.HasPrefix(p, filename) { - return NewDirInfo(path.Base(filename), time.Time{}), nil - } - } - - return nil, ErrNotExist -} - -// Info implements Filesystem. -func (a *ArchiveFS) Info() (fs.FileInfo, error) { - return a, nil -} - -// IsDir implements Filesystem. -func (a *ArchiveFS) IsDir() bool { - return true -} - -// Name implements Filesystem. -func (a *ArchiveFS) Name() string { - return a.name -} - -// Type implements Filesystem. -func (a *ArchiveFS) Type() fs.FileMode { - return fs.ModeDir -} - -var _ File = (*archiveFile)(nil) - -func newArchiveFile(name string, size int64, rr *randomReaderFromLinear) *archiveFile { - return &archiveFile{ - name: name, - size: size, - rr: rr, - } -} - -type archiveFile struct { - name string - size int64 - - m sync.Mutex - offset int64 - - rr *randomReaderFromLinear -} - -// Seek implements File. -func (d *archiveFile) Seek(offset int64, whence int) (int64, error) { - switch whence { - case io.SeekStart: - d.offset = offset - - case io.SeekCurrent: - d.offset += offset - case io.SeekEnd: - d.offset = d.size + offset - } - return d.offset, nil -} - -// Name implements File. -func (d *archiveFile) Name() string { - return d.name -} - -// Type implements File. 
-func (d *archiveFile) Type() fs.FileMode { - return ModeFileRO -} - -func (d *archiveFile) Info() (fs.FileInfo, error) { - return NewFileInfo(d.name, d.size, time.Time{}), nil -} - -func (d *archiveFile) Size() int64 { - return d.size -} - -func (d *archiveFile) IsDir() bool { - return false -} - -func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) { - ctx, span := tracer.Start(ctx, "archive.File.Read") - defer span.End() - - n, err = d.rr.ReadAt(ctx, p, d.offset) - d.offset += int64(n) - return n, err -} - -func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { - d.m.Lock() - defer d.m.Unlock() - - return d.rr.ReadAt(ctx, p, off) -} - -func (d *archiveFile) Close(ctx context.Context) error { - // FIXME close should do nothing as archive fs currently reuse the same file instances - return nil -} - -type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error) - -var _ archiveLoader = ZipLoader - -func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { - hash, err := FileHash(ctx, f) - if err != nil { - return nil, err - } - - reader := ctxio.IoReaderAt(ctx, f) - zr, err := zip.NewReader(reader, size) - if err != nil { - return nil, err - } - - out := make(map[string]fileEntry) - for i := range zr.File { - zipFile := zr.File[i] - if zipFile.FileInfo().IsDir() { - continue - } - - i := i - af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ctxio.IoReaderAt(ctx, f) - - zr, err := zip.NewReader(reader, size) - if err != nil { - return nil, fmt.Errorf("failed to create zip reader: %w", err) - } - - rc, err := zr.File[i].Open() - if err != nil { - return nil, fmt.Errorf("failed to open file in zip archive: %w", err) - } - - return ctxio.WrapIoReadCloser(rc), nil - } - - info := zipFile.FileInfo() - - rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af) - - 
out[AbsPath(zipFile.Name)] = fileEntry{ - FileInfo: info, - open: func(ctx context.Context) (File, error) { - return newArchiveFile(info.Name(), info.Size(), rr), nil - }, - } - } - - return out, nil -} - -var _ archiveLoader = SevenZipLoader - -func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) { - hash, err := FileHash(ctx, ctxreader) - if err != nil { - return nil, err - } - - reader := ctxio.IoReaderAt(ctx, ctxreader) - r, err := sevenzip.NewReader(reader, size) - if err != nil { - return nil, err - } - - out := make(map[string]fileEntry) - for i, f := range r.File { - f := f - if f.FileInfo().IsDir() { - continue - } - - i := i - af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ctxio.IoReaderAt(ctx, ctxreader) - zr, err := sevenzip.NewReader(reader, size) - if err != nil { - return nil, err - } - - rc, err := zr.File[i].Open() - if err != nil { - return nil, err - } - - return ctxio.WrapIoReadCloser(rc), nil - } - - info := f.FileInfo() - - rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af) - - out[AbsPath(f.Name)] = fileEntry{ - FileInfo: f.FileInfo(), - open: func(ctx context.Context) (File, error) { - return newArchiveFile(info.Name(), info.Size(), rr), nil - }, - } - } - - return out, nil -} - -var _ archiveLoader = RarLoader - -func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { - hash, err := FileHash(ctx, f) - if err != nil { - return nil, err - } - - reader := ioutils.WrapIoReadSeeker(ctx, f, size) - - r, err := rardecode.NewReader(reader) - if err != nil { - return nil, err - } - - out := make(map[string]fileEntry) - for { - header, err := r.Next() - if err == io.EOF { - break - } - if err != nil { - return nil, err - } - - name := header.Name - af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ioutils.WrapIoReadSeeker(ctx, f, size) 
- r, err := rardecode.NewReader(reader) - if err != nil { - return nil, err - } - - for header, err := r.Next(); err != io.EOF; header, err = r.Next() { - if err != nil { - return nil, err - } - if header.Name == name { - return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil - } - } - return nil, fmt.Errorf("file with name '%s' not found", name) - } - - rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af) - - out[AbsPath(header.Name)] = fileEntry{ - FileInfo: NewFileInfo(header.Name, header.UnPackedSize, header.ModificationTime), - open: func(ctx context.Context) (File, error) { - return newArchiveFile(header.Name, header.UnPackedSize, rr), nil - }, - } - } - - return out, nil -} diff --git a/src/vfs/resolver_test.go b/src/vfs/resolver_test.go index 5b9c141..5604a87 100644 --- a/src/vfs/resolver_test.go +++ b/src/vfs/resolver_test.go @@ -1,106 +1,103 @@ package vfs_test -import ( - "archive/zip" - "bytes" - "context" - "testing" +// func createZip(files map[string][]byte) ([]byte, error) { +// buf := bytes.NewBuffer(nil) +// zw := zip.NewWriter(buf) - "git.kmsign.ru/royalcat/tstor/src/vfs" - "github.com/stretchr/testify/require" -) +// for name, data := range files { +// fw, err := zw.Create(name) +// if err != nil { +// return nil, err +// } -func createZip(files map[string][]byte) ([]byte, error) { - buf := bytes.NewBuffer(nil) - zw := zip.NewWriter(buf) +// _, err = fw.Write(data) +// if err != nil { +// return nil, err +// } +// } +// err := zw.Flush() +// if err != nil { +// return nil, err +// } - for name, data := range files { - fw, err := zw.Create(name) - if err != nil { - return nil, err - } +// err = zw.Close() +// if err != nil { +// return nil, err +// } - _, err = fw.Write(data) - if err != nil { - return nil, err - } - } - err := zw.Flush() - if err != nil { - return nil, err - } +// return buf.Bytes(), nil +// } - err = zw.Close() - if err != nil { - return nil, err - } +// FIXME +// func 
TestResolverFs(t *testing.T) { +// t.Parallel() +// ctx := context.Background() - return buf.Bytes(), nil -} +// testZip, err := createZip(map[string][]byte{ +// "123.txt": []byte("123"), +// "files/321.txt": []byte("321"), +// }) +// require.NoError(t, err) -func TestResolverFs(t *testing.T) { - t.Parallel() - ctx := context.Background() +// fs := vfs.NewResolveFS( +// vfs.NewMemoryFS( +// "/", +// map[string]*vfs.MemoryFile{ +// "/data/123.zip": vfs.NewMemoryFile("123.zip", testZip), +// }, +// ), +// map[string]vfs.FsFactory{}, +// ) - testZip, err := createZip(map[string][]byte{ - "123.txt": []byte("123"), - "files/321.txt": []byte("321"), - }) - require.NoError(t, err) +// t.Run("dir", func(t *testing.T) { +// t.Parallel() +// require := require.New(t) - fs := vfs.NewResolveFS(vfs.NewMemoryFS("/", map[string]*vfs.MemoryFile{ - "/data/123.zip": vfs.NewMemoryFile("123.zip", testZip), - }), vfs.ArchiveFactories) +// dirs := []string{ +// "/data", "/", "/.", +// "/data/123.zip", "/data/123.zip/files", "/data/123.zip/files/.", +// } - t.Run("dir", func(t *testing.T) { - t.Parallel() - require := require.New(t) +// for _, dir := range dirs { +// file, err := fs.Open(ctx, dir) +// require.NoError(err) +// require.True(file.IsDir()) - dirs := []string{ - "/data", "/", "/.", - "/data/123.zip", "/data/123.zip/files", "/data/123.zip/files/.", - } +// stat, err := file.Info() +// require.NoError(err) +// require.True(stat.IsDir()) +// } - for _, dir := range dirs { - file, err := fs.Open(ctx, dir) - require.NoError(err) - require.True(file.IsDir()) +// entries, err := fs.ReadDir(ctx, "/data") +// require.NoError(err) +// require.Len(entries, 1) - stat, err := file.Info() - require.NoError(err) - require.True(stat.IsDir()) - } +// for _, e := range entries { +// switch e.Name() { +// case "123.zip": +// require.True(e.IsDir()) +// require.IsType(&vfs.MemoryFs{}, e) +// } +// } - entries, err := fs.ReadDir(ctx, "/data") - require.NoError(err) - require.Len(entries, 1) +// 
entries, err = fs.ReadDir(ctx, "/data/123.zip/files") +// require.NoError(err) +// require.Len(entries, 1) - for _, e := range entries { - switch e.Name() { - case "123.zip": - require.True(e.IsDir()) - require.IsType(&vfs.ArchiveFS{}, e) - } - } +// entries, err = fs.ReadDir(ctx, "/data/123.zip") +// require.NoError(err) +// require.Len(entries, 3) - entries, err = fs.ReadDir(ctx, "/data/123.zip/files") - require.NoError(err) - require.Len(entries, 1) - - entries, err = fs.ReadDir(ctx, "/data/123.zip") - require.NoError(err) - require.Len(entries, 3) - - for _, e := range entries { - switch e.Name() { - case "files": - require.True(e.IsDir()) - case "123.txt": - require.False(e.IsDir()) - } - } - }) -} +// for _, e := range entries { +// switch e.Name() { +// case "files": +// require.True(e.IsDir()) +// case "123.txt": +// require.False(e.IsDir()) +// } +// } +// }) +// } // func TestResolver(t *testing.T) { // t.Parallel() diff --git a/ui/lib/api/schema.graphql b/ui/lib/api/schema.graphql index 612c6f3..4036f3f 100644 --- a/ui/lib/api/schema.graphql +++ b/ui/lib/api/schema.graphql @@ -1,11 +1,6 @@ directive @oneOf on INPUT_OBJECT | FIELD_DEFINITION directive @resolver on INPUT_FIELD_DEFINITION | FIELD_DEFINITION directive @stream on FIELD_DEFINITION -type ArchiveFS implements Dir & FsEntry { - name: String! - entries: [FsEntry!]! @resolver - size: Int! -} input BooleanFilter @oneOf { eq: Boolean }