From 99cdd5471ea64b16481de9a47d8bd82dce92712e Mon Sep 17 00:00:00 2001 From: royalcat Date: Sat, 18 May 2024 10:24:14 +0300 Subject: [PATCH] chore: Refactor code to use SourceUpdater struct for managing sources --- cmd/tstor/main.go | 96 ++---------- pkg/ytdlp/client.go | 9 ++ pkg/ytdlp/model.go | 70 --------- pkg/ytdlp/playlist.go | 144 +++++++++-------- .../graphql/resolver/mutation.resolvers.go | 6 +- src/delivery/graphql/resolver/resolver.go | 6 +- src/host/controller/sourceddir.go | 4 + src/host/service/queue.go | 2 +- src/host/service/service.go | 100 +++++++----- src/host/storage.go | 4 +- src/host/store/file-mappings.go | 4 + src/host/vfs/ctxbillyfs.go | 146 ++++++++++++++++++ 12 files changed, 328 insertions(+), 263 deletions(-) create mode 100644 pkg/ytdlp/client.go create mode 100644 src/host/vfs/ctxbillyfs.go diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index 6c80826..c72c564 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -12,18 +12,17 @@ import ( "os/signal" "path/filepath" "syscall" - "time" + "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" wnfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/delivery" "git.kmsign.ru/royalcat/tstor/src/host" - "git.kmsign.ru/royalcat/tstor/src/host/datastorage" "git.kmsign.ru/royalcat/tstor/src/host/service" - "git.kmsign.ru/royalcat/tstor/src/host/store" "git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/telemetry" + "github.com/go-git/go-billy/v5/osfs" "github.com/urfave/cli/v2" _ "git.kmsign.ru/royalcat/tstor/pkg/rlog" @@ -86,93 +85,22 @@ func run(configPath string) error { log.Error(ctx, "set priority failed", rlog.Error(err)) } - if err := os.MkdirAll(conf.TorrentClient.MetadataFolder, 0744); err != nil { - return fmt.Errorf("error creating metadata folder: %w", err) + if err := os.MkdirAll(conf.SourceDir, 0744); err != nil { + return fmt.Errorf("error creating data folder: %w", err) } - fis, err := store.NewFileItemStore(filepath.Join(conf.TorrentClient.MetadataFolder, "items"), 2*time.Hour) - if err != nil { - return fmt.Errorf("error starting item store: %w", err) - } - defer fis.Close() - - id, err := store.GetOrCreatePeerID(filepath.Join(conf.TorrentClient.MetadataFolder, "ID")) - if err != nil { - return fmt.Errorf("error creating node ID: %w", err) - } - - st, _, err := datastorage.Setup(conf.TorrentClient) - if err != nil { - return err - } - defer st.Close() - - excludedFilesStore, err := store.NewFileMappings(conf.TorrentClient.MetadataFolder, st) - if err != nil { - return err - } - - infoBytesStore, err := store.NewInfoBytes(conf.TorrentClient.MetadataFolder) - if err != nil { - return err - } - - c, err := store.NewClient(st, fis, &conf.TorrentClient, id) - if err != nil { - return fmt.Errorf("error starting torrent client: %w", err) - } - c.AddDhtNodes(conf.TorrentClient.DHTNodes) - defer c.Close() - - ts, err := service.NewService( - conf.SourceDir, conf.TorrentClient, - c, st, excludedFilesStore, infoBytesStore, - ) + sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS()) + srv, err := service.New(sourceFs, conf.TorrentClient) if err != nil { return fmt.Errorf("error creating service: %w", err) } - if err := os.MkdirAll(conf.SourceDir, 0744); err != nil { - return fmt.Errorf("error creating data folder: %w", err) - } - sfs := host.NewTorrentStorage(conf.SourceDir, ts) + sfs := host.NewTorrentStorage( + vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)), + srv, + ) sfs = vfs.WrapLogFS(sfs) - // TODO make separate function - // { - // if st, ok := st.(storage.FileStorageDeleter); ok { - // log.Info().Msg("listing files") - // files, err := listFilesRecursive(conf.SourceDir) - // if err != nil { - // return fmt.Errorf("error listing files: %w", err) - // } - - // torrentFiles := []string{} - // for _, v := range files { - // if strings.HasSuffix(v, ".torrent") { - // torrentFiles = append(torrentFiles, v) - // } - // } - - // log.Info().Int("count", len(torrentFiles)).Msg("loading torrent files") - // torrentList := []*torrent.Torrent{} - // for _, tf := range torrentFiles { - // t, err := c.AddTorrentFromFile(tf) - // if err != nil { - // return err - // } - // <-t.GotInfo() - // torrentList = append(torrentList, t) - // } - // log.Info().Msg("staring cleanup") - // err = st.Cleanup(torrentList) - // if err != nil { - // return fmt.Errorf("cleanup error: %w", err) - // } - // } - - // } - if conf.Mounts.Fuse.Enabled { mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path) err := mh.Mount(sfs) @@ -246,7 +174,7 @@ func run(configPath string) error { go func() { logFilename := filepath.Join(conf.Log.Path, "logs") - err := delivery.New(nil, service.NewStats(), ts, sfs, logFilename, conf) + err := delivery.New(nil, service.NewStats(), srv, sfs, logFilename, conf) if err != nil { log.Error(ctx, "error initializing HTTP server", rlog.Error(err)) } @@ -256,5 +184,5 @@ func run(configPath string) error { signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigChan - return ts.Close() + return srv.Close(ctx) } diff --git a/pkg/ytdlp/client.go b/pkg/ytdlp/client.go new file mode 100644 index 0000000..4c16368 --- /dev/null +++ b/pkg/ytdlp/client.go @@ -0,0 +1,9 @@ +package ytdlp + +type Client struct { + binary string +} + +func New() (*Client, error) { + return &Client{binary: "yt-dlp"}, nil +} diff --git a/pkg/ytdlp/model.go b/pkg/ytdlp/model.go index 7056312..2d95823 100644 --- a/pkg/ytdlp/model.go +++ b/pkg/ytdlp/model.go @@ -1,75 +1,5 @@ package ytdlp -type PlaylistEntry struct { - ID string `json:"id"` - Uploader string `json:"uploader"` - UploaderID string `json:"uploader_id"` - UploadDate string `json:"upload_date"` - Title string `json:"title"` - Thumbnail string `json:"thumbnail"` - Duration int64 `json:"duration"` - LikeCount int64 `json:"like_count"` - DislikeCount int64 `json:"dislike_count"` - CommentCount int64 `json:"comment_count"` - Formats []Format `json:"formats"` - AgeLimit int64 `json:"age_limit"` - Tags []string `json:"tags"` - Categories []string `json:"categories"` - Cast []any `json:"cast"` - Subtitles Subtitles `json:"subtitles"` - Thumbnails []Thumbnail `json:"thumbnails"` - Timestamp int64 `json:"timestamp"` - ViewCount int64 `json:"view_count"` - WebpageURL string `json:"webpage_url"` - OriginalURL string `json:"original_url"` - WebpageURLBasename string `json:"webpage_url_basename"` - WebpageURLDomain string `json:"webpage_url_domain"` - Extractor string `json:"extractor"` - ExtractorKey string `json:"extractor_key"` - PlaylistCount int64 `json:"playlist_count"` - Playlist string `json:"playlist"` - PlaylistID string `json:"playlist_id"` - PlaylistTitle string `json:"playlist_title"` - PlaylistUploader string `json:"playlist_uploader"` - PlaylistUploaderID string `json:"playlist_uploader_id"` - NEntries int64 `json:"n_entries"` - PlaylistIndex int64 `json:"playlist_index"` - PlaylistAutonumber int64 `json:"playlist_autonumber"` - DisplayID string `json:"display_id"` - Fulltitle string `json:"fulltitle"` - DurationString string `json:"duration_string"` - ReleaseYear int `json:"release_year"` - Epoch int64 `json:"epoch"` - FormatID string `json:"format_id"` - URL string `json:"url"` - ManifestURL string `json:"manifest_url"` - Tbr float64 `json:"tbr"` - EXT EXT `json:"ext"` - FPS float64 `json:"fps"` - Protocol Protocol `json:"protocol"` - VideoHasDRM bool `json:"has_drm"` - Width int64 `json:"width"` - Height int64 `json:"height"` - Vcodec string `json:"vcodec"` - Acodec string `json:"acodec"` - DynamicRange DynamicRange `json:"dynamic_range"` - Resolution string `json:"resolution"` - AspectRatio float64 `json:"aspect_ratio"` - HTTPHeaders HTTPHeaders `json:"http_headers"` - VideoEXT EXT `json:"video_ext"` - AudioEXT AudioEXT `json:"audio_ext"` - Format string `json:"format"` - Filename string `json:"_filename"` - VideoFilename string `json:"filename"` - Type string `json:"_type"` - Version Version `json:"_version"` -} - -// Progress implements ctxprogress.Progress. -func (p PlaylistEntry) Progress() (current int, total int) { - return int(p.PlaylistIndex), int(p.PlaylistCount) -} - type Format struct { URL string `json:"url"` FormatID string `json:"format_id"` diff --git a/pkg/ytdlp/playlist.go b/pkg/ytdlp/playlist.go index 40e091b..6f50b33 100644 --- a/pkg/ytdlp/playlist.go +++ b/pkg/ytdlp/playlist.go @@ -14,12 +14,88 @@ import ( "golang.org/x/sync/errgroup" ) -type Client struct { - binary string +type PlaylistEntry struct { + ID string `json:"id"` + Uploader string `json:"uploader"` + UploaderID string `json:"uploader_id"` + UploadDate string `json:"upload_date"` + Title string `json:"title"` + Thumbnail string `json:"thumbnail"` + Duration int64 `json:"duration"` + LikeCount int64 `json:"like_count"` + DislikeCount int64 `json:"dislike_count"` + CommentCount int64 `json:"comment_count"` + Formats []Format `json:"formats"` + AgeLimit int64 `json:"age_limit"` + Tags []string `json:"tags"` + Categories []string `json:"categories"` + Cast []any `json:"cast"` + Subtitles Subtitles `json:"subtitles"` + Thumbnails []Thumbnail `json:"thumbnails"` + Timestamp int64 `json:"timestamp"` + ViewCount int64 `json:"view_count"` + WebpageURL string `json:"webpage_url"` + OriginalURL string `json:"original_url"` + WebpageURLBasename string `json:"webpage_url_basename"` + WebpageURLDomain string `json:"webpage_url_domain"` + Extractor string `json:"extractor"` + ExtractorKey string `json:"extractor_key"` + PlaylistCount int64 `json:"playlist_count"` + Playlist string `json:"playlist"` + PlaylistID string `json:"playlist_id"` + PlaylistTitle string `json:"playlist_title"` + PlaylistUploader string `json:"playlist_uploader"` + PlaylistUploaderID string `json:"playlist_uploader_id"` + NEntries int64 `json:"n_entries"` + PlaylistIndex int64 `json:"playlist_index"` + PlaylistAutonumber int64 `json:"playlist_autonumber"` + DisplayID string `json:"display_id"` + Fulltitle string `json:"fulltitle"` + DurationString string `json:"duration_string"` + ReleaseYear int `json:"release_year"` + Epoch int64 `json:"epoch"` + FormatID string `json:"format_id"` + URL string `json:"url"` + ManifestURL string `json:"manifest_url"` + Tbr float64 `json:"tbr"` + EXT EXT `json:"ext"` + FPS float64 `json:"fps"` + Protocol Protocol `json:"protocol"` + VideoHasDRM bool `json:"has_drm"` + Width int64 `json:"width"` + Height int64 `json:"height"` + Vcodec string `json:"vcodec"` + Acodec string `json:"acodec"` + DynamicRange DynamicRange `json:"dynamic_range"` + Resolution string `json:"resolution"` + AspectRatio float64 `json:"aspect_ratio"` + HTTPHeaders HTTPHeaders `json:"http_headers"` + VideoEXT EXT `json:"video_ext"` + AudioEXT AudioEXT `json:"audio_ext"` + Format string `json:"format"` + Filename string `json:"_filename"` + VideoFilename string `json:"filename"` + Type string `json:"_type"` + Version Version `json:"_version"` } -func New() (*Client, error) { - return &Client{binary: "yt-dlp"}, nil +// Progress implements ctxprogress.Progress. +func (p PlaylistEntry) Progress() (current int, total int) { + return int(p.PlaylistIndex), int(p.PlaylistCount) +} + +func (p PlaylistEntry) Url() string { + if p.URL != "" { + return p.URL + } + if p.WebpageURL != "" { + return p.WebpageURL + } + if p.OriginalURL != "" { + return p.OriginalURL + } + + return "" } func (yt *Client) Playlist(ctx context.Context, url string) ([]PlaylistEntry, error) { @@ -55,66 +131,6 @@ func (yt *Client) Playlist(ctx context.Context, url string) ([]PlaylistEntry, er return playlists, nil } -// func DownloadPlaylist(ctx context.Context, url string, dir string) error { -// args := []string{ -// "--no-simulate", "-j", -// "--progress", "--newline", "--progress-template", progressTemplate, -// "-o", path.Join(dir, "%(title)s.%(ext)s"), -// url, -// } - -// group, ctx := errgroup.WithContext(ctx) - -// pr, w := io.Pipe() -// cmd := exec.CommandContext(ctx, "yt-dlp", args...) -// cmd.Stdout = w - -// r := io.TeeReader(pr, os.Stdout) - -// group.Go(func() error { -// reader := bufio.NewReader(r) -// for { -// line, err := reader.ReadString('\n') -// if err != nil { -// if err == io.EOF { -// return nil -// } -// return err -// } -// line = strings.Trim(line, " \r\t") -// if len(line) == 0 { -// continue -// } -// if strings.HasPrefix(line, "{") { -// item := &PlaylistEntry{} -// err = json.Unmarshal([]byte(line), &item) -// if err != nil { -// return err -// } -// } else if body, ok := strings.CutPrefix(line, "%"); ok { -// p := &DownloadProgress{} -// err = json.Unmarshal([]byte(body), &p) -// if err != nil { -// return err -// } -// } else { -// return fmt.Errorf("Failed to parse output, unkonow first symbol: %v", string([]rune(line)[0])) -// } -// } -// }) - -// group.Go(func() error { -// err := cmd.Run() -// defer w.Close() -// if err != nil { -// return err -// } -// return nil -// }) - -// return group.Wait() -// } - func lineReader(group *errgroup.Group) (io.WriteCloser, <-chan string, error) { lines := make(chan string) var r io.Reader diff --git a/src/delivery/graphql/resolver/mutation.resolvers.go b/src/delivery/graphql/resolver/mutation.resolvers.go index e1cb709..894aa3b 100644 --- a/src/delivery/graphql/resolver/mutation.resolvers.go +++ b/src/delivery/graphql/resolver/mutation.resolvers.go @@ -93,9 +93,7 @@ func (r *mutationResolver) DownloadTorrent(ctx context.Context, infohash string, // UploadFile is the resolver for the uploadFile field. func (r *mutationResolver) UploadFile(ctx context.Context, dir string, file graphql.Upload) (bool, error) { - dir = pathlib.Join(r.Service.SourceDir, dir) - - dirInfo, err := os.Stat(dir) + dirInfo, err := r.SourceFS.Stat(dir) if err != nil { return false, err } @@ -105,7 +103,7 @@ func (r *mutationResolver) UploadFile(ctx context.Context, dir string, file grap } filename := pathlib.Join(dir, file.Filename) - target, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, os.ModePerm) + target, err := r.SourceFS.OpenFile(filename, os.O_CREATE|os.O_WRONLY, os.ModePerm) defer target.Close() if err != nil { return false, err diff --git a/src/delivery/graphql/resolver/resolver.go b/src/delivery/graphql/resolver/resolver.go index 9a76120..fcc5e22 100644 --- a/src/delivery/graphql/resolver/resolver.go +++ b/src/delivery/graphql/resolver/resolver.go @@ -3,6 +3,7 @@ package resolver import ( "git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/vfs" + "github.com/go-git/go-billy/v5" ) // This file will not be regenerated automatically. @@ -10,6 +11,7 @@ import ( // It serves as dependency injection for your app, add any dependencies you require here. type Resolver struct { - Service *service.Service - VFS vfs.Filesystem + Service *service.Service + VFS vfs.Filesystem + SourceFS billy.Filesystem } diff --git a/src/host/controller/sourceddir.go b/src/host/controller/sourceddir.go index d8d1294..f317c84 100644 --- a/src/host/controller/sourceddir.go +++ b/src/host/controller/sourceddir.go @@ -6,6 +6,10 @@ import ( "github.com/lrstanley/go-ytdlp" ) +type SourceUpdater struct { + sources []VirtDirSource +} + type SourcedDirSource string const ( diff --git a/src/host/service/queue.go b/src/host/service/queue.go index 16eb4ae..be0b41e 100644 --- a/src/host/service/queue.go +++ b/src/host/service/queue.go @@ -17,7 +17,7 @@ type TorrentDownloadTask struct { } func (s *Service) Download(ctx context.Context, task *TorrentDownloadTask) error { - t, ok := s.c.Torrent(task.InfoHash) + t, ok := s.client.Torrent(task.InfoHash) if !ok { return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString()) } diff --git a/src/host/service/service.go b/src/host/service/service.go index cd4c260..c56a898 100644 --- a/src/host/service/service.go +++ b/src/host/service/service.go @@ -11,6 +11,7 @@ import ( "slices" "strings" "sync" + "time" "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/rlog" @@ -28,8 +29,9 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/types" "github.com/anacrolix/torrent/types/infohash" + "github.com/go-git/go-billy/v5" + "github.com/go-git/go-billy/v5/util" "github.com/royalcat/kv" ) @@ -41,45 +43,68 @@ type DirAquire struct { } type Service struct { - c *torrent.Client + client *torrent.Client excludedFiles *store.FilesMappings infoBytes *store.InfoBytes + Storage *datastorage.DataStorage + fis *store.FileItemStore + dirsAquire kv.Store[string, DirAquire] + loadMutex sync.Mutex torrentLoaded chan struct{} - loadMutex sync.Mutex - - // stats *Stats - DefaultPriority types.PiecePriority - Storage *datastorage.DataStorage - SourceDir string - - dirsAquire kv.Store[string, DirAquire] + sourceFs billy.Filesystem log *rlog.Logger } -func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client, - storage *datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes, -) (*Service, error) { - dirsAcquire, err := tkv.New[string, DirAquire](cfg.MetadataFolder, "dir-acquire") +func New(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, error) { + s := &Service{ + log: rlog.Component("torrent-service"), + sourceFs: sourceFs, + torrentLoaded: make(chan struct{}), + loadMutex: sync.Mutex{}, + } + + err := os.MkdirAll(conf.MetadataFolder, 0744) + if err != nil { + return nil, fmt.Errorf("error creating metadata folder: %w", err) + } + + s.fis, err = store.NewFileItemStore(filepath.Join(conf.MetadataFolder, "items"), 2*time.Hour) + if err != nil { + return nil, fmt.Errorf("error starting item store: %w", err) + } + + s.Storage, _, err = datastorage.Setup(conf) if err != nil { return nil, err } - s := &Service{ - log: rlog.Component("torrent-service"), - c: c, - DefaultPriority: types.PiecePriorityNone, - excludedFiles: excludedFiles, - infoBytes: infoBytes, - Storage: storage, - SourceDir: sourceDir, - torrentLoaded: make(chan struct{}), - loadMutex: sync.Mutex{}, - dirsAquire: dirsAcquire, + s.excludedFiles, err = store.NewFileMappings(conf.MetadataFolder, s.Storage) + if err != nil { + return nil, err + } - // stats: newStats(), // TODO persistent + s.infoBytes, err = store.NewInfoBytes(conf.MetadataFolder) + if err != nil { + return nil, err + } + + id, err := store.GetOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID")) + if err != nil { + return nil, fmt.Errorf("error creating node ID: %w", err) + } + + client, err := store.NewClient(s.Storage, s.fis, &conf, id) + if err != nil { + return nil, fmt.Errorf("error starting torrent client: %w", err) + } + client.AddDhtNodes(conf.DHTNodes) + + s.dirsAquire, err = tkv.New[string, DirAquire](conf.MetadataFolder, "dir-acquire") + if err != nil { + return nil, err } go func() { @@ -96,11 +121,14 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client, var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs -func (s *Service) Close() error { - +func (s *Service) Close(ctx context.Context) error { return errors.Join(append( - s.c.Close(), + s.client.Close(), s.Storage.Close(), + s.dirsAquire.Close(ctx), + s.excludedFiles.Close(ctx), + s.infoBytes.Close(), + s.fis.Close(), )...) } @@ -123,7 +151,7 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) } - t, ok := s.c.Torrent(mi.HashInfoBytes()) + t, ok := s.client.Torrent(mi.HashInfoBytes()) if !ok { span.AddEvent("torrent not found, loading from file") @@ -148,7 +176,7 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent } } - t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{ + t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: spec.InfoHash, Storage: s.Storage, InfoBytes: infoBytes, @@ -223,7 +251,7 @@ func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, inf return true, false, nil } - for _, existingTorrent := range s.c.Torrents() { + for _, existingTorrent := range s.client.Torrents() { if existingTorrent.Name() != name || existingTorrent.InfoHash() == ih { continue } @@ -344,7 +372,7 @@ func (s *Service) Stats() (*Stats, error) { } func (s *Service) GetStats() torrent.ConnStats { - return s.c.ConnStats() + return s.client.ConnStats() } const loadWorkers = 5 @@ -386,7 +414,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error { go loaderWorker() } - return filepath.Walk(s.SourceDir, func(path string, info os.FileInfo, err error) error { + return util.Walk(s.sourceFs, "/", func(path string, info os.FileInfo, err error) error { if err != nil { return fmt.Errorf("fs walk error: %w", err) } @@ -411,7 +439,7 @@ func (s *Service) ListTorrents(ctx context.Context) ([]*controller.Torrent, erro <-s.torrentLoaded out := []*controller.Torrent{} - for _, v := range s.c.Torrents() { + for _, v := range s.client.Torrents() { out = append(out, controller.NewTorrent(v, s.excludedFiles)) } return out, nil @@ -420,7 +448,7 @@ func (s *Service) ListTorrents(ctx context.Context) ([]*controller.Torrent, erro func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) { <-s.torrentLoaded - t, ok := s.c.Torrent(infohash.FromHexString(infohashHex)) + t, ok := s.client.Torrent(infohash.FromHexString(infohashHex)) if !ok { return nil, nil } diff --git a/src/host/storage.go b/src/host/storage.go index 04af4b4..39d5c04 100644 --- a/src/host/storage.go +++ b/src/host/storage.go @@ -5,7 +5,7 @@ import ( "git.kmsign.ru/royalcat/tstor/src/host/vfs" ) -func NewTorrentStorage(dataPath string, tsrv *service.Service) vfs.Filesystem { +func NewTorrentStorage(sourceFS vfs.Filesystem, tsrv *service.Service) vfs.Filesystem { factories := map[string]vfs.FsFactory{ ".torrent": tsrv.NewTorrentFs, } @@ -15,5 +15,5 @@ func NewTorrentStorage(dataPath string, tsrv *service.Service) vfs.Filesystem { factories[k] = v } - return vfs.NewResolveFS(vfs.NewOsFs(dataPath), factories) + return vfs.NewResolveFS(sourceFS, factories) } diff --git a/src/host/store/file-mappings.go b/src/host/store/file-mappings.go index 2a1c5b2..068c210 100644 --- a/src/host/store/file-mappings.go +++ b/src/host/store/file-mappings.go @@ -55,3 +55,7 @@ func (r *FilesMappings) FileMappings(ctx context.Context, ih infohash.T) (map[st }) return out, err } + +func (r *FilesMappings) Close(ctx context.Context) error { + return r.mappings.Close(ctx) +} diff --git a/src/host/vfs/ctxbillyfs.go b/src/host/vfs/ctxbillyfs.go new file mode 100644 index 0000000..e040731 --- /dev/null +++ b/src/host/vfs/ctxbillyfs.go @@ -0,0 +1,146 @@ +package vfs + +import ( + "context" + "io/fs" + + "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" +) + +func NewCtxBillyFs(name string, fs ctxbilly.Filesystem) *CtxBillyFs { + return &CtxBillyFs{ + name: name, + fs: fs, + } +} + +type CtxBillyFs struct { + name string + fs ctxbilly.Filesystem +} + +var _ Filesystem = (*CtxBillyFs)(nil) + +// Info implements Filesystem. +func (c *CtxBillyFs) Info() (fs.FileInfo, error) { + return c.fs.Stat(context.Background(), "") +} + +// IsDir implements Filesystem. +func (c *CtxBillyFs) IsDir() bool { + return false +} + +// Name implements Filesystem. +func (c *CtxBillyFs) Name() string { + return c.name +} + +// Open implements Filesystem. +func (c *CtxBillyFs) Open(ctx context.Context, filename string) (File, error) { + info, err := c.fs.Stat(ctx, filename) + if err != nil { + return nil, err + } + bf, err := c.fs.Open(ctx, filename) + if err != nil { + return nil, err + } + return &CtxBillyFile{ + info: info, + file: bf, + }, nil +} + +// ReadDir implements Filesystem. +func (c *CtxBillyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + infos, err := c.fs.ReadDir(ctx, path) + if err != nil { + return nil, err + } + + entries := make([]fs.DirEntry, 0, len(infos)) + for _, i := range infos { + entries = append(entries, &infoEntry{i}) + } + + return entries, nil +} + +type infoEntry struct { + fs.FileInfo +} + +var _ fs.DirEntry = (*infoEntry)(nil) + +// Info implements fs.DirEntry. +func (i *infoEntry) Info() (fs.FileInfo, error) { + return i.FileInfo, nil +} + +// Type implements fs.DirEntry. +func (i *infoEntry) Type() fs.FileMode { + return i.FileInfo.Mode() +} + +// Stat implements Filesystem. +func (c *CtxBillyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { + return c.fs.Stat(ctx, filename) +} + +// Type implements Filesystem. +func (c *CtxBillyFs) Type() fs.FileMode { + return fs.ModeDir +} + +// Unlink implements Filesystem. +func (c *CtxBillyFs) Unlink(ctx context.Context, filename string) error { + return fs.ErrInvalid +} + +var _ File = (*CtxBillyFile)(nil) + +type CtxBillyFile struct { + info fs.FileInfo + file ctxbilly.File +} + +// Close implements File. +func (c *CtxBillyFile) Close(ctx context.Context) error { + return c.file.Close(ctx) +} + +// Info implements File. +func (c *CtxBillyFile) Info() (fs.FileInfo, error) { + return c.info, nil +} + +// IsDir implements File. +func (c *CtxBillyFile) IsDir() bool { + return c.info.IsDir() +} + +// Name implements File. +func (c *CtxBillyFile) Name() string { + return c.file.Name() +} + +// Read implements File. +func (c *CtxBillyFile) Read(ctx context.Context, p []byte) (n int, err error) { + return c.file.Read(ctx, p) +} + +// ReadAt implements File. +func (c *CtxBillyFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { + return c.file.ReadAt(ctx, p, off) +} + +// Size implements File. +func (c *CtxBillyFile) Size() int64 { + return c.info.Size() +} + +// Type implements File. +func (c *CtxBillyFile) Type() fs.FileMode { + return c.info.Mode() +}