package torrent import ( "bufio" "context" "errors" "fmt" "log/slog" "os" "path/filepath" "strings" "sync" "time" "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/tkv" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/types/infohash" "github.com/go-git/go-billy/v5" "github.com/go-git/go-billy/v5/util" "github.com/royalcat/kv" ) var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent", trace.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")), ) type DirAquire struct { Name string Hashes []infohash.T } type Daemon struct { client *torrent.Client infoBytes *infoBytesStore Storage *fileStorage fis *dhtFileItemStore dirsAquire kv.Store[string, DirAquire] fileProperties kv.Store[string, FileProperties] loadMutex sync.Mutex torrentLoaded chan struct{} sourceFs billy.Filesystem log *rlog.Logger } func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, error) { s := &Daemon{ log: rlog.Component("torrent-service"), sourceFs: sourceFs, torrentLoaded: make(chan struct{}), loadMutex: sync.Mutex{}, } err := os.MkdirAll(conf.MetadataFolder, 0744) if err != nil { return nil, fmt.Errorf("error creating metadata folder: %w", err) } s.fis, err = newDHTStore(filepath.Join(conf.MetadataFolder, "dht-item-store"), 3*time.Hour) if err != nil { return nil, fmt.Errorf("error starting item store: %w", err) } s.Storage, _, err = setupStorage(conf) if err != nil { return nil, err } s.fileProperties, err = tkv.NewKV[string, FileProperties](conf.MetadataFolder, "file-properties") if err != nil { return nil, err } s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder) if err != nil { return nil, err } id, err := getOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID")) if err != nil { return nil, fmt.Errorf("error creating node ID: %w", err) } s.client, err = newClient(s.Storage, s.fis, &conf, id) if err != nil { return nil, fmt.Errorf("error starting torrent client: %w", err) } s.client.AddDhtNodes(conf.DHTNodes) s.dirsAquire, err = tkv.NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire") if err != nil { return nil, err } go func() { ctx := context.Background() err := s.loadTorrentFiles(ctx) if err != nil { s.log.Error(ctx, "initial torrent load failed", rlog.Error(err)) } close(s.torrentLoaded) }() return s, nil } var _ vfs.FsFactory = (*Daemon)(nil).NewTorrentFs func (s *Daemon) Close(ctx context.Context) error { return errors.Join(append( s.client.Close(), s.Storage.Close(), s.dirsAquire.Close(ctx), // s.excludedFiles.Close(ctx), s.infoBytes.Close(), s.fis.Close(), )...) } func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { ctx, span := tracer.Start(ctx, "loadTorrent") defer span.End() log := s.log stat, err := f.Info() if err != nil { return nil, fmt.Errorf("call stat failed: %w", err) } span.SetAttributes(attribute.String("filename", stat.Name())) mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f))) if err != nil { return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err) } var ctl *Controller t, ok := s.client.Torrent(mi.HashInfoBytes()) if ok { ctl = s.newController(t) } else { span.AddEvent("torrent not found, loading from file") log.Info(ctx, "torrent not found, loading from file") spec, err := torrent.TorrentSpecFromMetaInfoErr(mi) if err != nil { return nil, fmt.Errorf("parse spec from metadata: %w", err) } infoBytes := spec.InfoBytes if !isValidInfoHashBytes(infoBytes) { log.Warn(ctx, "info loaded from spec not valid") infoBytes = nil } if len(infoBytes) == 0 { log.Info(ctx, "no info loaded from file, try to load from cache") infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash) if err != nil && err != errNotFound { return nil, fmt.Errorf("get info bytes from database: %w", err) } } t, _ = s.client.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: spec.InfoHash, InfoHashV2: spec.InfoHashV2, Storage: s.Storage, InfoBytes: infoBytes, ChunkSize: spec.ChunkSize, }) t.AllowDataDownload() t.AllowDataUpload() span.AddEvent("torrent added to client") select { case <-ctx.Done(): return nil, ctx.Err() case <-t.GotInfo(): err := s.infoBytes.Set(t.InfoHash(), t.Metainfo()) if err != nil { log.Error(ctx, "error setting info bytes for torrent", slog.String("torrent-name", t.Name()), rlog.Error(err), ) } } span.AddEvent("got info") ctl = s.newController(t) err = ctl.initializeTorrentPriories(ctx) if err != nil { return nil, fmt.Errorf("initialize torrent priorities: %w", err) } // info := t.Info() // if info == nil { // return nil, fmt.Errorf("info is nil") // } // compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info) // if err != nil { // return nil, err // } // if !compatable { // return nil, fmt.Errorf( // "torrent with name '%s' not compatable existing infohash: %s, new: %s", // t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(), // ) // } } return ctl, nil } func isValidInfoHashBytes(d []byte) bool { var info metainfo.Info err := bencode.Unmarshal(d, &info) return err == nil } func (s *Daemon) Stats() torrent.ConnStats { return s.client.ConnStats() } const loadWorkers = 5 func (s *Daemon) loadTorrentFiles(ctx context.Context) error { ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes( attribute.Int("workers", loadWorkers), )) defer span.End() log := s.log loaderPaths := make(chan string, loadWorkers*5) wg := sync.WaitGroup{} defer func() { close(loaderPaths) wg.Wait() }() loaderWorker := func() { for path := range loaderPaths { info, err := s.sourceFs.Stat(path) if err != nil { log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err)) continue } file, err := s.sourceFs.Open(path) if err != nil { log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err)) continue } defer file.Close() vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file)) _, err = s.loadTorrent(ctx, vfile) if err != nil { log.Error(ctx, "failed adding torrent", rlog.Error(err)) } } wg.Done() } wg.Add(loadWorkers) for range loadWorkers { go loaderWorker() } return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error { if err != nil { return fmt.Errorf("fs walk error: %w", err) } if ctx.Err() != nil { return ctx.Err() } if info.IsDir() { return nil } if strings.HasSuffix(path, ".torrent") { loaderPaths <- path } return nil }) } func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] { return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/")) } func (s *Daemon) newController(t *torrent.Torrent) *Controller { return newController(t, storeByTorrent(s.fileProperties, t.InfoHash()), s.Storage, s.log, ) } func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) { <-s.torrentLoaded out := []*Controller{} for _, v := range s.client.Torrents() { out = append(out, s.newController(v)) } return out, nil } func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) { <-s.torrentLoaded t, ok := s.client.Torrent(infohash.FromHexString(infohashHex)) if !ok { return nil, nil } return s.newController(t), nil } func slicesUnique[S ~[]E, E comparable](in S) S { m := map[E]struct{}{} for _, v := range in { m[v] = struct{}{} } return maps.Keys(m) } func apply[I, O any](in []I, f func(e I) O) []O { out := []O{} for _, v := range in { out = append(out, f(v)) } return out }