qbittorrent fs

This commit is contained in:
royalcat 2024-09-01 02:00:13 +03:00
parent ae4501ae21
commit f75188b412
51 changed files with 4048 additions and 231 deletions

View file

@ -6,6 +6,16 @@ var defaultConfig = Settings{
Port: 4444,
IP: "0.0.0.0",
},
Sources: Sources{
QBittorrent: QBittorrent{
DataFolder: "./qbittorrent/data",
MetadataFolder: "./qbittorrent/metadata",
},
TorrentClient: TorrentClient{
DataFolder: "./torrent/data",
MetadataFolder: "./torrent/metadata",
},
},
Mounts: Mounts{
HttpFs: HttpFs{
Enabled: true,
@ -27,17 +37,6 @@ var defaultConfig = Settings{
},
},
TorrentClient: TorrentClient{
DataFolder: "./torrent/data",
MetadataFolder: "./torrent/metadata",
DHTNodes: []string{},
// GlobalCacheSize: 2048,
// AddTimeout: 60,
// ReadTimeout: 120,
},
Log: Log{
Path: "/tmp/tstor",
MaxBackups: 2,

View file

@ -2,16 +2,23 @@ package config
// Config is the main config object
type Settings struct {
WebUi WebUi `koanf:"webUi"`
TorrentClient TorrentClient `koanf:"torrent"`
Mounts Mounts `koanf:"mounts"`
Log Log `koanf:"log"`
WebUi WebUi `koanf:"webUi"`
Sources Sources `koanf:"sources"`
Mounts Mounts `koanf:"mounts"`
Log Log `koanf:"log"`
SourceDir string `koanf:"source_dir"`
OtelHttp string `koanf:"otel_http"`
}
type Sources struct {
TorrentClient TorrentClient `koanf:"torrent"`
QBittorrent QBittorrent `koanf:"qbittorrent"`
}
type WebUi struct {
Port int `koanf:"port"`
IP string `koanf:"ip"`
@ -25,6 +32,11 @@ type Log struct {
Path string `koanf:"path"`
}
type QBittorrent struct {
DataFolder string `koanf:"data_folder,omitempty"`
MetadataFolder string `koanf:"metadata_folder,omitempty"`
}
type TorrentClient struct {
// ReadTimeout int `koanf:"read_timeout,omitempty"`
// AddTimeout int `koanf:"add_timeout,omitempty"`

View file

@ -7,13 +7,13 @@ import (
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
nfshelper "git.kmsign.ru/royalcat/tstor/pkg/go-nfs/helpers"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
func NewNFSv3Handler(fs vfs.Filesystem, config config.NFS) (nfs.Handler, error) {
nfslog := slog.With("component", "nfs")
nfs.SetLogger(log.NewNFSLog(nfslog))
nfs.SetLogger(logwrap.NewNFSLog(nfslog))
nfs.Log.SetLevel(nfs.InfoLevel)
bfs := &fsWrapper{fs: fs, log: nfslog, timeout: time.Minute}

View file

@ -10,7 +10,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
@ -51,7 +51,7 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
opts := kvbadger.DefaultOptions[handle](path.Join(config.CachePath, "handlers"))
opts.Codec = kv.CodecBinary[handle, *handle]{}
opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler")
opts.BadgerOptions.Logger = logwrap.BadgerLogger("nfs", "kvhandler")
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
if err != nil {

View file

@ -1,4 +1,4 @@
package log
package logwrap
import (
"context"

View file

@ -1,4 +1,4 @@
package log
package logwrap
const FileName = "tstor.log"

View file

@ -1,4 +1,4 @@
package log
package logwrap
import (
"fmt"

View file

@ -1,4 +1,4 @@
package log
package logwrap
import (
"context"

48
src/logwrap/writer.go Normal file
View file

@ -0,0 +1,48 @@
package logwrap
import (
"bufio"
"bytes"
"context"
"sync"
"log/slog"
)
type SlogWriter struct {
ctx context.Context
level slog.Level
log *slog.Logger
mu sync.Mutex
buffer *bytes.Buffer
scanner *bufio.Scanner
}
func NewSlogWriter(ctx context.Context, level slog.Level, log *slog.Logger) *SlogWriter {
buf := &bytes.Buffer{}
return &SlogWriter{
ctx: ctx,
level: level,
log: log,
buffer: buf,
scanner: bufio.NewScanner(buf),
}
}
func (sw *SlogWriter) Write(p []byte) (n int, err error) {
sw.mu.Lock()
defer sw.mu.Unlock()
n, err = sw.buffer.Write(p)
if err != nil {
return n, err
}
for sw.scanner.Scan() {
sw.log.Log(sw.ctx, sw.level, sw.scanner.Text())
}
return n, err
}

View file

@ -6,35 +6,54 @@ import (
"slices"
"time"
"github.com/xuthus5/qbittorrent-client-go/qbittorrent"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
)
type client struct {
type cacheClient struct {
qb qbittorrent.Client
}
func wrapClient(qb qbittorrent.Client) *client {
return &client{qb: qb}
func wrapClient(qb qbittorrent.Client) *cacheClient {
return &cacheClient{qb: qb}
}
func (f *client) getFileContent(ctx context.Context, hash string, contextIndex int) (*qbittorrent.TorrentContent, error) {
contents, err := f.qb.Torrent().GetContents(hash)
var errNotFound = fmt.Errorf("not found")
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
info, err := f.qb.Torrent().GetProperties(ctx, hash)
if err != nil {
return nil, err
}
contentIndex := slices.IndexFunc(contents, func(c *qbittorrent.TorrentContent) bool {
return c.Index == contextIndex
return info, nil
}
// listContent fetches the per-file content listing of the torrent
// identified by hash from the qBittorrent API.
func (f *cacheClient) listContent(ctx context.Context, hash string) ([]*qbittorrent.TorrentContent, error) {
	cnts, err := f.qb.Torrent().GetContents(ctx, hash)
	if err != nil {
		return nil, err
	}
	return cnts, nil
}
func (f *cacheClient) getContent(ctx context.Context, hash string, contentIndex int) (*qbittorrent.TorrentContent, error) {
contents, err := f.qb.Torrent().GetContents(ctx, hash, contentIndex)
if err != nil {
return nil, err
}
contentI := slices.IndexFunc(contents, func(c *qbittorrent.TorrentContent) bool {
return c.Index == contentIndex
})
if contentIndex == -1 {
if contentI == -1 {
return nil, fmt.Errorf("content not found")
}
return contents[contentIndex], nil
return contents[contentI], nil
}
func (f *client) isPieceComplete(ctx context.Context, hash string, pieceIndex int) (bool, error) {
completion, err := f.qb.Torrent().GetPiecesStates(hash)
func (f *cacheClient) isPieceComplete(ctx context.Context, hash string, pieceIndex int) (bool, error) {
completion, err := f.qb.Torrent().GetPiecesStates(ctx, hash)
if err != nil {
return false, err
}
@ -46,7 +65,7 @@ func (f *client) isPieceComplete(ctx context.Context, hash string, pieceIndex in
return false, nil
}
func (f *client) waitPieceToComplete(ctx context.Context, hash string, pieceIndex int) error {
func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, pieceIndex int) error {
const checkingInterval = 1 * time.Second
ok, err := f.isPieceComplete(ctx, hash, pieceIndex)

View file

@ -1,28 +1,107 @@
package qbittorrent
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path"
"path/filepath"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
"github.com/royalcat/ctxio"
"github.com/xuthus5/qbittorrent-client-go/qbittorrent"
)
type Daemon struct {
proc *os.Process
qb qbittorrent.Client
client *client
client *cacheClient
dataDir string
log *rlog.Logger
}
func NewDaemon(dir string) (*Daemon, error) {
const defaultConf = `
[LegalNotice]
Accepted=true
dataDir := dir + "/data"
qb, err := qbittorrent.NewClient(&qbittorrent.Config{
Address: "localhost:8080",
[Preferences]
WebUI\LocalHostAuth=false
WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)"
`
func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
ctx := context.Background()
log := rlog.Component("qbittorrent")
binPath := conf.MetadataFolder + "/qbittorrent-nox"
err := downloadLatestQbitRelease(ctx, binPath)
if err != nil {
return nil, err
}
daemonLog := log.WithComponent("process")
outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog())
errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog())
_, err = os.Stat(conf.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf")
if errors.Is(err, os.ErrNotExist) {
err = os.MkdirAll(conf.MetadataFolder+"/profile/qBittorrent/config", 0744)
if err != nil {
return nil, err
}
err = os.WriteFile(conf.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644)
if err != nil {
return nil, err
}
}
err = os.MkdirAll(conf.DataFolder, 0744)
if err != nil {
return nil, err
}
const port = 25436
proc, err := runQBittorrent(binPath, conf.MetadataFolder+"/profile", port, outLog, errLog)
if err != nil {
return nil, err
}
time.Sleep(time.Second)
qb, err := qbittorrent.NewClient(ctx, &qbittorrent.Config{
Address: fmt.Sprintf("http://localhost:%d", port),
})
if err != nil {
return nil, err
}
for { // wait for qbittorrent to start
_, err = qb.Application().Version(ctx)
if err == nil {
break
}
log.Warn(ctx, "waiting for qbittorrent to start", rlog.Error(err))
time.Sleep(time.Second)
}
dataDir, err := filepath.Abs(conf.DataFolder)
if err != nil {
return nil, err
}
err = qb.Application().SetPreferences(ctx, &qbittorrent.Preferences{
SavePath: dataDir,
})
if err != nil {
return nil, err
@ -30,37 +109,20 @@ func NewDaemon(dir string) (*Daemon, error) {
return &Daemon{
qb: qb,
dataDir: dataDir,
proc: proc,
dataDir: conf.DataFolder,
client: wrapClient(qb),
log: rlog.Component("qbittorrent"),
}, nil
}
func (fs *Daemon) torrentPath(ih infohash.T) string {
return path.Join(fs.dataDir, ih.HexString())
}
func (fs *Daemon) addTorrent(ctx context.Context, f vfs.File) error {
file, err := ctxio.ReadAll(ctx, f)
func (d *Daemon) Close(ctx context.Context) error {
err := d.proc.Signal(os.Interrupt)
if err != nil {
return err
}
mi, err := metainfo.Load(bytes.NewBuffer(file))
if err != nil {
return err
}
ih := mi.HashInfoBytes()
err = fs.qb.Torrent().AddNewTorrent(&qbittorrent.TorrentAddOption{
Torrents: []*qbittorrent.TorrentAddFileMetadata{
{
Data: file,
},
},
SavePath: fs.torrentPath(ih),
// SequentialDownload: "true",
// FirstLastPiecePrio: "true",
})
_, err = d.proc.Wait()
if err != nil {
return err
}
@ -68,27 +130,107 @@ func (fs *Daemon) addTorrent(ctx context.Context, f vfs.File) error {
return nil
}
func (fs *Daemon) TorrentFS(ctx context.Context, file vfs.File) (*FS, error) {
func (d *Daemon) torrentPath(ih infohash.T) (string, error) {
return filepath.Abs(path.Join(d.dataDir, ih.HexString()))
}
func (fs *Daemon) TorrentFS(ctx context.Context, file vfs.File) (vfs.Filesystem, error) {
log := fs.log.With(slog.String("file", file.Name()))
ih, err := readInfoHash(ctx, file)
if err != nil {
return nil, err
}
log = log.With(slog.String("infohash", ih.HexString()))
existing, err := fs.qb.Torrent().GetTorrents(&qbittorrent.TorrentOption{
torrentPath, err := fs.torrentPath(ih)
if err != nil {
return nil, fmt.Errorf("error getting torrent path: %w", err)
}
log = log.With(slog.String("torrentPath", torrentPath))
log.Debug(ctx, "creating fs for torrent")
err = fs.syncTorrentState(ctx, file, ih, torrentPath)
if err != nil {
return nil, fmt.Errorf("error syncing torrent state: %w", err)
}
return newTorrentFS(ctx, fs.client, file.Name(), ih.HexString(), torrentPath)
}
func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainfo.Hash, torrentPath string) error {
log := d.log.With(slog.String("file", file.Name()), slog.String("infohash", ih.HexString()))
existing, err := d.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
Hashes: []string{ih.HexString()},
})
if err != nil {
return nil, err
return fmt.Errorf("error to check torrent existence: %w", err)
}
log = log.With(slog.String("torrentPath", torrentPath))
if len(existing) == 0 {
err := fs.addTorrent(ctx, file)
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
return err
}
data, err := ctxio.ReadAll(ctx, file)
if err != nil {
return err
}
err = d.qb.Torrent().AddNewTorrent(ctx, &qbittorrent.TorrentAddOption{
Torrents: []*qbittorrent.TorrentAddFileMetadata{
{
Data: data,
},
},
SavePath: torrentPath,
// SequentialDownload: "true",
FirstLastPiecePrio: "true",
})
if err != nil {
return err
}
for {
_, err := d.qb.Torrent().GetProperties(ctx, ih.HexString())
if err == nil {
break
}
log.Error(ctx, "waiting for torrent to be added", rlog.Error(err))
time.Sleep(time.Millisecond * 15)
}
log.Info(ctx, "added torrent", slog.String("infohash", ih.HexString()))
if err != nil {
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
return err
}
return nil
} else if len(existing) == 1 {
// info := existing[0]
props, err := d.qb.Torrent().GetProperties(ctx, ih.HexString())
if err != nil {
return err
}
if props.SavePath != torrentPath {
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
if err != nil {
return err
}
}
return nil
}
return newTorrentFS(fs.client, file.Name(), ih.HexString(), fs.torrentPath(ih))
return fmt.Errorf("multiple torrents with the same infohash")
}
// TODO caching
@ -97,5 +239,15 @@ func readInfoHash(ctx context.Context, file vfs.File) (infohash.T, error) {
if err != nil {
return infohash.T{}, err
}
return mi.HashInfoBytes(), nil
info, err := mi.UnmarshalInfo()
if err != nil {
return infohash.T{}, err
}
if info.HasV2() {
ih := infohash_v2.HashBytes(mi.InfoBytes)
return *(&ih).ToShort(), nil
}
return infohash.HashBytes(mi.InfoBytes), nil
}

View file

@ -2,87 +2,123 @@ package qbittorrent
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"strings"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
type FS struct {
client *client
client *cacheClient
name string
hash string
dataDir string
content map[string]*qbittorrent.TorrentContent
files map[string]fs.FileInfo
vfs.FilesystemPrototype
}
var _ vfs.Filesystem = (*FS)(nil)
func newTorrentFS(client *client, name string, hash string, dataDir string) (*FS, error) {
func newTorrentFS(ctx context.Context, client *cacheClient, name string, hash string, dataDir string) (*FS, error) {
cnts, err := client.listContent(ctx, hash)
if err != nil {
return nil, fmt.Errorf("failed to list content for hash %s: %w", hash, err)
}
content := make(map[string]*qbittorrent.TorrentContent, len(cnts))
files := make(map[string]fs.FileInfo, len(cnts))
for _, cnt := range cnts {
path := vfs.AbsPath(cnt.Name)
files[path] = vfs.NewFileInfo(cnt.Name, cnt.Size)
content[path] = cnt
}
return &FS{
client: client,
name: name,
hash: hash,
client: client,
name: name,
hash: hash,
dataDir: dataDir,
content: content,
files: files,
FilesystemPrototype: vfs.FilesystemPrototype(name),
}, nil
}
// Info implements vfs.Filesystem.
func (f *FS) Info() (fs.FileInfo, error) {
return vfs.NewDirInfo(f.name), nil
}
// IsDir implements vfs.Filesystem.
func (f *FS) IsDir() bool {
return true
}
// Name implements vfs.Filesystem.
func (f *FS) Name() string {
return path.Base(f.dataDir)
}
// Open implements vfs.Filesystem.
func (f *FS) Open(ctx context.Context, filename string) (vfs.File, error) {
panic("unimplemented")
// Open implements vfs.Filesystem.
//
// The root path resolves to a synthetic directory. An exact match in
// the content table opens the underlying torrent file; otherwise, if
// any content entry lives under name as a directory, a synthetic
// directory file is returned.
func (f *FS) Open(ctx context.Context, name string) (vfs.File, error) {
	if name == vfs.Separator {
		return vfs.NewDirFile(name), nil
	}

	cnt, ok := f.content[name]
	if ok {
		return openFile(ctx, f.client, f.dataDir, f.hash, cnt)
	}

	// Fix: only treat name as a directory when a content path continues
	// past it with a separator. The previous bare HasPrefix(p, name)
	// wrongly reported "/foo" as a directory whenever "/foobar" existed.
	dirPrefix := name
	if !strings.HasSuffix(dirPrefix, vfs.Separator) {
		dirPrefix += vfs.Separator
	}
	for p := range f.content {
		if strings.HasPrefix(p, dirPrefix) {
			return vfs.NewDirFile(name), nil
		}
	}

	return nil, vfs.ErrNotExist
}
// ReadDir implements vfs.Filesystem.
func (f *FS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
panic("unimplemented")
// ReadDir implements vfs.Filesystem, listing the immediate children of
// name from the cached file-info table.
//
// Fix: the receiver is renamed from "fs" to "f" — naming it "fs"
// shadowed the imported io/fs package inside the body and was
// inconsistent with every other *FS method.
func (f *FS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
	return vfs.ListDirFromInfo(f.files, name)
}
// Stat implements vfs.Filesystem.
func (f *FS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return vfs.NewDirInfo(f.name), nil
}
// Type implements vfs.Filesystem.
func (f *FS) Type() fs.FileMode {
return vfs.ROMode
// Stat implements vfs.Filesystem. Lookups are served from the content
// table captured when this filesystem was created.
//
// NOTE(review): only regular files appear in f.files, so Stat on a
// directory path reports vfs.ErrNotExist — confirm callers resolve
// directories via Open/ReadDir instead.
func (f *FS) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
	if fi, ok := f.files[name]; ok {
		return fi, nil
	}
	return nil, vfs.ErrNotExist
}
// Unlink implements vfs.Filesystem.
func (f *FS) Unlink(ctx context.Context, filename string) error {
panic("unimplemented")
return vfs.ErrNotImplemented
}
func openFile(ctx context.Context, client client, hash, filePath string) *File {
client.getFileContent(ctx, hash, 0)
func openFile(ctx context.Context, client *cacheClient, torrentDir string, hash string, content *qbittorrent.TorrentContent) (*File, error) {
props, err := client.getProperties(ctx, hash)
if err != nil {
return nil, err
}
return &File{
client: client,
hash: hash,
filePath: filePath,
}
client: client,
hash: hash,
torrentDir: torrentDir,
filePath: content.Name,
contentIndex: content.Index,
pieceSize: props.PieceSize,
fileSize: content.Size,
offset: 0,
}, nil
}
type File struct {
client client
client *cacheClient
hash string
dataDir string
torrentDir string
filePath string // path inside a torrent directory
contentIndex int
pieceSize int
@ -94,16 +130,6 @@ type File struct {
var _ vfs.File = (*File)(nil)
// Close implements vfs.File.
func (f *File) Close(ctx context.Context) error {
if f.osfile != nil {
err := f.osfile.Close()
f.osfile = nil
return err
}
return nil
}
// Info implements vfs.File.
func (f *File) Info() (fs.FileInfo, error) {
return &fileInfo{name: path.Base(f.filePath), size: f.fileSize}, nil
@ -173,7 +199,7 @@ func (f *File) Size() int64 {
// Type implements vfs.File.
func (f *File) Type() fs.FileMode {
return vfs.ROMode
return fs.ModeDir
}
func (f *File) descriptor() (*os.File, error) {
@ -181,7 +207,7 @@ func (f *File) descriptor() (*os.File, error) {
return f.osfile, nil
}
osfile, err := os.Open(path.Join(f.dataDir, f.filePath))
osfile, err := os.Open(path.Join(f.torrentDir, f.filePath))
if err != nil {
return nil, err
}
@ -190,6 +216,16 @@ func (f *File) descriptor() (*os.File, error) {
return f.osfile, nil
}
// Close implements vfs.File. Closing is idempotent: the lazily opened
// OS descriptor, if any, is closed exactly once and the handle cleared
// so later calls are no-ops.
func (f *File) Close(ctx context.Context) error {
	if f.osfile == nil {
		return nil
	}
	err := f.osfile.Close()
	f.osfile = nil
	return err
}
type fileInfo struct {
name string
size int64

View file

@ -20,25 +20,26 @@ import (
const (
repoOwner = "userdocs"
repoName = "qbittorrent-nox-static"
binName = "qbittorrent-nox"
)
func runQBittorrent(binDir string, profileDir string, stdout, stderr io.Writer) (*os.Process, error) {
cmd := exec.Command(
path.Join(binDir, binName),
fmt.Sprintf("--profile=%s", profileDir),
)
// runQBittorrent launches the qbittorrent-nox binary at binPath with
// the given profile directory and WebUI port, wiring its stdout/stderr
// to the supplied writers. It returns the started process without
// waiting for it to exit; the caller owns its lifetime.
func runQBittorrent(binPath string, profileDir string, port int, stdout, stderr io.Writer) (*os.Process, error) {
	// The downloaded release artifact is not guaranteed to be executable.
	if err := os.Chmod(binPath, 0755); err != nil {
		return nil, err
	}

	cmd := exec.Command(
		binPath,
		fmt.Sprintf("--profile=%s", profileDir),
		fmt.Sprintf("--webui-port=%d", port),
	)
	// Feed "y" on stdin — presumably auto-confirming the first-run
	// prompt; verify against qbittorrent-nox behavior.
	cmd.Stdin = bytes.NewReader([]byte("y\n"))
	cmd.Stdout = stdout
	cmd.Stderr = stderr

	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd.Process, nil
}
func downloadLatestRelease(ctx context.Context, binPath string) error {
func downloadLatestQbitRelease(ctx context.Context, binPath string) error {
client := github.NewClient(nil)
rel, _, err := client.Repositories.GetLatestRelease(ctx, repoOwner, repoName)
if err != nil {

View file

@ -11,8 +11,8 @@ func TestDownloadQBittorent(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
require := require.New(t)
err := downloadLatestRelease(ctx, tempDir)
err := downloadLatestQbitRelease(ctx, tempDir)
require.NoError(err)
err = downloadLatestRelease(ctx, tempDir)
err = downloadLatestQbitRelease(ctx, tempDir)
require.NoError(err)
}

View file

@ -1,14 +1,14 @@
package sources
import (
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
"git.kmsign.ru/royalcat/tstor/src/sources/qbittorrent"
"git.kmsign.ru/royalcat/tstor/src/sources/ytdlp"
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Daemon, ytdlpsrv *ytdlp.Daemon) vfs.Filesystem {
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *qbittorrent.Daemon, ytdlpsrv *ytdlp.Daemon) vfs.Filesystem {
factories := map[string]vfs.FsFactory{
".torrent": tsrv.NewTorrentFs,
".torrent": tsrv.TorrentFS,
".ts-ytdlp": ytdlpsrv.BuildFS,
}

View file

@ -6,7 +6,7 @@ import (
"os"
"git.kmsign.ru/royalcat/tstor/src/config"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/anacrolix/dht/v2/bep44"
tlog "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
@ -37,7 +37,7 @@ func newClientConfig(st storage.ClientImpl, fis bep44.Store, cfg *config.Torrent
// }
tl := tlog.NewLogger("torrent-client")
tl.SetHandlers(&dlog.Torrent{L: l})
tl.SetHandlers(&logwrap.Torrent{L: l})
torrentCfg.Logger = tl
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) {

View file

@ -121,13 +121,13 @@ func NewDaemon(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, e
return nil, err
}
go func() {
ctx := context.Background()
err := s.backgroudFileLoad(ctx)
if err != nil {
s.log.Error(ctx, "initial torrent load failed", rlog.Error(err))
}
}()
// go func() {
// ctx := context.Background()
// err := s.backgroudFileLoad(ctx)
// if err != nil {
// s.log.Error(ctx, "initial torrent load failed", rlog.Error(err))
// }
// }()
go func() {
ctx := context.Background()

View file

@ -5,7 +5,7 @@ import (
"encoding/gob"
"time"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/anacrolix/dht/v2/bep44"
"github.com/dgraph-io/badger/v4"
)
@ -19,7 +19,7 @@ type dhtFileItemStore struct {
func newDHTStore(path string, itemsTTL time.Duration) (*dhtFileItemStore, error) {
opts := badger.DefaultOptions(path).
WithLogger(dlog.BadgerLogger("torrent-client", "dht-item-store")).
WithLogger(logwrap.BadgerLogger("torrent-client", "dht-item-store")).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)

View file

@ -138,26 +138,26 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
}
// TODO optional
if len(fs.filesCache) == 1 && fs.resolver.IsNestedFs(fs.Torrent.Name()) {
filepath := "/" + fs.Torrent.Name()
if file, ok := fs.filesCache[filepath]; ok {
nestedFs, err := fs.resolver.NestedFs(ctx, filepath, file)
if err != nil {
return nil, err
}
if nestedFs == nil {
goto DEFAULT_DIR // FIXME
}
fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
if err != nil {
return nil, err
}
// if len(fs.filesCache) == 1 && fs.resolver.IsNestedFs(fs.Torrent.Name()) {
// filepath := "/" + fs.Torrent.Name()
// if file, ok := fs.filesCache[filepath]; ok {
// nestedFs, err := fs.resolver.NestedFs(ctx, filepath, file)
// if err != nil {
// return nil, err
// }
// if nestedFs == nil {
// goto DEFAULT_DIR // FIXME
// }
// fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
// if err != nil {
// return nil, err
// }
return fs.filesCache, nil
}
}
// return fs.filesCache, nil
// }
// }
// DEFAULT_DIR:
DEFAULT_DIR:
rootDir := "/" + fs.Torrent.Name() + "/"
singleDir := true
for k, _ := range fs.filesCache {
@ -315,7 +315,6 @@ func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File,
return nil, err
}
if nestedFs != nil {
return nestedFs.Open(ctx, nestedFsPath)
}

View file

@ -6,7 +6,7 @@ import (
"fmt"
"path/filepath"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
@ -21,7 +21,7 @@ type infoBytesStore struct {
func newInfoBytesStore(metaDir string) (*infoBytesStore, error) {
opts := badger.
DefaultOptions(filepath.Join(metaDir, "infobytes")).
WithLogger(dlog.BadgerLogger("torrent-client", "infobytes"))
WithLogger(logwrap.BadgerLogger("torrent-client", "infobytes"))
db, err := badger.Open(opts)
if err != nil {
return nil, err

View file

@ -5,7 +5,7 @@ import (
"encoding/binary"
"fmt"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/royalcat/kv"
@ -86,7 +86,7 @@ var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
opts := kvbadger.DefaultOptions[PieceCompletionState](dir)
opts.Codec = kv.CodecBinary[PieceCompletionState, *PieceCompletionState]{}
opts.BadgerOptions = opts.BadgerOptions.WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion"))
opts.BadgerOptions = opts.BadgerOptions.WithLogger(logwrap.BadgerLogger("torrent-client", "piece-completion"))
db, err := kvbadger.NewBagerKVBinaryKey[pieceKey, PieceCompletionState](opts)
if err != nil {

View file

@ -7,7 +7,7 @@ import (
"slices"
"time"
"git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
)
@ -17,7 +17,7 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error)
badger.
DefaultOptions(path.Join(metaDir, "stats")).
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
WithLogger(logwrap.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err

View file

@ -3,14 +3,14 @@ package tkv
import (
"path"
tlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/logwrap"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
opts := kvbadger.DefaultOptions[V](path.Join(dbdir, name))
opts.BadgerOptions.Logger = tlog.BadgerLogger(name, "badger")
opts.BadgerOptions.Logger = logwrap.BadgerLogger(name, "badger")
store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts)
if err != nil {
return nil, err

View file

@ -109,3 +109,25 @@ func (fi *fileInfo) IsDir() bool {
func (fi *fileInfo) Sys() interface{} {
return nil
}
// FilesystemPrototype is a minimal Filesystem stub identified only by a
// name: it always presents itself as a directory. Embed it to inherit
// the boilerplate Info/IsDir/Name/Type methods.
type FilesystemPrototype string

// Info implements Filesystem. It reports the prototype as a directory
// named after its string value.
func (p FilesystemPrototype) Info() (fs.FileInfo, error) {
	return NewDirInfo(string(p)), nil
}

// IsDir implements Filesystem. A prototype is always a directory.
func (p FilesystemPrototype) IsDir() bool {
	return true
}

// Name implements Filesystem. The prototype's string value is its name.
func (p FilesystemPrototype) Name() string {
	return string(p)
}

// Type implements Filesystem. Prototypes always report fs.ModeDir.
func (p FilesystemPrototype) Type() fs.FileMode {
	return fs.ModeDir
}

86
src/vfs/hash.go Normal file
View file

@ -0,0 +1,86 @@
package vfs
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
)
// chunkSize is the number of bytes hashed from each end of a file
// (the standard oshash chunk of 64 KiB).
const chunkSize int64 = 64 * 1024

// ErrOsHashLen is returned when a checksummed buffer is not a whole
// number of 8-byte words.
var ErrOsHashLen = errors.New("oshash: buffer length must be a multiple of 8")

// Hash is a 16-hex-digit oshash of a file's head, tail and size.
type Hash string
// FileHash computes the OpenSubtitles-style hash ("oshash") of f: the
// 64-bit wraparound sum of the first and last chunkSize bytes plus the
// file size, rendered as 16 hex digits. Files of 8 bytes or fewer are
// rejected.
//
// The read offset is rewound to the start before hashing and,
// best-effort, again on return so callers see a predictable position.
func FileHash(ctx context.Context, f File) (Hash, error) {
	_, err := f.Seek(0, io.SeekStart)
	if err != nil {
		return "", fmt.Errorf("error seeking file: %w", err)
	}
	defer f.Seek(0, io.SeekStart)
	fileSize := f.Size()
	if fileSize <= 8 {
		return "", fmt.Errorf("cannot calculate oshash where size < 8 (%d)", fileSize)
	}
	// For small files, shrink the chunk to the largest multiple of 8
	// that fits, since the checksum consumes 8-byte words.
	fileChunkSize := chunkSize
	if fileSize < fileChunkSize {
		// Must be a multiple of 8.
		fileChunkSize = (fileSize / 8) * 8
	}
	head := make([]byte, fileChunkSize)
	tail := make([]byte, fileChunkSize)
	// read the head of the file into the start of the buffer
	// NOTE(review): a single Read is assumed to fill the buffer; if
	// File.Read may return short reads, this under-fills head/tail —
	// confirm the File contract (io.ReadFull semantics) with callers.
	_, err = f.Read(ctx, head)
	if err != nil {
		return "", err
	}
	// seek to the end of the file - the chunk size
	_, err = f.Seek(-fileChunkSize, io.SeekEnd)
	if err != nil {
		return "", err
	}
	// read the tail of the file
	_, err = f.Read(ctx, tail)
	if err != nil {
		return "", err
	}
	return oshash(fileSize, head, tail)
}
// sumBytes folds buf into a single uint64 by summing its consecutive
// little-endian 8-byte words (overflow wraps, by design of oshash).
// It returns ErrOsHashLen when len(buf) is not a multiple of 8.
func sumBytes(buf []byte) (uint64, error) {
	if len(buf)%8 != 0 {
		return 0, ErrOsHashLen
	}
	var sum uint64
	for off := 0; off < len(buf); off += 8 {
		sum += binary.LittleEndian.Uint64(buf[off : off+8])
	}
	return sum, nil
}
// oshash combines the head and tail word-sums with the file size into
// the final 16-hex-digit oshash value.
func oshash(size int64, head []byte, tail []byte) (Hash, error) {
	headSum, err := sumBytes(head)
	if err != nil {
		return "", fmt.Errorf("oshash head: %w", err)
	}
	tailSum, err := sumBytes(tail)
	if err != nil {
		return "", fmt.Errorf("oshash tail: %w", err)
	}
	// Uint64 wraparound on the sum is part of the oshash definition;
	// format zero-padded to a fixed 16 hex digits.
	return Hash(fmt.Sprintf("%016x", headSum+tailSum+uint64(size))), nil
}

View file

@ -111,6 +111,7 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro
ctx, span := tracer.Start(ctx, "Open",
fs.traceAttrs(attribute.String("filename", filename)),
)
log := fs.log.With(slog.String("filename", filename))
defer func() {
if err != nil {
span.RecordError(err)
@ -120,7 +121,7 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro
file, err = fs.fs.Open(ctx, filename)
if isLoggableError(err) {
fs.log.Error(ctx, "Failed to open file")
log.Error(ctx, "Failed to open file", rlog.Error(err))
}
file = WrapLogFile(file, filename, fs.log, fs.readTimeout, fs.tel)

View file

@ -92,13 +92,14 @@ func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) {
}
// ReadDir implements Filesystem.
func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
log := r.log.With(slog.String("name", name))
ctx, span := tracer.Start(ctx, "ReadDir",
r.traceAttrs(attribute.String("name", dir)),
r.traceAttrs(attribute.String("name", name)),
)
defer span.End()
fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, dir, r.rootFS.Open)
fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, name, r.rootFS.Open)
if err != nil {
return nil, err
}
@ -113,34 +114,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries {
if r.resolver.IsNestedFs(e.Name()) {
filepath := path.Join("/", dir, e.Name())
file, err := r.Open(ctx, filepath)
filepath := path.Join("/", name, e.Name())
file, err := r.rootFS.Open(ctx, filepath)
if err != nil {
return nil, err
}
// it is factory responsibility to close file then needed
err = func() error {
factoryCtx, cancel := subTimeout(ctx)
defer cancel()
nestedfs, err := r.resolver.NestedFs(factoryCtx, filepath, file)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.log.Error(ctx, "creating fs timed out",
slog.String("filename", e.Name()),
)
return nil
}
return err
}
out = append(out, nestedfs)
return nil
}()
if err != nil {
nestedfs, err := r.resolver.nestedFs(ctx, filepath, file)
if errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if err != nil {
log.Error(ctx, "error creating nested fs", rlog.Error(err))
out = append(out, e)
continue
}
out = append(out, nestedfs)
} else {
out = append(out, e)
}
@ -214,14 +203,14 @@ type FsFactory func(ctx context.Context, f File) (Filesystem, error)
func NewResolver(factories map[string]FsFactory) *Resolver {
return &Resolver{
factories: factories,
fsmap: map[string]Filesystem{},
fsmap: map[Hash]Filesystem{},
}
}
type Resolver struct {
m sync.Mutex
factories map[string]FsFactory
fsmap map[string]Filesystem // filesystem cache
fsmap map[Hash]Filesystem // filesystem cache
// TODO: add fsmap clean
}
@ -236,26 +225,35 @@ func (r *Resolver) IsNestedFs(f string) bool {
return false
}
func (r *Resolver) NestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
if file.IsDir() {
return nil, file.Close(ctx)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return nil, fmt.Errorf("error calculating file hash: %w", err)
}
if nestedFs, ok := r.fsmap[fileHash]; ok {
return nestedFs, file.Close(ctx)
}
for ext, nestFactory := range r.factories {
if !strings.HasSuffix(fsPath, ext) {
continue
}
if nestedFs, ok := r.fsmap[fsPath]; ok {
return nestedFs, nil
}
nestedFs, err := nestFactory(ctx, file)
if err != nil {
return nil, fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
}
r.fsmap[fsPath] = nestedFs
r.fsmap[fileHash] = nestedFs
return nestedFs, nil
}
return nil, nil
return nil, file.Close(ctx)
}
// open requeue raw open, without resolver call
@ -289,6 +287,19 @@ PARTS_LOOP:
nestedFsPath = AbsPath(path.Join(parts[nestOn:]...))
file, err := rawOpen(ctx, fsPath)
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
}
err = file.Close(ctx)
if err != nil {
return "", nil, "", fmt.Errorf("error closing file: %w", err)
}
// we dont need lock until now
// it must be before fsmap read to exclude race condition:
// read -> write
@ -296,7 +307,7 @@ PARTS_LOOP:
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fsPath]; ok {
if nestedFs, ok := r.fsmap[fileHash]; ok {
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
} else {
@ -307,13 +318,13 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
// it is factory responsibility to close file then needed
// it is factory responsibility to close file handler then needed
nestedFs, err := nestFactory(ctx, fsFile)
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
r.fsmap[fsPath] = nestedFs
r.fsmap[fileHash] = nestedFs
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
@ -366,3 +377,27 @@ func ListDirFromFiles[F File](m map[string]F, name string) ([]fs.DirEntry, error
return out, nil
}
// ListDirFromInfo derives the immediate children of directory name from
// a flat path→FileInfo map: direct children become file entries, deeper
// descendants collapse into directory entries, deduplicated and sorted
// by name.
func ListDirFromInfo(m map[string]fs.FileInfo, name string) ([]fs.DirEntry, error) {
	prefix := AddTrailSlash(path.Clean(name))

	entries := make([]fs.DirEntry, 0, len(m))
	for p, info := range m {
		if !strings.HasPrefix(p, prefix) {
			continue
		}
		rel := strings.Split(trimRelPath(p, prefix), Separator)
		if len(rel) == 1 {
			// Path sits directly under name: a file entry.
			entries = append(entries, NewFileInfo(rel[0], info.Size()))
		} else {
			// Deeper path: surface only its first component as a dir.
			entries = append(entries, NewDirInfo(rel[0]))
		}
	}

	slices.SortStableFunc(entries, func(a, b fs.DirEntry) int {
		return strings.Compare(a.Name(), b.Name())
	})
	// Several descendants share one directory component; keep one entry.
	entries = slices.CompactFunc(entries, func(a, b fs.DirEntry) bool {
		return a.Name() == b.Name()
	})
	return entries, nil
}