package service import ( "bufio" "context" "errors" "fmt" "log/slog" "os" "path/filepath" "slices" "strings" "sync" "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/datastorage" "git.kmsign.ru/royalcat/tstor/src/host/store" "git.kmsign.ru/royalcat/tstor/src/host/tkv" "git.kmsign.ru/royalcat/tstor/src/host/vfs" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/types" "github.com/anacrolix/torrent/types/infohash" "github.com/royalcat/kv" ) var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/service") type DirAquire struct { Name string Hashes []infohash.T } type Service struct { c *torrent.Client excludedFiles *store.FilesMappings infoBytes *store.InfoBytes torrentLoaded chan struct{} loadMutex sync.Mutex // stats *Stats DefaultPriority types.PiecePriority Storage *datastorage.DataStorage SourceDir string dirsAquire kv.Store[string, DirAquire] log *rlog.Logger } func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client, storage *datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes, ) (*Service, error) { dirsAcquire, err := tkv.New[string, DirAquire](cfg.MetadataFolder, "dir-acquire") if err != nil { return nil, err } s := &Service{ log: rlog.Component("torrent-service"), c: c, DefaultPriority: types.PiecePriorityNone, excludedFiles: excludedFiles, infoBytes: infoBytes, Storage: storage, SourceDir: sourceDir, torrentLoaded: make(chan struct{}), loadMutex: sync.Mutex{}, dirsAquire: dirsAcquire, // stats: newStats(), // TODO persistent } go func() { ctx := context.Background() err := s.loadTorrentFiles(ctx) if err != nil { s.log.Error(ctx, "initial torrent load failed", rlog.Error(err)) } close(s.torrentLoaded) }() return s, nil } var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs func (s *Service) Close() error { return errors.Join(append( s.c.Close(), s.Storage.Close(), )...) } func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) { ctx, span := tracer.Start(ctx, "LoadTorrent") defer span.End() log := s.log defer f.Close(ctx) stat, err := f.Info() if err != nil { return nil, fmt.Errorf("call stat failed: %w", err) } span.SetAttributes(attribute.String("filename", stat.Name())) mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f))) if err != nil { return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) } t, ok := s.c.Torrent(mi.HashInfoBytes()) if !ok { span.AddEvent("torrent not found, loading from file") log.Info(ctx, "torrent not found, loading from file") spec, err := torrent.TorrentSpecFromMetaInfoErr(mi) if err != nil { return nil, fmt.Errorf("parse spec from metadata: %w", err) } infoBytes := spec.InfoBytes if !isValidInfoHashBytes(infoBytes) { log.Warn(ctx, "info loaded from spec not valid") infoBytes = nil } if len(infoBytes) == 0 { log.Info(ctx, "no info loaded from file, try to load from cache") infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash) if err != nil && err != store.ErrNotFound { return nil, fmt.Errorf("get info bytes from database: %w", err) } } t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: spec.InfoHash, Storage: s.Storage, InfoBytes: infoBytes, ChunkSize: spec.ChunkSize, }) t.AllowDataDownload() t.AllowDataUpload() span.AddEvent("torrent added to client") select { case <-ctx.Done(): return nil, ctx.Err() case <-t.GotInfo(): err := s.infoBytes.Set(t.InfoHash(), t.Metainfo()) if err != nil { log.Error(ctx, "error setting info bytes for torrent", slog.String("torrent-name", t.Name()), rlog.Error(err), ) } } span.AddEvent("got info") info := t.Info() if info == nil { return nil, fmt.Errorf("info is nil") } compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info) if err != nil { return nil, err } if !compatable { return nil, fmt.Errorf( "torrent with name '%s' not compatable existing infohash: %s, new: %s", t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(), ) } } return t, nil } func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) { log := s.log.With( slog.String("new-name", info.BestName()), slog.String("new-infohash", ih.String()), ) name := info.BestName() aq, found, err := s.dirsAquire.Get(ctx, info.BestName()) if err != nil { return false, false, err } if !found { err = s.dirsAquire.Set(ctx, name, DirAquire{ Name: name, Hashes: slices.Compact([]infohash.T{ih}), }) if err != nil { return false, false, err } log.Debug(ctx, "acquiring was not found, so created") return true, false, nil } if slices.Contains(aq.Hashes, ih) { log.Debug(ctx, "hash already know to be compatable") return true, false, nil } for _, existingTorrent := range s.c.Torrents() { if existingTorrent.Name() != name || existingTorrent.InfoHash() == ih { continue } existingInfo := existingTorrent.Info() existingFiles := slices.Clone(existingInfo.Files) newFiles := slices.Clone(info.Files) if !s.checkTorrentFilesCompatable(ctx, aq, existingFiles, newFiles) { return false, false, nil } aq.Hashes = slicesUnique(append(aq.Hashes, ih)) err = s.dirsAquire.Set(ctx, aq.Name, aq) if err != nil { log.Warn(ctx, "torrent not compatible") return false, false, err } } if slices.Contains(aq.Hashes, ih) { log.Debug(ctx, "hash is compatable") return true, false, nil } log.Debug(ctx, "torrent with same name not found, try later") return false, true, nil } func (s *Service) checkTorrentFilesCompatable(ctx context.Context, aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool { log := s.log.With(slog.String("name", aq.Name)) pathCmp := func(a, b metainfo.FileInfo) int { return slices.Compare(a.BestPath(), b.BestPath()) } slices.SortStableFunc(existingFiles, pathCmp) slices.SortStableFunc(newFiles, pathCmp) // torrents basically equals if slices.EqualFunc(existingFiles, newFiles, func(fi1, fi2 metainfo.FileInfo) bool { return fi1.Length == fi2.Length && slices.Equal(fi1.BestPath(), fi1.BestPath()) }) { return true } if len(newFiles) > len(existingFiles) { type fileInfo struct { Path string Length int64 } mapInfo := func(fi metainfo.FileInfo) fileInfo { return fileInfo{ Path: strings.Join(fi.BestPath(), "/"), Length: fi.Length, } } existingFiles := apply(existingFiles, mapInfo) newFiles := apply(newFiles, mapInfo) for _, n := range newFiles { if slices.Contains(existingFiles, n) { continue } for _, e := range existingFiles { if e.Path == n.Path && e.Length != n.Length { log.Warn(ctx, "torrents not compatible, has files with different length", slog.String("path", n.Path), slog.Int64("existing-length", e.Length), slog.Int64("new-length", e.Length), ) return false } } } } return true } // func (s *Service) getTorrentsByName(name string) []*torrent.Torrent { // out := []*torrent.Torrent{} // for _, t := range s.c.Torrents() { // if t.Name() == name { // out = append(out, t) // } // } // return out // } func isValidInfoHashBytes(d []byte) bool { var info metainfo.Info err := bencode.Unmarshal(d, &info) return err == nil } func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { defer f.Close(ctx) info, err := f.Info() if err != nil { return nil, err } t, err := s.LoadTorrent(ctx, f) if err != nil { return nil, err } return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles)), nil } func (s *Service) Stats() (*Stats, error) { return &Stats{}, nil } func (s *Service) GetStats() torrent.ConnStats { return s.c.ConnStats() } const loadWorkers = 5 func (s *Service) loadTorrentFiles(ctx context.Context) error { ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes( attribute.Int("workers", loadWorkers), )) defer span.End() log := s.log loaderPaths := make(chan string) wg := sync.WaitGroup{} defer func() { close(loaderPaths) wg.Wait() }() loaderWorker := func() { wg.Add(1) for path := range loaderPaths { file, err := vfs.NewLazyOsFile(path) if err != nil { log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err)) continue } defer file.Close(ctx) _, err = s.LoadTorrent(ctx, file) if err != nil { log.Error(ctx, "failed adding torrent", rlog.Error(err)) } } wg.Done() } for range loadWorkers { go loaderWorker() } return filepath.Walk(s.SourceDir, func(path string, info os.FileInfo, err error) error { if err != nil { return fmt.Errorf("fs walk error: %w", err) } if ctx.Err() != nil { return ctx.Err() } if info.IsDir() { return nil } if strings.HasSuffix(path, ".torrent") { loaderPaths <- path } return nil }) } func (s *Service) ListTorrents(ctx context.Context) ([]*controller.Torrent, error) { <-s.torrentLoaded out := []*controller.Torrent{} for _, v := range s.c.Torrents() { out = append(out, controller.NewTorrent(v, s.excludedFiles)) } return out, nil } func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) { <-s.torrentLoaded t, ok := s.c.Torrent(infohash.FromHexString(infohashHex)) if !ok { return nil, nil } return controller.NewTorrent(t, s.excludedFiles), nil } func slicesUnique[S ~[]E, E comparable](in S) S { m := map[E]struct{}{} for _, v := range in { m[v] = struct{}{} } return maps.Keys(m) } func apply[I, O any](in []I, f func(e I) O) []O { out := []O{} for _, v := range in { out = append(out, f(v)) } return out }