storage rework

royalcat 2024-06-15 01:14:44 +03:00
parent 06153d61c9
commit e9df8925d1
49 changed files with 1825 additions and 1303 deletions

View file

@@ -30,7 +30,7 @@ func (r *mutationResolver) ValidateTorrents(ctx context.Context, filter model.To
return false, nil
}
t.ValidateTorrent()
t.ValidateTorrent(ctx)
return true, nil
}
@@ -40,7 +40,7 @@ func (r *mutationResolver) ValidateTorrents(ctx context.Context, filter model.To
return false, err
}
for _, v := range torrents {
if err := v.ValidateTorrent(); err != nil {
if err := v.ValidateTorrent(ctx); err != nil {
return false, err
}
}

View file

@@ -11,7 +11,7 @@ import (
// It serves as dependency injection for your app, add any dependencies you require here.
type Resolver struct {
Service *torrent.Service
Service *torrent.Daemon
VFS vfs.Filesystem
SourceFS billy.Filesystem
}

View file

@@ -15,7 +15,7 @@ import (
"github.com/labstack/echo/v4/middleware"
)
func New(fc *filecache.Cache, s *torrent.Service, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
func New(fc *filecache.Cache, s *torrent.Daemon, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
log := slog.With()
r := echo.New()

View file

@@ -18,7 +18,7 @@ import (
"github.com/ravilushqa/otelgqlgen"
)
func GraphQLHandler(service *torrent.Service, vfs vfs.Filesystem) http.Handler {
func GraphQLHandler(service *torrent.Daemon, vfs vfs.Filesystem) http.Handler {
graphqlHandler := handler.NewDefaultServer(
graph.NewExecutableSchema(
graph.Config{

View file

@@ -6,11 +6,12 @@ import (
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
nfshelper "git.kmsign.ru/royalcat/tstor/pkg/go-nfs/helpers"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
func NewNFSv3Handler(fs vfs.Filesystem, config config.NFS) (nfs.Handler, error) {
nfslog := slog.With("component", "nfs")
nfs.SetLogger(log.NewNFSLog(nfslog))
nfs.Log.SetLevel(nfs.InfoLevel)
@@ -18,7 +19,7 @@ func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
bfs := &fsWrapper{fs: fs, log: nfslog, timeout: time.Minute}
handler := nfshelper.NewNullAuthHandler(bfs)
cacheHelper, err := NewKvHandler(handler, bfs)
cacheHelper, err := NewKvHandler(handler, bfs, config)
if err != nil {
return nil, err
}

View file

@@ -6,6 +6,7 @@ import (
"path"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
@@ -14,6 +15,7 @@ import (
"github.com/google/uuid"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
type handle []string
@@ -45,17 +47,20 @@ func bytesToPath(path []string) string {
var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvhandler")
// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
activeHandles, err := kv.NewBadgerKVMarhsler[uuid.UUID, handle](path.Join(config.Config.Mounts.NFS.CachePath, "handlers"))
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers"))
opts.DefaultTTL = time.Hour
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
if err != nil {
return nil, err
}
reverseCache := map[string]uuid.UUID{}
activeHandles.Range(context.Background(), func(k uuid.UUID, v handle) bool {
activeHandles.Range(context.Background(), func(k uuid.UUID, v handle) error {
reverseCache[v.String()] = k
return true
return nil
})
c := &CachingHandler{
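
Worth noting in this hunk: the handle cache moved from kv.NewBadgerKVMarhsler to the kvbadger package, entries now carry a TTL, and the Range callback contract changed from returning a bool (continue) to returning an error (nil to continue). A condensed sketch of the new pattern, using only the kvbadger and uuid calls visible in this diff; the directory path and the handle alias are illustrative:

package example

import (
	"context"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/royalcat/kv/kvbadger"
)

type handle []string

func buildReverseCache(ctx context.Context, dir string) (map[string]uuid.UUID, error) {
	// Entries expire after an hour, mirroring opts.DefaultTTL above,
	// so stale NFS handles age out instead of accumulating forever.
	opts := kvbadger.DefaultOptions(dir)
	opts.DefaultTTL = time.Hour

	activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
	if err != nil {
		return nil, err
	}

	// Range now takes func(k, v) error: return nil to keep iterating,
	// a non-nil error to stop (the pre-rework API returned a bool).
	reverse := map[string]uuid.UUID{}
	err = activeHandles.Range(ctx, func(k uuid.UUID, v handle) error {
		reverse[strings.Join(v, "/")] = k // the real handle type has its own String()
		return nil
	})
	return reverse, err
}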

View file

@@ -6,7 +6,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/vfs"
)
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service, ytdlpsrv *ytdlp.Service) vfs.Filesystem {
func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Daemon, ytdlpsrv *ytdlp.Daemon) vfs.Filesystem {
factories := map[string]vfs.FsFactory{
".torrent": tsrv.NewTorrentFs,
".ts-ytdlp": ytdlpsrv.BuildFS,
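
NewHostedFS keys filesystem factories by file-name extension; both daemons now satisfy the same vfs.FsFactory shape (see the NewTorrentFs assertion later in this diff). A small sketch of how such suffix-keyed dispatch could resolve a file, with stand-in types for the repo's vfs package (the actual resolution lives elsewhere in tstor and may differ):

package example

import (
	"context"
	"strings"
)

// Stand-ins for the repo's vfs types, just to make the dispatch visible.
type File interface{ Name() string }
type Filesystem interface{}
type FsFactory func(ctx context.Context, f File) (Filesystem, error)

// pickFactory returns the factory whose extension key matches the
// file-name suffix, e.g. ".torrent" -> tsrv.NewTorrentFs.
func pickFactory(factories map[string]FsFactory, f File) (FsFactory, bool) {
	for ext, factory := range factories {
		if strings.HasSuffix(f.Name(), ext) {
			return factory, true
		}
	}
	return nil, false
}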

View file

@@ -14,7 +14,6 @@ import (
dlog "git.kmsign.ru/royalcat/tstor/src/log"
)
// MOVE
func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
l := slog.With("component", "torrent-client")

View file

@@ -2,9 +2,11 @@ package torrent
import (
"context"
"log/slog"
"slices"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent"
)
@@ -12,10 +14,15 @@ type Controller struct {
torrentFilePath string
t *torrent.Torrent
rep *filesMappingsStore
log *rlog.Logger
}
func newController(t *torrent.Torrent, rep *filesMappingsStore) *Controller {
return &Controller{t: t, rep: rep}
return &Controller{
t: t,
rep: rep,
log: rlog.Component("torrent/controller").With(slog.String("infohash", t.InfoHash().HexString())),
}
}
func (s *Controller) TorrentFilePath() string {
@@ -107,8 +114,41 @@ func (s *Controller) isFileComplete(startIndex int, endIndex int) bool {
return true
}
func (s *Controller) ValidateTorrent() error {
<-s.t.GotInfo()
s.t.VerifyData()
func (s *Controller) ValidateTorrent(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.t.GotInfo():
}
for i := 0; i < s.t.NumPieces(); i++ {
if ctx.Err() != nil {
return ctx.Err()
}
s.t.Piece(i).VerifyData()
}
return nil
}
func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
log := c.log.WithComponent("initializeTorrentPriories")
// files, err := c.Files(ctx)
// if err != nil {
// return err
// }
// for _, file := range files {
// if file == nil {
// continue
// }
// file.SetPriority(torrent.PiecePriorityNormal)
// }
log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name()))
return nil
}
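
ValidateTorrent is now cancellable: it waits for metadata under the caller's context and verifies piece by piece, checking ctx.Err() between pieces instead of handing the whole torrent to VerifyData. A usage sketch within this package (the timeout value is illustrative):

func validateWithTimeout(ctl *Controller, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	// Returns ctx.Err() if cancelled while waiting for GotInfo or
	// between piece verifications, per the loop above.
	return ctl.ValidateTorrent(ctx)
}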

View file

@@ -8,11 +8,11 @@ import (
"log/slog"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/vfs"
@@ -38,7 +38,7 @@ type DirAquire struct {
Hashes []infohash.T
}
type Service struct {
type Daemon struct {
client *torrent.Client
excludedFiles *filesMappingsStore
infoBytes *infoBytesStore
@@ -54,8 +54,8 @@ type Service struct {
log *rlog.Logger
}
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, error) {
s := &Service{
func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) {
s := &Daemon{
log: rlog.Component("torrent-service"),
sourceFs: sourceFs,
torrentLoaded: make(chan struct{}),
@@ -115,9 +115,9 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service,
return s, nil
}
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs
func (s *Service) Close(ctx context.Context) error {
func (s *Daemon) Close(ctx context.Context) error {
return errors.Join(append(
s.client.Close(),
s.Storage.Close(),
@@ -128,7 +128,7 @@ func (s *Service) Close(ctx context.Context) error {
)...)
}
func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent")
defer span.End()
log := s.log
@@ -197,138 +197,31 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, err
}
span.AddEvent("got info")
info := t.Info()
if info == nil {
return nil, fmt.Errorf("info is nil")
}
// info := t.Info()
// if info == nil {
// return nil, fmt.Errorf("info is nil")
// }
compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info)
if err != nil {
return nil, err
}
if !compatable {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
// compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info)
// if err != nil {
// return nil, err
// }
// if !compatable {
// return nil, fmt.Errorf(
// "torrent with name '%s' not compatable existing infohash: %s, new: %s",
// t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
// )
// }
}
return newController(t, s.excludedFiles), nil
}
ctl := newController(t, s.excludedFiles)
func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) {
log := s.log.With(
slog.String("new-name", info.BestName()),
slog.String("new-infohash", ih.String()),
)
name := info.BestName()
aq, found, err := s.dirsAquire.Get(ctx, info.BestName())
err = ctl.initializeTorrentPriories(ctx)
if err != nil {
return false, false, err
}
if !found {
err = s.dirsAquire.Set(ctx, name, DirAquire{
Name: name,
Hashes: slices.Compact([]infohash.T{ih}),
})
if err != nil {
return false, false, err
}
log.Debug(ctx, "acquiring was not found, so created")
return true, false, nil
log.Error(ctx, "error initializing torrent priorities", rlog.Error(err))
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash already know to be compatable")
return true, false, nil
}
for _, existingTorrent := range s.client.Torrents() {
if existingTorrent.Name() != name || existingTorrent.InfoHash() == ih {
continue
}
existingInfo := existingTorrent.Info()
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(info.Files)
if !s.checkTorrentFilesCompatable(ctx, aq, existingFiles, newFiles) {
return false, false, nil
}
aq.Hashes = slicesUnique(append(aq.Hashes, ih))
err = s.dirsAquire.Set(ctx, aq.Name, aq)
if err != nil {
log.Warn(ctx, "torrent not compatible")
return false, false, err
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash is compatable")
return true, false, nil
}
log.Debug(ctx, "torrent with same name not found, try later")
return false, true, nil
}
func (s *Service) checkTorrentFilesCompatable(ctx context.Context, aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool {
log := s.log.With(slog.String("name", aq.Name))
pathCmp := func(a, b metainfo.FileInfo) int {
return slices.Compare(a.BestPath(), b.BestPath())
}
slices.SortStableFunc(existingFiles, pathCmp)
slices.SortStableFunc(newFiles, pathCmp)
// torrents are basically equal
if slices.EqualFunc(existingFiles, newFiles, func(fi1, fi2 metainfo.FileInfo) bool {
return fi1.Length == fi2.Length && slices.Equal(fi1.BestPath(), fi1.BestPath())
}) {
return true
}
if len(newFiles) > len(existingFiles) {
type fileInfo struct {
Path string
Length int64
}
mapInfo := func(fi metainfo.FileInfo) fileInfo {
return fileInfo{
Path: strings.Join(fi.BestPath(), "/"),
Length: fi.Length,
}
}
existingFiles := apply(existingFiles, mapInfo)
newFiles := apply(newFiles, mapInfo)
for _, n := range newFiles {
if slices.Contains(existingFiles, n) {
continue
}
for _, e := range existingFiles {
if e.Path == n.Path && e.Length != n.Length {
log.Warn(ctx, "torrents not compatible, has files with different length",
slog.String("path", n.Path),
slog.Int64("existing-length", e.Length),
slog.Int64("new-length", e.Length),
)
return false
}
}
}
}
return true
return ctl, nil
}
func isValidInfoHashBytes(d []byte) bool {
@@ -337,17 +230,17 @@ func isValidInfoHashBytes(d []byte) bool {
return err == nil
}
func (s *Service) Stats() (*Stats, error) {
func (s *Daemon) Stats() (*Stats, error) {
return &Stats{}, nil
}
func (s *Service) GetStats() torrent.ConnStats {
func (s *Daemon) GetStats() torrent.ConnStats {
return s.client.ConnStats()
}
const loadWorkers = 5
func (s *Service) loadTorrentFiles(ctx context.Context) error {
func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes(
attribute.Int("workers", loadWorkers),
))
@@ -363,16 +256,23 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
}()
loaderWorker := func() {
wg.Add(1)
for path := range loaderPaths {
file, err := vfs.NewLazyOsFile(path)
info, err := s.sourceFs.Stat(path)
if err != nil {
log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
file, err := s.sourceFs.Open(path)
if err != nil {
log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
defer file.Close(ctx)
defer file.Close()
_, err = s.LoadTorrent(ctx, file)
vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))
_, err = s.LoadTorrent(ctx, vfile)
if err != nil {
log.Error(ctx, "failed adding torrent", rlog.Error(err))
}
@@ -380,11 +280,12 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
wg.Done()
}
wg.Add(loadWorkers)
for range loadWorkers {
go loaderWorker()
}
return util.Walk(s.sourceFs, "/", func(path string, info os.FileInfo, err error) error {
return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("fs walk error: %w", err)
}
@@ -405,7 +306,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
})
}
func (s *Service) ListTorrents(ctx context.Context) ([]*Controller, error) {
func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) {
<-s.torrentLoaded
out := []*Controller{}
@@ -415,7 +316,7 @@ func (s *Service) ListTorrents(ctx context.Context) ([]*Controller, error) {
return out, nil
}
func (s *Service) GetTorrent(infohashHex string) (*Controller, error) {
func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) {
<-s.torrentLoaded
t, ok := s.client.Torrent(infohash.FromHexString(infohashHex))
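
Besides the Service-to-Daemon rename, loadTorrentFiles now opens torrent files through the source filesystem and fans the paths out to loadWorkers goroutines over a channel. In isolation this is the standard bounded worker pool; a generic sketch of the pattern, not the Daemon's exact code:

package example

import "sync"

func processAll(paths []string, workers int, handle func(string)) {
	ch := make(chan string)
	var wg sync.WaitGroup
	wg.Add(workers) // count workers up front, as the hunk above now does
	for range workers {
		go func() {
			defer wg.Done()
			for p := range ch {
				handle(p)
			}
		}()
	}
	for _, p := range paths {
		ch <- p
	}
	close(ch) // lets the range loops finish
	wg.Wait()
}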

View file

@@ -0,0 +1,92 @@
package torrent
import (
"path"
"slices"
"sync"
"git.kmsign.ru/royalcat/tstor/pkg/slicesutils"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
)
type dupInfo struct {
infohash infohash.T
fileinfo metainfo.FileInfo
}
type dupIndex struct {
mu sync.RWMutex
torrents map[infohash.T][]metainfo.FileInfo
sha1 map[string][]dupInfo // bittorrent v1
piecesRoot map[[32]byte][]dupInfo // bittorrent v2
}
func newDupIndex() *dupIndex {
return &dupIndex{
torrents: map[infohash.T][]metainfo.FileInfo{},
sha1: map[string][]dupInfo{},
piecesRoot: map[[32]byte][]dupInfo{},
}
}
func (c *dupIndex) AddFile(fileinfo metainfo.FileInfo, ih infohash.T) {
c.mu.Lock()
defer c.mu.Unlock()
c.torrents[ih] = append(c.torrents[ih], fileinfo)
if fileinfo.Sha1 != "" {
c.sha1[fileinfo.Sha1] = append(c.sha1[fileinfo.Sha1], dupInfo{fileinfo: fileinfo, infohash: ih})
}
if fileinfo.PiecesRoot.Ok {
c.piecesRoot[fileinfo.PiecesRoot.Value] = append(c.piecesRoot[fileinfo.PiecesRoot.Value], dupInfo{fileinfo: fileinfo, infohash: ih})
}
}
func (c *dupIndex) DuplicateFiles(fileinfo metainfo.FileInfo, ih infohash.T) []dupInfo {
c.mu.RLock()
defer c.mu.RUnlock()
if fileinfo.Sha1 != "" {
if dups, ok := c.sha1[fileinfo.Sha1]; ok {
return slices.Clone(dups)
}
}
if fileinfo.PiecesRoot.Ok {
if dups, ok := c.piecesRoot[fileinfo.PiecesRoot.Value]; ok {
return slices.Clone(dups)
}
}
return []dupInfo{}
}
func (c *dupIndex) Includes(ih infohash.T, files []metainfo.FileInfo) []dupInfo {
c.mu.RLock()
defer c.mu.RUnlock()
out := []dupInfo{}
for ih, v := range c.torrents {
intersection := slicesutils.IntersectionFunc(files, v, func(a, b metainfo.FileInfo) bool {
mostly := path.Join(a.BestPath()...) == path.Join(b.BestPath()...) && a.Length == b.Length
if a.Sha1 != "" && b.Sha1 != "" {
return mostly && a.Sha1 == b.Sha1
}
if a.PiecesRoot.Ok && b.PiecesRoot.Ok {
return mostly && a.PiecesRoot.Value == b.PiecesRoot.Value
}
return mostly
})
for _, v := range intersection {
out = append(out, dupInfo{infohash: ih, fileinfo: v})
}
}
return out
}
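
dupIndex is a new piece of the rework: it indexes every known file by BitTorrent v1 SHA1 and v2 pieces-root so that a freshly added torrent can discover payload already on disk (storage_open.go below copies those duplicates via reflink). A sketch of the intended registration flow, assuming the anacrolix/torrent accessors referenced above:

func indexTorrent(idx *dupIndex, t *torrent.Torrent) {
	// Register every file of a torrent whose info is already known;
	// Includes/DuplicateFiles can then match against later torrents.
	for _, fi := range t.Info().Files {
		idx.AddFile(fi, t.InfoHash())
	}
}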

View file

@@ -7,10 +7,12 @@ import (
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
func newFileMappingsStore(metaDir string, storage TorrentFileDeleter) (*filesMappingsStore, error) {
str, err := kv.NewBadgerKVBytes[string, string](filepath.Join(metaDir, "file-mappings"))
opts := kvbadger.DefaultOptions(filepath.Join(metaDir, "file-mappings"))
str, err := kvbadger.NewBadgerKVBytes[string, string](opts)
if err != nil {
return nil, err
}
@@ -46,9 +48,9 @@ func (r *filesMappingsStore) ExcludeFile(ctx context.Context, file *torrent.File
func (r *filesMappingsStore) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) {
out := map[string]string{}
err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) bool {
err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) error {
out[k] = v
return true
return nil
})
return out, err
}

View file

@@ -34,7 +34,7 @@ type TorrentFS struct {
var _ vfs.Filesystem = (*TorrentFS)(nil)
func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
defer f.Close(ctx)
info, err := f.Info()
@@ -240,6 +240,17 @@ func (fs *TorrentFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption
}, add...)...)
}
func (tfs *TorrentFS) readContext(ctx context.Context) (context.Context, context.CancelFunc) {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
return context.WithTimeout(ctx, time.Millisecond)
}
return ctx, func() {}
}
// Stat implements Filesystem.
func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, span := tracer.Start(ctx, "Stat",
@@ -251,25 +262,21 @@ func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
return tfs, nil
}
var err error
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.Stat(ctx, nestedFsPath)
}
@@ -286,24 +293,20 @@ func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File,
return vfs.NewDirFile(tfs.name), nil
}
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.Open(ctx, nestedFsPath)
}
@@ -317,25 +320,21 @@ func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry,
)
defer span.End()
var err error
ctx, cancel := tfs.readContext(ctx)
defer func() {
cancel()
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, name, tfs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.ReadDir(ctx, nestedFsPath)
}
files, err := tfs.files(ctx)
@@ -394,12 +393,12 @@ type torrentFile struct {
const secondaryTimeout = time.Hour * 24
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
// select {
// case <-file.Torrent().GotInfo():
// break
// case <-ctx.Done():
// return nil, ctx.Err()
// }
select {
case <-file.Torrent().GotInfo():
break
case <-ctx.Done():
return nil, ctx.Err()
}
r := file.NewReader()
r.SetReadahead(1024 * 1024 * 16) // TODO configurable
@@ -448,6 +447,16 @@ func (rw *torrentFile) Close(ctx context.Context) error {
return rw.tr.Close()
}
func (tf *torrentFile) readTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
lastReadTimeout := tf.lastReadTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
return context.WithTimeout(ctx, time.Millisecond)
}
return ctx, func() {}
}
// Read implements ctxio.Reader.
func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
ctx, span := tracer.Start(ctx, "Read",
@@ -461,13 +470,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
tf.mu.RLock()
defer tf.mu.RUnlock()
lastReadTimeout := tf.lastReadTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
ctx, cancel := tf.readTimeout(ctx)
defer cancel()
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
@@ -490,13 +494,8 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
tf.mu.RLock()
defer tf.mu.RUnlock()
lastReadTimeout := tf.lastReadTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // use a short timeout for files that recently failed
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
ctx, cancel := tf.readTimeout(ctx)
defer cancel()
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
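
The short-timeout logic that was previously copy-pasted into Stat, Open, ReadDir, Read, and ReadAt is now the readContext/readTimeout helpers above. The policy in isolation, as a generic sketch: remember the last deadline miss and, within a window (secondaryTimeout here), give subsequent operations only a millisecond so known-dead files fail fast:

package example

import (
	"context"
	"sync/atomic"
	"time"
)

type failureGate struct {
	last   atomic.Pointer[time.Time]
	window time.Duration // e.g. 24h, like secondaryTimeout above
}

// gate returns a context with a 1ms deadline if a failure was seen recently.
func (g *failureGate) gate(ctx context.Context) (context.Context, context.CancelFunc) {
	if t := g.last.Load(); t != nil && time.Since(*t) < g.window {
		return context.WithTimeout(ctx, time.Millisecond)
	}
	return ctx, func() {}
}

// observe records deadline misses so later calls take the fast-fail path.
func (g *failureGate) observe(err error) {
	if err == context.DeadlineExceeded {
		now := time.Now()
		g.last.Store(&now)
	}
}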

View file

@@ -5,12 +5,13 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/kvtrace"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
"go.opentelemetry.io/otel/attribute"
)
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
dir := path.Join(dbdir, name)
store, err = kv.NewBadgerKV[K, V](dir)
opts := kvbadger.DefaultOptions(path.Join(dbdir, name))
store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts)
if err != nil {
return nil, err
}

View file

@@ -1,168 +0,0 @@
package torrent
import (
"context"
"fmt"
"io"
"os"
"path"
atorrent "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/hashicorp/go-multierror"
)
// NOT USED
type PieceStorage struct {
basePath string
completion storage.PieceCompletion
}
func NewPieceStorage(path string, completion storage.PieceCompletion) *PieceStorage {
return &PieceStorage{
basePath: path,
completion: completion,
}
}
// OpenTorrent implements FileStorageDeleter.
func (p *PieceStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
torrentPath := path.Join(p.basePath, infoHash.HexString())
descriptors := []*os.File{}
return storage.TorrentImpl{
Piece: func(piece metainfo.Piece) storage.PieceImpl {
hash := piece.Hash().HexString()
piecePrefixDir := path.Join(torrentPath, hash[:2])
err := os.MkdirAll(piecePrefixDir, os.ModePerm|os.ModeDir)
if err != nil {
return &errPiece{err: err}
}
piecePath := path.Join(torrentPath, hash[:2], hash)
file, err := os.OpenFile(piecePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return &errPiece{err: err}
}
pk := metainfo.PieceKey{
InfoHash: infoHash,
Index: piece.Index(),
}
return newPieceFile(pk, file, p.completion)
// file, err os.OpenFile(piecePath)
},
Flush: func() error {
var res error
for _, f := range descriptors {
if err := f.Sync(); err != nil {
res = multierror.Append(res, err)
}
}
return res
},
Close: func() error {
var res error
for _, f := range descriptors {
if err := f.Close(); err != nil {
res = multierror.Append(res, err)
}
}
return res
},
}, nil
}
// Close implements FileStorageDeleter.
func (p *PieceStorage) Close() error {
return nil
}
// DeleteFile implements FileStorageDeleter.
func (p *PieceStorage) DeleteFile(file *atorrent.File) error {
return fmt.Errorf("not implemented")
}
// CleanupDirs implements DataStorage.
func (p *PieceStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) (int, error) {
return 0, nil // TODO
}
// CleanupFiles implements DataStorage.
func (p *PieceStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) (int, error) {
return 0, nil // TODO
}
func newPieceFile(pk metainfo.PieceKey, file *os.File, completion storage.PieceCompletionGetSetter) *piece {
return &piece{
pk: pk,
File: file,
completion: completion,
}
}
type piece struct {
*os.File
pk metainfo.PieceKey
completion storage.PieceCompletionGetSetter
}
// Completion implements storage.PieceImpl.
func (p *piece) Completion() storage.Completion {
compl, err := p.completion.Get(p.pk)
if err != nil {
return storage.Completion{Complete: false, Ok: false, Err: err}
}
return compl
}
// MarkComplete implements storage.PieceImpl.
func (p *piece) MarkComplete() error {
return p.completion.Set(p.pk, true)
}
// MarkNotComplete implements storage.PieceImpl.
func (p *piece) MarkNotComplete() error {
return p.completion.Set(p.pk, false)
}
var _ storage.PieceImpl = (*piece)(nil)
var _ io.WriterTo = (*piece)(nil)
type errPiece struct {
err error
}
// WriteTo implements io.WriterTo.
func (p *errPiece) WriteTo(io.Writer) (int64, error) {
return 0, p.err
}
// ReadAt implements storage.PieceImpl.
func (p *errPiece) ReadAt([]byte, int64) (int, error) {
return 0, p.err
}
// WriteAt implements storage.PieceImpl.
func (p *errPiece) WriteAt([]byte, int64) (int, error) {
return 0, p.err
}
// Completion implements storage.PieceImpl.
func (p *errPiece) Completion() storage.Completion {
return storage.Completion{Complete: false, Ok: false, Err: p.err}
}
// MarkComplete implements storage.PieceImpl.
func (p *errPiece) MarkComplete() error {
return p.err
}
// MarkNotComplete implements storage.PieceImpl.
func (p *errPiece) MarkNotComplete() error {
return p.err
}
var _ storage.PieceImpl = (*errPiece)(nil)
var _ io.WriterTo = (*errPiece)(nil)

View file

@@ -15,7 +15,7 @@ type DownloadTask struct {
File string
}
func (s *Service) Download(ctx context.Context, task *DownloadTask) error {
func (s *Daemon) Download(ctx context.Context, task *DownloadTask) error {
t, ok := s.client.Torrent(task.InfoHash)
if !ok {
return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
@@ -101,7 +101,7 @@ type TorrentProgress struct {
Total int64
}
func (s *Service) DownloadProgress(ctx context.Context) (<-chan TorrentProgress, error) {
func (s *Daemon) DownloadProgress(ctx context.Context) (<-chan TorrentProgress, error) {
torrents, err := s.ListTorrents(ctx)
if err != nil {
return nil, err

View file

@@ -2,9 +2,7 @@ package torrent
import (
"context"
"crypto/sha1"
"fmt"
"io"
"errors"
"io/fs"
"log/slog"
"os"
@@ -12,72 +10,62 @@ import (
"path/filepath"
"slices"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/dustin/go-humanize"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/sys/unix"
)
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *fileStorage {
return &fileStorage{
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
client: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: baseDir,
PieceCompletion: pc,
TorrentDirMaker: torrentDir,
FilePathMaker: filePath,
TorrentDirMaker: func(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
return torrentDir(baseDir, infoHash)
},
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
return filePath(*opts.File)
},
}),
baseDir: baseDir,
pieceCompletion: pc,
log: slog.With("component", "torrent-client"),
dupIndex: newDupIndex(),
log: rlog.Component("daemon", "torrent"),
}
}
// File-based storage for torrents, that isn't yet bound to a particular torrent.
type fileStorage struct {
baseDir string
storage.ClientImplCloser
baseDir string
client storage.ClientImplCloser
pieceCompletion storage.PieceCompletion
log *slog.Logger
dupIndex *dupIndex
log *rlog.Logger
}
var _ storage.ClientImplCloser = (*fileStorage)(nil)
func (me *fileStorage) Close() error {
return me.pieceCompletion.Close()
return errors.Join(
me.client.Close(),
me.pieceCompletion.Close(),
)
}
func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
dirName := info.Name
if dirName == "" {
dirName = infoHash.HexString()
}
return filepath.Join(baseDir, dirName)
}
func filePath(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...)
}
func (fs *fileStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{
Info: info,
File: fileInfo,
}))
func (fs *fileStorage) fullFilePath(infoHash metainfo.Hash, fileInfo metainfo.FileInfo) string {
return filepath.Join(
torrentDir(fs.baseDir, infoHash),
filePath(fileInfo),
)
}
func (fs *fileStorage) DeleteFile(file *torrent.File) error {
info := file.Torrent().Info()
infoHash := file.Torrent().InfoHash()
torrentDir := torrentDir(fs.baseDir, info, infoHash)
torrentDir := torrentDir(fs.baseDir, infoHash)
fileInfo := file.FileInfo()
relFilePath := filePath(storage.FilePathMakerOpts{
Info: info,
File: &fileInfo,
})
relFilePath := filePath(fileInfo)
filePath := path.Join(torrentDir, relFilePath)
for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
pk := metainfo.PieceKey{InfoHash: infoHash, Index: i}
@@ -90,11 +78,11 @@ func (fs *fileStorage) DeleteFile(file *torrent.File) error {
}
func (fs *fileStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)
log := fs.log.With(slog.Int("expectedTorrents", len(expected)), slog.Bool("dryRun", dryRun))
expectedEntries := []string{}
for _, e := range expected {
expectedEntries = append(expectedEntries, e.Torrent().Name())
expectedEntries = append(expectedEntries, e.Torrent().InfoHash().HexString())
}
entries, err := os.ReadDir(fs.baseDir)
@@ -113,11 +101,11 @@ func (fs *fileStorage) CleanupDirs(ctx context.Context, expected []*Controller,
return nil, ctx.Err()
}
log.Info("deleting trash data", "dirsCount", len(toDelete))
log.Info(ctx, "deleting trash data", slog.Int("dirsCount", len(toDelete)))
if !dryRun {
for i, name := range toDelete {
p := path.Join(fs.baseDir, name)
log.Warn("deleting trash data", "path", p)
log.Warn(ctx, "deleting trash data", slog.String("path", p))
err := os.RemoveAll(p)
if err != nil {
return toDelete[:i], err
@@ -129,7 +117,7 @@ func (fs *fileStorage) CleanupDirs(ctx context.Context, expected []*Controller,
}
func (s *fileStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
log := s.log.With(slog.Int("expectedTorrents", len(expected)), slog.Bool("dryRun", dryRun))
expectedEntries := []string{}
{
@@ -140,7 +128,7 @@ func (s *fileStorage) CleanupFiles(ctx context.Context, expected []*Controller,
}
for _, f := range files {
expectedEntries = append(expectedEntries, s.filePath(e.Torrent().Info(), e.Torrent().InfoHash(), ptr(f.FileInfo())))
expectedEntries = append(expectedEntries, s.fullFilePath(e.Torrent().InfoHash(), f.FileInfo()))
}
}
}
@@ -176,10 +164,10 @@ func (s *fileStorage) CleanupFiles(ctx context.Context, expected []*Controller,
return toDelete, ctx.Err()
}
log.Info("deleting trash data", "filesCount", len(toDelete))
log.Info(ctx, "deleting trash data", slog.Int("filesCount", len(toDelete)))
if !dryRun {
for i, p := range toDelete {
s.log.Warn("deleting trash data", "path", p)
s.log.Warn(ctx, "deleting trash data", slog.String("path", p))
err := os.Remove(p)
if err != nil {
return toDelete[i:], err
@@ -206,212 +194,3 @@ func (s *fileStorage) iterFiles(ctx context.Context, iter func(ctx context.Conte
return iter(ctx, path, info)
})
}
func (s *fileStorage) Dedupe(ctx context.Context) (uint64, error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe"))
defer span.End()
log := s.log
sizeMap := map[int64][]string{}
err := s.iterFiles(ctx, func(ctx context.Context, path string, info fs.FileInfo) error {
size := info.Size()
sizeMap[size] = append(sizeMap[size], path)
return nil
})
if err != nil {
return 0, err
}
maps.DeleteFunc(sizeMap, func(k int64, v []string) bool {
return len(v) <= 1
})
span.AddEvent("collected files with same size", trace.WithAttributes(
attribute.Int("count", len(sizeMap)),
))
var deduped uint64 = 0
i := 0
for _, paths := range sizeMap {
if i%100 == 0 {
log.Info("deduping in progress", "current", i, "total", len(sizeMap))
}
i++
if ctx.Err() != nil {
return deduped, ctx.Err()
}
slices.Sort(paths)
paths = slices.Compact(paths)
if len(paths) <= 1 {
continue
}
paths, err = applyErr(paths, filepath.Abs)
if err != nil {
return deduped, err
}
dedupedGroup, err := s.dedupeFiles(ctx, paths)
if err != nil {
log.Error("Error applying dedupe", "files", paths, "error", err.Error())
continue
}
if dedupedGroup > 0 {
deduped += dedupedGroup
log.Info("deduped file group",
slog.String("files", fmt.Sprint(paths)),
slog.String("deduped", humanize.Bytes(dedupedGroup)),
slog.String("deduped_total", humanize.Bytes(deduped)),
)
}
}
return deduped, nil
}
func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) {
out := make([]O, 0, len(in))
for _, p := range in {
o, err := apply(p)
if err != nil {
return out, err
}
out = append(out, o)
}
return out, nil
}
// const blockSize uint64 = 4096
func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("dedupeFiles"), trace.WithAttributes(
attribute.StringSlice("files", paths),
))
defer func() {
span.SetAttributes(attribute.Int64("deduped", int64(deduped)))
if err != nil {
span.RecordError(err)
}
span.End()
}()
log := s.log
srcF, err := os.Open(paths[0])
if err != nil {
return deduped, err
}
defer srcF.Close()
srcStat, err := srcF.Stat()
if err != nil {
return deduped, err
}
srcFd := int(srcF.Fd())
srcSize := srcStat.Size()
fsStat := unix.Statfs_t{}
err = unix.Fstatfs(srcFd, &fsStat)
if err != nil {
span.RecordError(err)
return deduped, err
}
srcHash, err := filehash(srcF)
if err != nil {
return deduped, err
}
if int64(fsStat.Bsize) > srcSize { // on btrfs this means the file resides in non-deduplicatable inline metadata
return deduped, nil
}
blockSize := uint64((srcSize % int64(fsStat.Bsize)) * int64(fsStat.Bsize))
span.SetAttributes(attribute.Int64("blocksize", int64(blockSize)))
rng := unix.FileDedupeRange{
Src_offset: 0,
Src_length: blockSize,
Info: []unix.FileDedupeRangeInfo{},
}
for _, dst := range paths[1:] {
if ctx.Err() != nil {
return deduped, ctx.Err()
}
destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
if err != nil {
return deduped, err
}
defer destF.Close()
dstHash, err := filehash(destF)
if err != nil {
return deduped, err
}
if srcHash != dstHash {
destF.Close()
continue
}
rng.Info = append(rng.Info, unix.FileDedupeRangeInfo{
Dest_fd: int64(destF.Fd()),
Dest_offset: 0,
})
}
if len(rng.Info) == 0 {
return deduped, nil
}
log.Info("found same files, deduping", "files", paths, "size", humanize.Bytes(uint64(srcStat.Size())))
if ctx.Err() != nil {
return deduped, ctx.Err()
}
rng.Src_offset = 0
for i := range rng.Info {
rng.Info[i].Dest_offset = 0
}
err = unix.IoctlFileDedupeRange(srcFd, &rng)
if err != nil {
return deduped, err
}
for i := range rng.Info {
deduped += rng.Info[i].Bytes_deduped
rng.Info[i].Status = 0
rng.Info[i].Bytes_deduped = 0
}
return deduped, nil
}
const compareBlockSize = 1024 * 128
func filehash(r io.Reader) ([20]byte, error) {
buf := make([]byte, compareBlockSize)
_, err := r.Read(buf)
if err != nil && err != io.EOF {
return [20]byte{}, err
}
return sha1.Sum(buf), nil
}
func ptr[D any](v D) *D {
return &v
}

View file

@@ -0,0 +1,229 @@
package torrent
import (
"context"
"crypto/sha1"
"fmt"
"io"
"io/fs"
"log/slog"
"os"
"path/filepath"
"slices"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/dustin/go-humanize"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/sys/unix"
)
func (s *fileStorage) Dedupe(ctx context.Context) (uint64, error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe"))
defer span.End()
log := s.log
sizeMap := map[int64][]string{}
err := s.iterFiles(ctx, func(ctx context.Context, path string, info fs.FileInfo) error {
size := info.Size()
sizeMap[size] = append(sizeMap[size], path)
return nil
})
if err != nil {
return 0, err
}
maps.DeleteFunc(sizeMap, func(k int64, v []string) bool {
return len(v) <= 1
})
span.AddEvent("collected files with same size", trace.WithAttributes(
attribute.Int("count", len(sizeMap)),
))
var deduped uint64 = 0
i := 0
for _, paths := range sizeMap {
if i%100 == 0 {
log.Info(ctx, "deduping in progress", slog.Int("current", i), slog.Int("total", len(sizeMap)))
}
i++
if ctx.Err() != nil {
return deduped, ctx.Err()
}
slices.Sort(paths)
paths = slices.Compact(paths)
if len(paths) <= 1 {
continue
}
paths, err = applyErr(paths, filepath.Abs)
if err != nil {
return deduped, err
}
dedupedGroup, err := s.dedupeFiles(ctx, paths)
if err != nil {
log.Error(ctx, "Error applying dedupe", slog.Any("files", paths), rlog.Error(err))
continue
}
if dedupedGroup > 0 {
deduped += dedupedGroup
log.Info(ctx, "deduped file group",
slog.String("files", fmt.Sprint(paths)),
slog.String("deduped", humanize.Bytes(dedupedGroup)),
slog.String("deduped_total", humanize.Bytes(deduped)),
)
}
}
return deduped, nil
}
func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) {
out := make([]O, 0, len(in))
for _, p := range in {
o, err := apply(p)
if err != nil {
return out, err
}
out = append(out, o)
}
return out, nil
}
// const blockSize uint64 = 4096
func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("dedupeFiles"), trace.WithAttributes(
attribute.StringSlice("files", paths),
))
defer func() {
span.SetAttributes(attribute.Int64("deduped", int64(deduped)))
if err != nil {
span.RecordError(err)
}
span.End()
}()
log := s.log
srcF, err := os.Open(paths[0])
if err != nil {
return deduped, err
}
defer srcF.Close()
srcStat, err := srcF.Stat()
if err != nil {
return deduped, err
}
srcFd := int(srcF.Fd())
srcSize := srcStat.Size()
fsStat := unix.Statfs_t{}
err = unix.Fstatfs(srcFd, &fsStat)
if err != nil {
span.RecordError(err)
return deduped, err
}
srcHash, err := filehash(srcF)
if err != nil {
return deduped, err
}
if int64(fsStat.Bsize) > srcSize { // on btrfs this means the file resides in non-deduplicatable inline metadata
return deduped, nil
}
blockSize := uint64((srcSize / int64(fsStat.Bsize)) * int64(fsStat.Bsize)) // round the dedupe length down to a whole number of filesystem blocks
span.SetAttributes(attribute.Int64("blocksize", int64(blockSize)))
rng := unix.FileDedupeRange{
Src_offset: 0,
Src_length: blockSize,
Info: []unix.FileDedupeRangeInfo{},
}
for _, dst := range paths[1:] {
if ctx.Err() != nil {
return deduped, ctx.Err()
}
destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
if err != nil {
return deduped, err
}
defer destF.Close()
dstHash, err := filehash(destF)
if err != nil {
return deduped, err
}
if srcHash != dstHash {
destF.Close()
continue
}
rng.Info = append(rng.Info, unix.FileDedupeRangeInfo{
Dest_fd: int64(destF.Fd()),
Dest_offset: 0,
})
}
if len(rng.Info) == 0 {
return deduped, nil
}
log.Info(ctx, "found same files, deduping", slog.Any("files", paths), slog.String("size", humanize.Bytes(uint64(srcStat.Size()))))
if ctx.Err() != nil {
return deduped, ctx.Err()
}
rng.Src_offset = 0
for i := range rng.Info {
rng.Info[i].Dest_offset = 0
}
err = unix.IoctlFileDedupeRange(srcFd, &rng)
if err != nil {
return deduped, err
}
for i := range rng.Info {
deduped += rng.Info[i].Bytes_deduped
rng.Info[i].Status = 0
rng.Info[i].Bytes_deduped = 0
}
return deduped, nil
}
const compareBlockSize = 1024 * 128
func filehash(r io.Reader) ([20]byte, error) {
buf := make([]byte, compareBlockSize)
n, err := io.ReadFull(r, buf) // a bare Read may return a short read; ReadFull makes the hashed prefix deterministic
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return [20]byte{}, err
}
return sha1.Sum(buf[:n]), nil
}
func ptr[D any](v D) *D {
return &v
}
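
dedupeFiles drives the Linux FIDEDUPERANGE ioctl through golang.org/x/sys/unix: the kernel re-verifies that the byte ranges are identical before sharing extents, so the SHA1 prefix check above is only a cheap pre-filter, not a safety requirement. A minimal two-file sketch, assuming Linux and a reflink-capable filesystem such as btrfs or XFS:

package example

import (
	"os"

	"golang.org/x/sys/unix"
)

// dedupePair asks the kernel to share the first length bytes of dst with src.
func dedupePair(srcPath, dstPath string, length uint64) (uint64, error) {
	src, err := os.Open(srcPath)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	dst, err := os.OpenFile(dstPath, os.O_RDWR, 0)
	if err != nil {
		return 0, err
	}
	defer dst.Close()

	rng := unix.FileDedupeRange{
		Src_offset: 0,
		Src_length: length,
		Info:       []unix.FileDedupeRangeInfo{{Dest_fd: int64(dst.Fd()), Dest_offset: 0}},
	}
	if err := unix.IoctlFileDedupeRange(int(src.Fd()), &rng); err != nil {
		return 0, err
	}
	// Bytes_deduped reports how much the kernel actually merged.
	return rng.Info[0].Bytes_deduped, nil
}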

View file

@@ -0,0 +1,199 @@
package torrent
import (
"context"
"errors"
"fmt"
"io/fs"
"log/slog"
"os"
"path/filepath"
"slices"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/cowutils"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
)
// OpenTorrent implements storage.ClientImplCloser.
func (me *fileStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
ctx := context.Background()
log := me.log
dir := torrentDir(me.baseDir, infoHash)
legacyDir := filepath.Join(me.baseDir, info.Name)
log = log.With(slog.String("legacy_dir", legacyDir), slog.String("dir", dir))
if _, err := os.Stat(legacyDir); err == nil {
log.Warn(ctx, "legacy torrent dir found, renaming", slog.String("dir", dir))
err = os.Rename(legacyDir, dir)
if err != nil {
return storage.TorrentImpl{}, fmt.Errorf("error renaming legacy torrent dir: %w", err)
}
}
if _, err := os.Stat(dir); errors.Is(err, fs.ErrNotExist) {
log.Info(ctx, "new torrent, trying copy files from existing")
dups := me.dupIndex.Includes(infoHash, info.Files)
for _, dup := range dups {
err := me.copyDup(ctx, infoHash, dup)
if err != nil {
log.Error(ctx, "error copying file", slog.String("file", dup.fileinfo.DisplayPath(info)), rlog.Error(err))
}
}
}
return me.client.OpenTorrent(info, infoHash)
}
func (me *fileStorage) copyDup(ctx context.Context, infoHash infohash.T, dup dupInfo) error {
log := me.log.With(slog.String("infohash", infoHash.HexString()), slog.String("dup_infohash", dup.infohash.HexString()))
srcPath := me.fullFilePath(dup.infohash, dup.fileinfo)
src, err := os.Open(srcPath)
if err != nil {
return err
}
defer src.Close()
dstPath := me.fullFilePath(infoHash, dup.fileinfo)
dst, err := os.OpenFile(dstPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil {
return err
}
defer dst.Close()
log.Info(ctx, "copying duplicate file", slog.String("src", srcPath), slog.String("dst", dstPath))
err = cowutils.Reflink(ctx, dst, src, true)
if err != nil {
return fmt.Errorf("error copying file: %w", err)
}
return nil
}
func torrentDir(baseDir string, infoHash metainfo.Hash) string {
return filepath.Join(baseDir, infoHash.HexString())
}
func filePath(file metainfo.FileInfo) string {
return filepath.Join(file.BestPath()...)
}
func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) {
log := s.log.With(
slog.String("new-name", info.BestName()),
slog.String("new-infohash", ih.String()),
)
name := info.BestName()
aq, found, err := s.dirsAquire.Get(ctx, info.BestName())
if err != nil {
return false, false, err
}
if !found {
err = s.dirsAquire.Set(ctx, name, DirAquire{
Name: name,
Hashes: slices.Compact([]infohash.T{ih}),
})
if err != nil {
return false, false, err
}
log.Debug(ctx, "acquiring was not found, so created")
return true, false, nil
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash already know to be compatable")
return true, false, nil
}
for _, existingTorrent := range s.client.Torrents() {
if existingTorrent.Name() != name || existingTorrent.InfoHash() == ih {
continue
}
existingInfo := existingTorrent.Info()
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(info.Files)
if !s.checkTorrentFilesCompatable(ctx, aq, existingFiles, newFiles) {
return false, false, nil
}
aq.Hashes = slicesUnique(append(aq.Hashes, ih))
err = s.dirsAquire.Set(ctx, aq.Name, aq)
if err != nil {
log.Warn(ctx, "torrent not compatible")
return false, false, err
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash is compatable")
return true, false, nil
}
log.Debug(ctx, "torrent with same name not found, try later")
return false, true, nil
}
func (s *Daemon) checkTorrentFilesCompatable(ctx context.Context, aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool {
log := s.log.With(slog.String("name", aq.Name))
pathCmp := func(a, b metainfo.FileInfo) int {
return slices.Compare(a.BestPath(), b.BestPath())
}
slices.SortStableFunc(existingFiles, pathCmp)
slices.SortStableFunc(newFiles, pathCmp)
// torrents are basically equal
if slices.EqualFunc(existingFiles, newFiles, func(fi1, fi2 metainfo.FileInfo) bool {
return fi1.Length == fi2.Length && slices.Equal(fi1.BestPath(), fi2.BestPath())
}) {
return true
}
if len(newFiles) > len(existingFiles) {
type fileInfo struct {
Path string
Length int64
}
mapInfo := func(fi metainfo.FileInfo) fileInfo {
return fileInfo{
Path: strings.Join(fi.BestPath(), "/"),
Length: fi.Length,
}
}
existingFiles := apply(existingFiles, mapInfo)
newFiles := apply(newFiles, mapInfo)
for _, n := range newFiles {
if slices.Contains(existingFiles, n) {
continue
}
for _, e := range existingFiles {
if e.Path == n.Path && e.Length != n.Length {
log.Warn(ctx, "torrents not compatible, has files with different length",
slog.String("path", n.Path),
slog.Int64("existing-length", e.Length),
slog.Int64("new-length", e.Length),
)
return false
}
}
}
}
return true
}

View file

@@ -2,93 +2,109 @@ package ytdlp
import (
"context"
"encoding/json"
"fmt"
"path"
"sync"
"time"
"os"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/go-git/go-billy/v5/osfs"
"git.kmsign.ru/royalcat/tstor/pkg/kvsingle"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/pkg/ytdlp"
"git.kmsign.ru/royalcat/tstor/src/tasks"
"github.com/royalcat/ctxio"
"github.com/royalcat/ctxprogress"
)
func NewService(dataDir string) *Service {
s := &Service{
dataDir: dataDir,
sources: make(map[string]ytdlpSource, 0),
type Controller struct {
datafs ctxbilly.Filesystem
source Source
client *ytdlp.Client
cachedinfo *kvsingle.Value[string, ytdlp.Info]
}
func newYtdlpController(datafs ctxbilly.Filesystem, source Source, client *ytdlp.Client) *Controller {
return &Controller{
datafs: datafs,
source: source,
client: client,
}
go func() {
for {
ctx := context.Background()
ctx = ctxprogress.New(ctx)
ctxprogress.AddCallback(ctx, func(p ctxprogress.Progress) {
cur, total := p.Progress()
fmt.Printf("updating sources: %d/%d\n", cur, total)
})
err := s.Update(ctx)
if err != nil {
fmt.Println("failed to update sources:", err)
}
time.Sleep(time.Minute)
}
}()
return s
}
type Service struct {
mu sync.Mutex
dataDir string
sources map[string]ytdlpSource
func (c *Controller) Source() Source {
return c.source
}
func (c *Service) addSource(s ytdlpSource) {
c.mu.Lock()
defer c.mu.Unlock()
const sizeApprox = 1024 * 1024 * 1024
c.sources[s.Name()] = s
}
func (c *Service) sourceDir(s ytdlpSource) string {
return path.Join(c.dataDir, s.Name())
}
func (c *Service) Update(ctx context.Context) error {
for name, s := range c.sources {
if ctx.Err() != nil {
return ctx.Err()
func (c *Controller) Update(ctx context.Context, updater tasks.Updater) error {
log := updater.Logger()
ctxprogress.New(ctx)
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 0, Total: 10})
plst, err := c.client.Playlist(ctx, c.source.Url)
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 1, Total: 10})
ctxprogress.Range(ctx, plst, func(ctx context.Context, _ int, e ytdlp.Entry) bool {
if e.OriginalURL == "" {
log.Error("no URL in entry", rlog.Error(err))
return true
}
dir := c.sourceDir(s)
err := s.Download(ctx, nil, dir)
info, err := c.Info(ctx)
if err != nil {
return fmt.Errorf("failed to fetch source %s: %w", name, err)
log.Error("error getting info", rlog.Error(err))
return true
}
dwl := info.RequestedDownloads[0]
fileinfo, err := c.datafs.Stat(ctx, dwl.Filename)
if err != nil {
log.Error("error getting file info", rlog.Error(err))
return true
}
if fileinfo.Size()+sizeApprox > dwl.FilesizeApprox && fileinfo.Size()-sizeApprox < dwl.FilesizeApprox {
log.Debug("file already downloaded", "filename", dwl.Filename)
return true
}
file, err := c.datafs.OpenFile(ctx, dwl.Filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil {
log.Error("error opening destination file", rlog.Error(err))
return true
}
err = c.client.Download(ctx, info.OriginalURL, ctxio.IoWriter(ctx, file))
if err != nil {
return false
}
return true
})
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 2, Total: 2})
if err != nil {
return err
}
return nil
}
func (c *Service) BuildFS(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
data, err := ctxio.ReadAll(ctx, f)
func (c *Controller) Info(ctx context.Context) (ytdlp.Info, error) {
info, found, err := c.cachedinfo.Get(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read source file: %w", err)
return info, err
}
if found {
return info, nil
}
var s ytdlpSource
err = json.Unmarshal(data, &s)
info, err = c.Info(ctx)
if err != nil {
return nil, err
return info, err
}
c.addSource(s)
downloadFS := ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s)))
return newSourceFS(path.Base(f.Name()), downloadFS, c, s), nil
err = c.cachedinfo.Set(ctx, info)
if err != nil {
return info, err
}
return info, nil
}
func (c *Controller) Downloaded() error {
return nil
}
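
Controller.Info memoizes the fetched metadata in a kvsingle.Value. Note that the cache-miss branch above calls c.Info(ctx) again, which re-enters the same miss; the usual get-or-fetch shape looks like the sketch below, where fetchInfo is a hypothetical stand-in for the actual ytdlp client call, not a method this commit defines:

func (c *Controller) infoCached(ctx context.Context, fetchInfo func(context.Context) (ytdlp.Info, error)) (ytdlp.Info, error) {
	info, found, err := c.cachedinfo.Get(ctx)
	if err != nil || found {
		return info, err
	}
	info, err = fetchInfo(ctx) // on a miss, fetch from the source...
	if err != nil {
		return info, err
	}
	// ...and persist it for subsequent calls.
	return info, c.cachedinfo.Set(ctx, info)
}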

View file

@@ -0,0 +1,71 @@
package ytdlp
import (
"context"
"encoding/json"
"fmt"
"path"
"sync"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/ytdlp"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/go-git/go-billy/v5/osfs"
"github.com/royalcat/ctxio"
)
func NewService(dataDir string) (*Daemon, error) {
client, err := ytdlp.New()
if err != nil {
return nil, err
}
s := &Daemon{
mu: sync.Mutex{},
client: client,
dataDir: dataDir,
controllers: make(map[string]*Controller, 0),
}
return s, nil
}
type Daemon struct {
mu sync.Mutex
dataDir string
client *ytdlp.Client
controllers map[string]*Controller
}
func (c *Daemon) addSource(s Source) {
c.mu.Lock()
defer c.mu.Unlock()
ctl := newYtdlpController(ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s))), s, c.client)
c.controllers[s.Name()] = ctl
}
func (c *Daemon) sourceDir(s Source) string {
return path.Join(c.dataDir, s.Name())
}
func (c *Daemon) BuildFS(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
data, err := ctxio.ReadAll(ctx, f)
if err != nil {
return nil, fmt.Errorf("failed to read source file: %w", err)
}
var s Source
err = json.Unmarshal(data, &s)
if err != nil {
return nil, err
}
c.addSource(s)
downloadFS := ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s)))
return newSourceFS(path.Base(f.Name()), downloadFS, c, s), nil
}

View file

@@ -9,8 +9,8 @@ import (
)
type SourceFS struct {
service *Service
source ytdlpSource
service *Daemon
source Source
fs ctxbilly.Filesystem
@@ -19,7 +19,7 @@ type SourceFS struct {
var _ vfs.Filesystem = (*SourceFS)(nil)
func newSourceFS(name string, fs ctxbilly.Filesystem, service *Service, source ytdlpSource) *SourceFS {
func newSourceFS(name string, fs ctxbilly.Filesystem, service *Daemon, source Source) *SourceFS {
return &SourceFS{
fs: fs,
service: service,

View file

@@ -1,7 +0,0 @@
package ytdlp
import "io"
type TaskUpdater interface {
Output() io.Writer
}

View file

@@ -0,0 +1,37 @@
package ytdlp
import (
"context"
"fmt"
"git.kmsign.ru/royalcat/tstor/src/tasks"
)
const executorName = "ytdlp"
type DownloadTask struct {
Name string
}
var _ tasks.Task = (*DownloadTask)(nil)
// Executor implements tasks.Task.
func (d *DownloadTask) Executor() string {
return executorName
}
var _ tasks.TaskExecutor = (*Daemon)(nil)
// ExecutorName implements tasks.TaskExecutor.
func (c *Daemon) ExecutorName() string {
return executorName
}
func (c *Daemon) RunTask(ctx context.Context, upd tasks.Updater, task tasks.Task) error {
switch t := task.(type) {
case *DownloadTask:
return c.controllers[t.Name].Update(ctx, upd)
default:
return fmt.Errorf("unknown task type: %T", task)
}
}

View file

@@ -1,44 +1,29 @@
package ytdlp
import (
"context"
"crypto/sha1"
"encoding/base64"
"git.kmsign.ru/royalcat/tstor/pkg/ytdlp"
"github.com/royalcat/ctxprogress"
"strings"
)
type ytdlpSource struct {
type Source struct {
Url string `json:"url"`
}
var hasher = sha1.New()
func (s *ytdlpSource) Name() string {
return base64.URLEncoding.EncodeToString(hasher.Sum([]byte(s.Url)))
var prefixCutset = [...]string{
"https://", "http://", "www.",
}
func (s *ytdlpSource) Download(ctx context.Context, task TaskUpdater, dir string) error {
client, err := ytdlp.New()
if err != nil {
return err
}
ctxprogress.New(ctx)
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 0, Total: 2})
plst, err := client.Playlist(ctx, s.Url)
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 1, Total: 2})
ctxprogress.Range(ctx, plst, func(ctx context.Context, _ int, e ytdlp.PlaylistEntry) bool {
err = client.Download(ctx, e.Url(), dir)
if err != nil {
return false
}
return true
})
ctxprogress.Set(ctx, ctxprogress.RangeProgress{Current: 2, Total: 2})
if err != nil {
return err
func urlHash(url string) string {
for _, v := range prefixCutset {
url = strings.TrimPrefix(url, v)
}
return nil
return base64.URLEncoding.EncodeToString(hasher.Sum([]byte(url)))
}
func (s *Source) Name() string {
return urlHash(s.Url)
}
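
One caveat with the shared hasher here: hash.Hash.Sum(b) appends the digest of the hasher's accumulated state to b rather than hashing b, so Name effectively base64-encodes the trimmed URL bytes followed by a constant suffix. That is still deterministic, but a conventional one-shot digest (an alternative sketch using the imports already in this file, not what this commit does) would be:

func urlHashOneShot(url string) string {
	for _, v := range prefixCutset {
		url = strings.TrimPrefix(url, v)
	}
	sum := sha1.Sum([]byte(url)) // crypto/sha1 one-shot digest of the URL itself
	return base64.URLEncoding.EncodeToString(sum[:])
}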

src/tasks/executor.go Normal file
View file

@@ -0,0 +1,8 @@
package tasks
import "context"
type TaskExecutor interface {
ExecutorName() string
RunTask(ctx context.Context, upd Updater, task Task) error
}

src/tasks/task.go Normal file
View file

@@ -0,0 +1,5 @@
package tasks
type Task interface {
Executor() string
}

src/tasks/updater.go Normal file
View file

@@ -0,0 +1,8 @@
package tasks
import "log/slog"
type Updater interface {
Logger() *slog.Logger
SetProgress(current, total int64)
}
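
The new tasks package is a minimal contract: a Task names its executor, and a TaskExecutor runs tasks while reporting through the Updater. Dispatch then reduces to a map lookup; a sketch within this package (the registry itself is assumed wiring, not part of this commit):

type runner struct {
	executors map[string]TaskExecutor // keyed by ExecutorName(), e.g. "ytdlp"
}

func (r *runner) Run(ctx context.Context, upd Updater, t Task) error {
	ex, ok := r.executors[t.Executor()]
	if !ok {
		return fmt.Errorf("no executor registered for %q", t.Executor())
	}
	return ex.RunTask(ctx, upd, t)
}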

View file

@@ -140,34 +140,34 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
// recreate telemetry logger
client.log = rlog.Component("telemetry")
// runtime.SetMutexProfileFraction(5)
// runtime.SetBlockProfileRate(5)
// _, err = pyroscope.Start(pyroscope.Config{
// ApplicationName: appName,
// // replace this with the address of pyroscope server
// ServerAddress: "https://pyroscope.kmsign.ru",
// // you can disable logging by setting this to nil
// Logger: &pyroscopeLogger{
// log: client.log.WithComponent("pyroscope"),
// },
// ProfileTypes: []pyroscope.ProfileType{
// // these profile types are enabled by default:
// pyroscope.ProfileCPU,
// pyroscope.ProfileAllocObjects,
// pyroscope.ProfileAllocSpace,
// pyroscope.ProfileInuseObjects,
// pyroscope.ProfileInuseSpace,
// // these profile types are optional:
// // pyroscope.ProfileGoroutines,
// // pyroscope.ProfileMutexCount,
// // pyroscope.ProfileMutexDuration,
// // pyroscope.ProfileBlockCount,
// // pyroscope.ProfileBlockDuration,
// },
// })
// if err != nil {
// return client, nil
// }
runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)
_, err = pyroscope.Start(pyroscope.Config{
ApplicationName: appName,
// replace this with the address of pyroscope server
ServerAddress: "https://pyroscope.kmsign.ru",
// you can disable logging by setting this to nil
Logger: &pyroscopeLogger{
log: client.log.WithComponent("pyroscope"),
},
ProfileTypes: []pyroscope.ProfileType{
// these profile types are enabled by default:
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
// these profile types are optional:
// pyroscope.ProfileGoroutines,
// pyroscope.ProfileMutexCount,
// pyroscope.ProfileMutexDuration,
// pyroscope.ProfileBlockCount,
// pyroscope.ProfileBlockDuration,
},
})
if err != nil {
return client, nil
}
return client, nil
}

View file

@@ -259,7 +259,7 @@ func (r *Resolver) NestedFs(ctx context.Context, fsPath string, file File) (File
// open requeue raw open, without resolver call
func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
ctx, span := tracer.Start(ctx, "resolvePath")
ctx, span := tracer.Start(ctx, "ResolvePath")
defer span.End()
name = path.Clean(name)

View file

@@ -2,7 +2,9 @@ package vfs
import (
"context"
"io/fs"
"path"
"path/filepath"
"strings"
"sync"
"time"
@@ -69,3 +71,71 @@ func subTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
return ctx, func() {}
}
func Walk(ctx context.Context, vfs Filesystem, root string, walkFn filepath.WalkFunc) error {
info, err := vfs.Stat(ctx, root)
if err != nil {
err = walkFn(root, nil, err)
} else {
err = walk(ctx, vfs, root, info, walkFn)
}
if err == filepath.SkipDir {
return nil
}
return err
}
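
Walk gives the vfs layer the familiar path/filepath contract (filepath.SkipDir included) while threading a context through every Stat and ReadDir. A usage sketch within this package:

func countFiles(ctx context.Context, fsys Filesystem) (int, error) {
	n := 0
	err := Walk(ctx, fsys, "/", func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err // info may be nil here, mirroring filepath.Walk
		}
		if !info.IsDir() {
			n++
		}
		return nil
	})
	return n, err
}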
// walk recursively descends path, calling walkFn
// adapted from https://golang.org/src/path/filepath/path.go
func walk(ctx context.Context, vfs Filesystem, path string, info fs.FileInfo, walkFn filepath.WalkFunc) error {
if !info.IsDir() {
return walkFn(path, info, nil)
}
names, err := readdirnames(ctx, vfs, path)
err1 := walkFn(path, info, err)
// If err != nil, walk can't walk into this directory.
// err1 != nil means walkFn wants walk to skip this directory or stop walking.
// Therefore, if one of err and err1 isn't nil, walk will return.
if err != nil || err1 != nil {
// The caller's behavior is controlled by the return value, which is decided
// by walkFn. walkFn may ignore err and return nil.
// If walkFn returns SkipDir, it will be handled by the caller.
// So walk should return whatever walkFn returns.
return err1
}
for _, name := range names {
filename := filepath.Join(path, name)
fileInfo, err := vfs.Stat(ctx, filename)
if err != nil {
if err := walkFn(filename, fileInfo, err); err != nil && err != filepath.SkipDir {
return err
}
} else {
err = walk(ctx, vfs, filename, fileInfo, walkFn)
if err != nil {
if !fileInfo.IsDir() || err != filepath.SkipDir {
return err
}
}
}
}
return nil
}
func readdirnames(ctx context.Context, vfs Filesystem, dir string) ([]string, error) {
files, err := vfs.ReadDir(ctx, dir)
if err != nil {
return nil, err
}
var names []string
for _, file := range files {
names = append(names, file.Name())
}
return names, nil
}