rename and delete in root fs fix
Some checks failed
docker / build-docker (push) Failing after 1m33s

This commit is contained in:
royalcat 2024-10-14 03:58:42 +03:00
parent 80884aca6a
commit 63e63c1c37
18 changed files with 282 additions and 58 deletions

View file

@ -11,14 +11,12 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"syscall"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
wnfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/delivery"
"git.kmsign.ru/royalcat/tstor/src/sources"
"git.kmsign.ru/royalcat/tstor/src/sources/qbittorrent"
"git.kmsign.ru/royalcat/tstor/src/sources/ytdlp"
@ -189,15 +187,6 @@ func run(configPath string) error {
}
}()
go func() {
logFilename := filepath.Join(conf.Log.Path, "logs")
err := delivery.Run(nil, sfs, logFilename, conf)
if err != nil {
log.Error(ctx, "error initializing HTTP server", rlog.Error(err))
}
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

3
go.mod
View file

@ -2,6 +2,8 @@ module git.kmsign.ru/royalcat/tstor
go 1.22.3
replace github.com/iceber/iouring-go => github.com/royalcat/iouring-go v0.0.0-20240925200811-286062ac1b23
require (
github.com/99designs/gqlgen v0.17.49
github.com/agoda-com/opentelemetry-go/otelslog v0.1.1
@ -21,6 +23,7 @@ require (
github.com/grafana/otel-profiling-go v0.5.1
github.com/grafana/pyroscope-go v1.1.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90
github.com/knadh/koanf/parsers/yaml v0.1.0
github.com/knadh/koanf/providers/env v0.1.0
github.com/knadh/koanf/providers/file v0.1.0

3
go.sum
View file

@ -528,6 +528,8 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
github.com/royalcat/iouring-go v0.0.0-20240925200811-286062ac1b23 h1:3yOlLKYd6iSGkRUOCPuBQibjjvZyrGB/4sm0fh3nNuQ=
github.com/royalcat/iouring-go v0.0.0-20240925200811-286062ac1b23/go.mod h1:LEzdaZarZ5aqROlLIwJ4P7h3+4o71008fSy6wpaEB+s=
github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85 h1:AAuCp03M23u4qrK3dT1afFgf+diEijvSFnHb93Lv3PY=
github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85 h1:OXRYz+eDPAlQjE1UCSIoBzVHjQ3Ayx7fGSM/Zlo3bhI=
@ -782,6 +784,7 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

112
pkg/uring/file.go Normal file
View file

@ -0,0 +1,112 @@
package uring
import (
"context"
"os"
"github.com/iceber/iouring-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var tracer = otel.Tracer("github.com/royalcat/tstor/pkg/uring")
type FS struct {
ur *iouring.IOURing
}
func NewFS(ur *iouring.IOURing) *FS {
return &FS{
ur: ur,
}
}
func (o *FS) OpenFile(ctx context.Context, name string) (File, error) {
ctx, span := tracer.Start(ctx, "uring.FS.OpenFile", trace.WithAttributes(attribute.String("name", name)))
defer span.End()
f, err := os.Open(name)
if err != nil {
return File{}, err
}
return File{
ur: o.ur,
f: f,
}, nil
}
func NewFile(ur *iouring.IOURing, f *os.File) *File {
return &File{
ur: ur,
f: f,
}
}
type File struct {
ur *iouring.IOURing
f *os.File
}
func (o *File) pread(ctx context.Context, b []byte, off uint64) (int, error) {
ctx, span := tracer.Start(ctx, "uring.File.pread", trace.WithAttributes(attribute.Int("size", len(b))))
defer span.End()
req, err := o.ur.Pread(o.f, b, off, nil)
if err != nil {
return 0, err
}
select {
case <-req.Done():
return req.GetRes()
case <-ctx.Done():
if _, err := req.Cancel(); err != nil {
return 0, err
}
<-req.Done()
return 0, ctx.Err()
}
}
func (f *File) ReadAt(ctx context.Context, b []byte, off int64) (n int, err error) {
ctx, span := tracer.Start(ctx, "uring.File.ReadAt", trace.WithAttributes(attribute.Int("size", len(b))))
defer span.End()
return f.f.ReadAt(b, off)
for len(b) > 0 {
if ctx.Err() != nil {
err = ctx.Err()
break
}
m, e := f.pread(ctx, b, uint64(off))
if e != nil {
err = e
break
}
n += m
b = b[m:]
off += int64(m)
}
return n, err
}
func (o *File) Close(ctx context.Context) error {
return o.f.Close()
}
func waitRequest(ctx context.Context, req iouring.Request) (int, error) {
select {
case <-req.Done():
return req.GetRes()
case <-ctx.Done():
if _, err := req.Cancel(); err != nil {
return 0, err
}
return 0, ctx.Err()
}
}

View file

@ -118,8 +118,8 @@ func (bfs *fsWrapper) Remove(ctx context.Context, filename string) error {
}
// Rename implements billy.Filesystem.
func (*fsWrapper) Rename(ctx context.Context, oldpath string, newpath string) error {
return billy.ErrNotSupported
func (bfs *fsWrapper) Rename(ctx context.Context, oldpath string, newpath string) error {
return bfs.fs.Rename(ctx, oldpath, newpath)
}
// Root implements billy.Filesystem.

View file

@ -19,6 +19,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
"github.com/iceber/iouring-go"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
)
@ -30,6 +31,7 @@ type Daemon struct {
qb qbittorrent.Client
client *cacheClient
dataDir string
ur *iouring.IOURing
log *rlog.Logger
}
@ -90,7 +92,8 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
}
for { // wait for qbittorrent to start
_, err = qb.Application().Version(ctx)
ver, err := qb.Application().Version(ctx)
log.Info(ctx, "qbittorrent started", slog.String("version", ver))
if err == nil {
break
}
@ -110,10 +113,16 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
return nil, err
}
ur, err := iouring.New(8, iouring.WithAsync())
if err != nil {
return nil, err
}
return &Daemon{
qb: qb,
proc: proc,
dataDir: conf.DataFolder,
ur: ur,
client: wrapClient(qb),
log: rlog.Component("qbittorrent"),
}, nil
@ -162,7 +171,7 @@ func (fs *Daemon) GetTorrentFS(ctx context.Context, file vfs.File) (vfs.Filesyst
return nil, fmt.Errorf("error syncing torrent state: %w", err)
}
return newTorrentFS(ctx, fs.client, file.Name(), ih.HexString(), torrentPath)
return newTorrentFS(ctx, fs.ur, fs.client, file.Name(), ih.HexString(), torrentPath)
}
func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainfo.Hash, torrentPath string) error {

View file

@ -15,7 +15,9 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/pkg/uring"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/iceber/iouring-go"
)
type FS struct {
@ -25,6 +27,8 @@ type FS struct {
hash string
dataDir string // directory where torrent files are stored
ur *iouring.IOURing
entries map[string]fileEntry
log *rlog.Logger
@ -39,7 +43,7 @@ type fileEntry struct {
var _ vfs.Filesystem = (*FS)(nil)
func newTorrentFS(ctx context.Context, client *cacheClient, name string, hash string, dataDir string) (*FS, error) {
func newTorrentFS(ctx context.Context, ur *iouring.IOURing, client *cacheClient, name string, hash string, dataDir string) (*FS, error) {
ctx, span := trace.Start(ctx, "newTorrentFS")
defer span.End()
@ -69,6 +73,8 @@ func newTorrentFS(ctx context.Context, client *cacheClient, name string, hash st
entries: entries,
ur: ur,
log: rlog.Component("qbittorrent", "fs"),
FilesystemPrototype: vfs.FilesystemPrototype(name),
@ -82,7 +88,7 @@ func (f *FS) Open(ctx context.Context, name string) (vfs.File, error) {
}
if entry, ok := f.entries[name]; ok {
return openFile(ctx, f.client, f.dataDir, f.hash, entry.Content)
return openFile(ctx, f.ur, f.client, f.dataDir, f.hash, entry.Content)
}
for p := range f.entries {
@ -144,6 +150,27 @@ func (f *FS) Unlink(ctx context.Context, filename string) error {
return vfs.ErrNotExist
}
func (f *FS) Rename(ctx context.Context, oldpath string, newpath string) error {
oldpath = vfs.AbsPath(path.Clean(oldpath))
newpath = vfs.AbsPath(path.Clean(newpath))
if _, ok := f.entries[oldpath]; ok {
err := f.client.qb.Torrent().RenameFile(ctx, f.hash, vfs.RelPath(oldpath), vfs.RelPath(newpath))
if err != nil {
return fmt.Errorf("failed to rename file %s to %s: %w", oldpath, newpath, err)
}
f.mu.Lock()
defer f.mu.Unlock()
f.entries[newpath] = f.entries[oldpath]
return nil
}
return vfs.ErrNotExist
}
func (f *FS) removeFile(ctx context.Context, hash string, content *qbittorrent.TorrentContent) error {
log := f.log.With(slog.String("hash", hash), slog.String("file", content.Name))
@ -170,12 +197,17 @@ func (f *FS) removeFile(ctx context.Context, hash string, content *qbittorrent.T
return nil
}
func openFile(ctx context.Context, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) {
func openFile(ctx context.Context, ur *iouring.IOURing, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) {
props, err := client.getProperties(ctx, hash)
if err != nil {
return nil, err
}
file, err := os.OpenFile(path.Join(torrentDir, content.Name), os.O_RDONLY, 0)
if err != nil {
return nil, err
}
return &File{
client: client,
hash: hash,
@ -186,6 +218,8 @@ func openFile(ctx context.Context, client *cacheClient, torrentDir string, hash
pieceSize: props.PieceSize,
fileSize: content.Size,
file: uring.NewFile(ur, file),
offset: 0,
}, nil
}
@ -199,8 +233,9 @@ type File struct {
pieceSize int
fileSize int64
mu sync.Mutex
file *uring.File
offset int64
osfile *os.File
}
var _ vfs.File = (*File)(nil)
@ -298,33 +333,26 @@ func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) e
}
// Read implements vfs.File.
func (f *File) Read(ctx context.Context, p []byte) (n int, err error) {
func (f *File) Read(ctx context.Context, p []byte) (int, error) {
f.mu.Lock()
defer f.mu.Unlock()
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
n, err = descriptor.ReadAt(p, f.offset)
n, err := f.file.ReadAt(ctx, p, f.offset)
f.offset += int64(n)
return n, err
}
// ReadAt implements vfs.File.
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}
descriptor, err := f.descriptor()
if err != nil {
return 0, err
}
return descriptor.ReadAt(p, off)
return f.file.ReadAt(ctx, p, off)
}
// Size implements vfs.File.
@ -337,28 +365,9 @@ func (f *File) Type() fs.FileMode {
return fs.ModeDir
}
func (f *File) descriptor() (*os.File, error) {
if f.osfile != nil {
return f.osfile, nil
}
osfile, err := os.Open(path.Join(f.torrentDir, f.filePath))
if err != nil {
return nil, err
}
f.osfile = osfile
return f.osfile, nil
}
// Close implements vfs.File.
func (f *File) Close(ctx context.Context) error {
if f.osfile != nil {
err := f.osfile.Close()
f.osfile = nil
return err
}
return nil
return f.file.Close(ctx)
}
type fileInfo struct {

View file

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
@ -85,10 +86,13 @@ func downloadLatestQbitRelease(ctx context.Context, binPath string) error {
if err != nil {
return err
}
return downloadFile(binPath, downloadUrl)
slog.InfoContext(ctx, "downloading latest qbittorrent-nox release", slog.String("url", downloadUrl))
return downloadFile(ctx, binPath, downloadUrl)
}
func downloadFile(filepath string, webUrl string) error {
func downloadFile(ctx context.Context, filepath string, webUrl string) error {
if stat, err := os.Stat(filepath); err == nil {
resp, err := http.Head(webUrl)
if err != nil {
@ -107,6 +111,7 @@ func downloadFile(filepath string, webUrl string) error {
}
if resp.ContentLength == stat.Size() && lastModified.Before(stat.ModTime()) {
slog.InfoContext(ctx, "there is already newest version of the file", slog.String("filepath", filepath))
return nil
}
}
@ -118,8 +123,13 @@ func downloadFile(filepath string, webUrl string) error {
}
defer out.Close()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, webUrl, nil)
if err != nil {
return err
}
// Get the data
resp, err := http.Get(webUrl)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

View file

@ -383,6 +383,11 @@ func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
return fs.Torrent.ExcludeFile(ctx, tfile.file)
}
// Rename implements vfs.Filesystem.
func (s *TorrentFS) Rename(ctx context.Context, oldpath string, newpath string) error {
return vfs.ErrNotImplemented
}
var _ vfs.File = (*torrentFile)(nil)
type torrentFile struct {

View file

@ -68,3 +68,8 @@ func (s *SourceFS) Stat(ctx context.Context, filename string) (fs.FileInfo, erro
func (s *SourceFS) Unlink(ctx context.Context, filename string) error {
return vfs.ErrNotImplemented
}
// Rename implements vfs.Filesystem.
func (s *SourceFS) Rename(ctx context.Context, oldpath string, newpath string) error {
return vfs.ErrNotImplemented
}

View file

@ -54,6 +54,11 @@ type ArchiveFS struct {
files map[string]File
}
// Rename implements Filesystem.
func (a *ArchiveFS) Rename(ctx context.Context, oldpath string, newpath string) error {
return ErrNotImplemented
}
// ModTime implements Filesystem.
func (a *ArchiveFS) ModTime() time.Time {
return time.Time{}
@ -257,6 +262,9 @@ func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
}
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
ctx, span := tracer.Start(ctx, "archive.File.Read")
defer span.End()
d.m.Lock()
defer d.m.Unlock()

View file

@ -65,6 +65,11 @@ func (c *CtxBillyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, e
return entries, nil
}
// Rename implements Filesystem.
func (c *CtxBillyFs) Rename(ctx context.Context, oldpath string, newpath string) error {
return c.fs.Rename(ctx, oldpath, newpath)
}
type infoEntry struct {
fs.FileInfo
}
@ -93,7 +98,7 @@ func (c *CtxBillyFs) Type() fs.FileMode {
// Unlink implements Filesystem.
func (c *CtxBillyFs) Unlink(ctx context.Context, filename string) error {
return fs.ErrInvalid
return c.fs.Remove(ctx, filename)
}
func NewCtxBillyFile(info fs.FileInfo, bf ctxbilly.File) *CtxBillyFile {

View file

@ -14,6 +14,11 @@ type DummyFs struct {
name string
}
// Rename implements Filesystem.
func (d *DummyFs) Rename(ctx context.Context, oldpath string, newpath string) error {
return ErrNotImplemented
}
// ModTime implements Filesystem.
func (d *DummyFs) ModTime() time.Time {
return time.Time{}

View file

@ -37,6 +37,8 @@ type Filesystem interface {
Stat(ctx context.Context, filename string) (fs.FileInfo, error)
Unlink(ctx context.Context, filename string) error
Rename(ctx context.Context, oldpath, newpath string) error
// As filesystem mounted to some path, make sense to have the filesystem implement DirEntry
fs.DirEntry
}

View file

@ -202,6 +202,25 @@ func (fs *LogFS) Unlink(ctx context.Context, filename string) (err error) {
return err
}
// Rename implements Filesystem.
func (lfs *LogFS) Rename(ctx context.Context, oldpath string, newpath string) error {
ctx, cancel := context.WithTimeout(ctx, lfs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Rename",
lfs.traceAttrs(
attribute.String("oldpath", oldpath),
attribute.String("newpath", newpath),
),
)
defer span.End()
err := lfs.fs.Rename(ctx, oldpath, newpath)
if isLoggableError(err) {
lfs.log.Error(ctx, "Failed to rename", rlog.Error(err))
}
return err
}
type LogFile struct {
filename string
f File

View file

@ -60,6 +60,11 @@ func (mfs *MemoryFs) Type() fs.FileMode {
return fs.ModeDir
}
// Rename implements Filesystem.
func (mfs *MemoryFs) Rename(ctx context.Context, oldpath string, newpath string) error {
return ErrNotImplemented
}
func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs {
return &MemoryFs{
name: name,

View file

@ -2,6 +2,7 @@ package vfs
import (
"context"
"fmt"
"io/fs"
"os"
"path"
@ -29,7 +30,11 @@ func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error)
// Unlink implements Filesystem.
func (fs *OsFS) Unlink(ctx context.Context, filename string) error {
return os.RemoveAll(path.Join(fs.hostDir, filename))
err := os.Remove(path.Join(fs.hostDir, filename))
if err != nil {
return fmt.Errorf("failed to remove %s: %w", filename, err)
}
return nil
}
// Open implements Filesystem.
@ -41,6 +46,11 @@ func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) {
return NewLazyOsFile(path.Join(fs.hostDir, filename))
}
// Rename implements Filesystem.
func (fs *OsFS) Rename(ctx context.Context, oldpath string, newpath string) error {
return os.Rename(path.Join(fs.hostDir, oldpath), path.Join(fs.hostDir, newpath))
}
// ReadDir implements Filesystem.
func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
return os.ReadDir(path.Join(o.hostDir, dir))

View file

@ -175,6 +175,31 @@ func (r *ResolverFS) Unlink(ctx context.Context, filename string) error {
return r.rootFS.Unlink(ctx, fsPath)
}
// Rename implements Filesystem.
func (r *ResolverFS) Rename(ctx context.Context, oldpath string, newpath string) error {
oldFsPath, oldNestedFs, oldNestedFsPath, err := r.resolver.ResolvePath(ctx, oldpath, r.rootFS.Open)
if err != nil {
return err
}
newFsPath, newNestedFs, newNestedFsPath, err := r.resolver.ResolvePath(ctx, newpath, r.rootFS.Open)
if err != nil {
return err
}
if oldNestedFs == nil && newNestedFs == nil {
return r.rootFS.Rename(ctx, oldFsPath, newFsPath)
}
fmt.Println(oldNestedFs)
fmt.Println(newNestedFs)
if oldNestedFs == nil || newNestedFs == nil || oldNestedFs == newNestedFs {
return oldNestedFs.Rename(ctx, oldNestedFsPath, newNestedFsPath)
}
return fmt.Errorf("rename between different nested filesystems is not supported")
}
// Info implements Filesystem.
func (r *ResolverFS) Info() (fs.FileInfo, error) {
return r, nil