package datastorage

import (
	"context"
	"crypto/sha1"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path"
	"path/filepath"
	"slices"

	"git.kmsign.ru/royalcat/tstor/src/host/controller"
	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/metainfo"
	"github.com/anacrolix/torrent/storage"
	"github.com/dustin/go-humanize"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/exp/maps"
	"golang.org/x/sys/unix"
)

// type DataStorage interface {
// 	storage.ClientImplCloser
// 	DeleteFile(file *torrent.File) error
// 	CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// 	CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// }

var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/datastorage")

// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage {
	return &DataStorage{
		ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
			ClientBaseDir:   baseDir,
			PieceCompletion: pc,
			TorrentDirMaker: torrentDir,
			FilePathMaker:   filePath,
		}),
		baseDir:         baseDir,
		pieceCompletion: pc,
		log:             slog.With("component", "torrent-client"),
	}
}

// DataStorage is file-based storage for torrents that is not yet bound to a particular torrent.
type DataStorage struct {
	baseDir string
	storage.ClientImplCloser
	pieceCompletion storage.PieceCompletion
	log             *slog.Logger
}

func (me *DataStorage) Close() error {
	return me.pieceCompletion.Close()
}

func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
	dirName := info.Name
	if dirName == "" {
		dirName = infoHash.HexString()
	}
	return filepath.Join(baseDir, dirName)
}

func filePath(opts storage.FilePathMakerOpts) string {
	return filepath.Join(opts.File.Path...)
}
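// The two path makers above produce a layout like the following (paths are
// illustrative only): a torrent named "ubuntu" containing "iso/ubuntu.iso"
// under baseDir "/srv/torrents" is stored at
//
//	/srv/torrents/ubuntu/iso/ubuntu.iso
//
// A torrent with an empty name falls back to its hex info-hash as the
// directory name.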
func (fs *DataStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
	return filepath.Join(
		torrentDir(fs.baseDir, info, infoHash),
		filePath(storage.FilePathMakerOpts{
			Info: info,
			File: fileInfo,
		}),
	)
}

func (fs *DataStorage) DeleteFile(file *torrent.File) error {
	info := file.Torrent().Info()
	infoHash := file.Torrent().InfoHash()
	torrentDir := torrentDir(fs.baseDir, info, infoHash)
	fileInfo := file.FileInfo()
	relFilePath := filePath(storage.FilePathMakerOpts{
		Info: info,
		File: &fileInfo,
	})
	filePath := path.Join(torrentDir, relFilePath)

	// Mark every piece that overlaps this file as incomplete before removing it.
	for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
		pk := metainfo.PieceKey{InfoHash: infoHash, Index: i}
		err := fs.pieceCompletion.Set(pk, false)
		if err != nil {
			return err
		}
	}

	return os.Remove(filePath)
}

func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) {
	log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)

	expectedEntries := []string{}
	for _, e := range expected {
		expectedEntries = append(expectedEntries, e.Torrent().Name())
	}

	entries, err := os.ReadDir(fs.baseDir)
	if err != nil {
		return nil, err
	}

	toDelete := []string{}
	for _, v := range entries {
		if !slices.Contains(expectedEntries, v.Name()) {
			toDelete = append(toDelete, v.Name())
		}
	}

	if ctx.Err() != nil {
		return nil, ctx.Err()
	}

	log.Info("deleting trash data", "dirsCount", len(toDelete))
	if !dryRun {
		for i, name := range toDelete {
			p := path.Join(fs.baseDir, name)
			log.Warn("deleting trash data", "path", p)
			err := os.RemoveAll(p)
			if err != nil {
				// Return only the entries that were actually removed.
				return toDelete[:i], err
			}
		}
	}

	return toDelete, nil
}

func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) {
	log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)

	expectedEntries := []string{}
	{
		for _, e := range expected {
			files, err := e.Files(ctx)
			if err != nil {
				return nil, err
			}

			for _, f := range files {
				expectedEntries = append(expectedEntries, s.filePath(e.Torrent().Info(), e.Torrent().InfoHash(), ptr(f.FileInfo())))
			}
		}
	}

	entries := []string{}
	err := filepath.WalkDir(s.baseDir,
		func(path string, info fs.DirEntry, err error) error {
			if err != nil {
				return err
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if info.IsDir() {
				return nil
			}
			entries = append(entries, path)
			return nil
		})
	if err != nil {
		return nil, err
	}

	toDelete := []string{}
	for _, v := range entries {
		if !slices.Contains(expectedEntries, v) {
			toDelete = append(toDelete, v)
		}
	}

	if ctx.Err() != nil {
		return toDelete, ctx.Err()
	}

	log.Info("deleting trash data", "filesCount", len(toDelete))
	if !dryRun {
		for i, p := range toDelete {
			s.log.Warn("deleting trash data", "path", p)
			err := os.Remove(p)
			if err != nil {
				// Return only the entries that were actually removed,
				// mirroring CleanupDirs.
				return toDelete[:i], err
			}
		}
	}

	return toDelete, nil
}

func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error {
	return filepath.Walk(s.baseDir,
		func(path string, info fs.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if info.IsDir() {
				return nil
			}
			return iter(ctx, path, info)
		})
}
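// Dedupe walks the data directory and reflink-dedupes identical files in
// place. Candidate groups are formed by file size, confirmed by comparing a
// hash of each file's leading bytes (see filehash), and then handed to the
// kernel via the FIDEDUPERANGE ioctl, which only merges extents whose
// contents actually match. It returns the total number of bytes deduped.
// Linux with a dedupe-capable filesystem (e.g. btrfs) is assumed.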
func (s *DataStorage) Dedupe(ctx context.Context) (uint64, error) {
	ctx, span := tracer.Start(ctx, "Dedupe")
	defer span.End()

	log := s.log

	sizeMap := map[int64][]string{}
	err := s.iterFiles(ctx, func(ctx context.Context, path string, info fs.FileInfo) error {
		size := info.Size()
		sizeMap[size] = append(sizeMap[size], path)
		return nil
	})
	if err != nil {
		return 0, err
	}

	maps.DeleteFunc(sizeMap, func(k int64, v []string) bool { return len(v) <= 1 })

	span.AddEvent("collected files with same size", trace.WithAttributes(
		attribute.Int("count", len(sizeMap)),
	))

	var deduped uint64 = 0
	i := 0
	for _, paths := range sizeMap {
		if i%100 == 0 {
			log.Info("deduping in progress", "current", i, "total", len(sizeMap))
		}
		i++

		if ctx.Err() != nil {
			return deduped, ctx.Err()
		}

		slices.Sort(paths)
		paths = slices.Compact(paths)
		if len(paths) <= 1 {
			continue
		}

		paths, err = applyErr(paths, filepath.Abs)
		if err != nil {
			return deduped, err
		}

		dedupedGroup, err := s.dedupeFiles(ctx, paths)
		if err != nil {
			log.Error("Error applying dedupe", "files", paths, "error", err.Error())
			continue
		}

		if dedupedGroup > 0 {
			deduped += dedupedGroup
			log.Info("deduped file group",
				slog.String("files", fmt.Sprint(paths)),
				slog.String("deduped", humanize.Bytes(dedupedGroup)),
				slog.String("deduped_total", humanize.Bytes(deduped)),
			)
		}
	}

	return deduped, nil
}

func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) {
	out := make([]O, 0, len(in))
	for _, p := range in {
		o, err := apply(p)
		if err != nil {
			return out, err
		}
		out = append(out, o)
	}
	return out, nil
}

// const blockSize uint64 = 4096

func (s *DataStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) {
	ctx, span := tracer.Start(ctx, "dedupeFiles",
		trace.WithAttributes(
			attribute.StringSlice("files", paths),
		),
	)
	defer func() {
		span.SetAttributes(attribute.Int64("deduped", int64(deduped)))
		if err != nil {
			span.RecordError(err)
		}
		span.End()
	}()

	log := s.log

	srcF, err := os.Open(paths[0])
	if err != nil {
		return deduped, err
	}
	defer srcF.Close()
	srcStat, err := srcF.Stat()
	if err != nil {
		return deduped, err
	}

	srcFd := int(srcF.Fd())
	srcSize := srcStat.Size()

	fsStat := unix.Statfs_t{}
	err = unix.Fstatfs(srcFd, &fsStat)
	if err != nil {
		span.RecordError(err)
		return deduped, err
	}

	srcHash, err := filehash(srcF)
	if err != nil {
		return deduped, err
	}

	if int64(fsStat.Bsize) > srcSize {
		// On btrfs a file smaller than a block is inlined into metadata and
		// cannot be deduplicated.
		return deduped, nil
	}

	// Round the source length down to a whole number of filesystem blocks.
	blockSize := uint64((srcSize / int64(fsStat.Bsize)) * int64(fsStat.Bsize))

	span.SetAttributes(attribute.Int64("blocksize", int64(blockSize)))

	rng := unix.FileDedupeRange{
		Src_offset: 0,
		Src_length: blockSize,
		Info:       []unix.FileDedupeRangeInfo{},
	}

	for _, dst := range paths[1:] {
		if ctx.Err() != nil {
			return deduped, ctx.Err()
		}

		destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
		if err != nil {
			return deduped, err
		}
		// The deferred Close keeps each destination fd valid until the ioctl
		// below has run.
		defer destF.Close()

		dstHash, err := filehash(destF)
		if err != nil {
			return deduped, err
		}

		if srcHash != dstHash {
			destF.Close() // close mismatches early; the deferred Close is then a no-op
			continue
		}

		rng.Info = append(rng.Info, unix.FileDedupeRangeInfo{
			Dest_fd:     int64(destF.Fd()),
			Dest_offset: 0,
		})
	}

	if len(rng.Info) == 0 {
		return deduped, nil
	}

	log.Info("found same files, deduping", "files", paths, "size", humanize.Bytes(uint64(srcStat.Size())))

	if ctx.Err() != nil {
		return deduped, ctx.Err()
	}

	rng.Src_offset = 0
	for i := range rng.Info {
		rng.Info[i].Dest_offset = 0
	}

	err = unix.IoctlFileDedupeRange(srcFd, &rng)
	if err != nil {
		return deduped, err
	}

	for i := range rng.Info {
		deduped += rng.Info[i].Bytes_deduped
		rng.Info[i].Status = 0
		rng.Info[i].Bytes_deduped = 0
	}

	return deduped, nil
}

const compareBlockSize = 1024 * 128

// filehash hashes only the first compareBlockSize bytes, so equal hashes mean
// equal prefixes, not equal files; the dedupe ioctl itself verifies full
// contents before merging extents.
func filehash(r io.Reader) ([20]byte, error) {
	buf := make([]byte, compareBlockSize)
	// io.ReadFull fills the buffer when possible; short files produce
	// io.ErrUnexpectedEOF (or io.EOF when empty), both of which are fine here.
	_, err := io.ReadFull(r, buf)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return [20]byte{}, err
	}
	return sha1.Sum(buf), nil
}
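// ptr returns a pointer to v, which is handy for taking the address of an
// rvalue such as f.FileInfo() in CleanupFiles above.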
func ptr[D any](v D) *D {
	return &v
}
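// Usage sketch (hypothetical wiring; NewDefaultPieceCompletionForDir is the
// stock anacrolix/torrent piece-completion store, the paths are illustrative):
//
//	pc, err := storage.NewDefaultPieceCompletionForDir("/srv/torrents")
//	if err != nil {
//		panic(err)
//	}
//	ds := NewFileStorage("/srv/torrents", pc)
//	defer ds.Close()
//	if n, err := ds.Dedupe(context.Background()); err == nil {
//		fmt.Println("deduped", humanize.Bytes(n))
//	}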