tstor/daemons/atorrent/controller.go
royalcat 10c3f126f0
Some checks failed
docker / build-docker (push) Failing after 4m59s
rename old torrent module
2025-01-08 00:51:28 +03:00

265 lines
5.7 KiB
Go

package atorrent
import (
	"context"
	"errors"
	"log/slog"
	"strings"

	"git.kmsign.ru/royalcat/tstor/pkg/kvsingle"
	"git.kmsign.ru/royalcat/tstor/pkg/rlog"
	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/types"
	"github.com/royalcat/kv"
)
// TorrentFileDeleter is the storage capability the Controller depends on.
// Controller.ExcludeFile calls DeleteFile after it has recorded the file as
// excluded in the properties store.
type TorrentFileDeleter interface {
// DeleteFile is handed the torrent file to delete and reports any failure
// as an error.
DeleteFile(file *torrent.File) error
}
// FileProperties is the per-file record kept in the Controller's key-value
// store, keyed by the file's path. The struct tags give its JSON field names.
type FileProperties struct {
// Excluded is set to true by Controller.ExcludeFile.
Excluded bool `json:"excluded"`
// Priority is the stored piece priority; initializeTorrentPriories applies
// it to the torrent file.
Priority types.PiecePriority `json:"priority"`
}
// Controller bundles one torrent handle with the file-properties store, the
// storage deleter and a logger, and exposes query and control methods on top
// of them.
type Controller struct {
// torrentFilePath is the value returned by TorrentFilePath. newController
// does not populate it.
// NOTE(review): presumably assigned elsewhere in the package — confirm.
torrentFilePath string
// t is the underlying torrent handle that the accessor methods delegate to.
t *torrent.Torrent
// storage is used by ExcludeFile to delete an excluded file.
storage TorrentFileDeleter
// fileProperties stores FileProperties values keyed by file path.
fileProperties kv.Store[string, FileProperties]
// log is the logger built in newController: scoped to the "controller"
// component and tagged with the torrent's infohash.
log *rlog.Logger
}
// newController builds a Controller around t.
//
// The supplied logger is narrowed to the "controller" component and tagged
// with the torrent's hex infohash. torrentFilePath is left at its zero value.
func newController(t *torrent.Torrent, torrentFileProperties kv.Store[string, FileProperties], storage TorrentFileDeleter, log *rlog.Logger) *Controller {
	scoped := log.WithComponent("controller")
	tagged := scoped.With(slog.String("infohash", t.InfoHash().HexString()))

	ctl := new(Controller)
	ctl.t = t
	ctl.storage = storage
	ctl.fileProperties = torrentFileProperties
	ctl.log = tagged
	return ctl
}
// TorrentFilePath returns the torrent file path stored on the controller.
func (c *Controller) TorrentFilePath() string {
	path := c.torrentFilePath
	return path
}
// Torrent returns the underlying torrent handle held by the controller.
func (c *Controller) Torrent() *torrent.Torrent {
	handle := c.t
	return handle
}
// Name returns the torrent's name, falling back to its hex infohash when the
// name is empty.
//
// It blocks on the torrent's GotInfo channel first and takes no context, so
// the wait cannot be cancelled by the caller.
func (c *Controller) Name() string {
	<-c.t.GotInfo()

	name := c.t.Name()
	if name == "" {
		return c.InfoHash()
	}
	return name
}
// InfoHash returns the torrent's infohash as a hex string.
//
// It blocks on the torrent's GotInfo channel before reading the hash.
func (c *Controller) InfoHash() string {
	tor := c.t
	<-tor.GotInfo()

	hash := tor.InfoHash()
	return hash.HexString()
}
// BytesCompleted returns the torrent's BytesCompleted value.
//
// It blocks on the torrent's GotInfo channel before querying.
func (c *Controller) BytesCompleted() int64 {
	tor := c.t
	<-tor.GotInfo()

	done := tor.BytesCompleted()
	return done
}
// BytesMissing returns the torrent's BytesMissing value.
//
// It blocks on the torrent's GotInfo channel before querying.
func (c *Controller) BytesMissing() int64 {
	tor := c.t
	<-tor.GotInfo()

	missing := tor.BytesMissing()
	return missing
}
// Length returns the torrent's Length value.
//
// It blocks on the torrent's GotInfo channel before querying.
func (c *Controller) Length() int64 {
	tor := c.t
	<-tor.GotInfo()

	total := tor.Length()
	return total
}
// Files returns a FileController for every file of the torrent whose path does
// not contain "/.pad/".
//
// The call runs inside a "Files" tracing span. It first ranges over the whole
// properties store and fails if that returns an error, then waits for either
// the torrent's GotInfo channel or ctx cancellation (returning ctx.Err() in
// the latter case). The returned slice is never nil on success.
func (c *Controller) Files(ctx context.Context) ([]*FileController, error) {
	ctx, span := tracer.Start(ctx, "Files")
	defer span.End()

	// NOTE(review): this snapshot is filled but never read afterwards; the
	// Range call is kept because its error is part of this method's contract.
	snapshot := map[string]FileProperties{}
	collect := func(key string, props FileProperties) error {
		snapshot[key] = props
		return nil
	}
	if err := c.fileProperties.Range(ctx, collect); err != nil {
		return nil, err
	}

	select {
	case <-c.t.GotInfo():
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	result := make([]*FileController, 0)
	for _, tf := range c.t.Files() {
		path := tf.Path()
		if strings.Contains(path, "/.pad/") {
			continue
		}
		// Each controller gets a view of the store bound to its own path.
		single := kvsingle.New(c.fileProperties, path)
		result = append(result, NewFileController(tf, single, c.log))
	}
	return result, nil
}
// GetFile looks up the FileController whose path equals file.
//
// It builds the full list via Files and scans it. Errors from Files are
// returned unchanged. When no file matches, it returns a nil controller
// together with a nil error, so callers must check the pointer.
func (c *Controller) GetFile(ctx context.Context, file string) (*FileController, error) {
	all, err := c.Files(ctx)
	if err != nil {
		return nil, err
	}

	for i := range all {
		if candidate := all[i]; candidate.Path() == file {
			return candidate, nil
		}
	}

	return nil, nil
}
// Map applies f to every element of ts and returns the results in the same
// order. The result always has len(ts) elements and is non-nil, even when ts
// is nil or empty.
func Map[T, U any](ts []T, f func(T) U) []U {
	out := make([]U, len(ts))
	for idx, item := range ts {
		out[idx] = f(item)
	}
	return out
}
// ExcludeFile records f as excluded in the properties store and then asks the
// storage backend to delete the file.
//
// If the store has no entry for the file's path yet, one is created with only
// Excluded set. Any other store error is returned and the storage delete is
// not attempted. Otherwise the result of storage.DeleteFile is returned.
func (s *Controller) ExcludeFile(ctx context.Context, f *torrent.File) error {
	path := f.Path()

	log := s.log.With(slog.String("file", path))
	log.Info(ctx, "excluding file")

	err := s.fileProperties.Edit(ctx, path, func(ctx context.Context, v FileProperties) (FileProperties, error) {
		v.Excluded = true
		return v, nil
	})
	// errors.Is instead of == so that a wrapped ErrKeyNotFound still takes the
	// create-on-miss path rather than being reported as a hard failure.
	if errors.Is(err, kv.ErrKeyNotFound) {
		err = s.fileProperties.Set(ctx, path, FileProperties{Excluded: true})
	}
	if err != nil {
		return err
	}

	return s.storage.DeleteFile(f)
}
// isFileComplete reports whether every piece with an index in the half-open
// range [startIndex, endIndex) has its Complete state flag set. An empty range
// yields true.
func (c *Controller) isFileComplete(startIndex int, endIndex int) bool {
	idx := startIndex
	for idx < endIndex {
		if done := c.t.Piece(idx).State().Complete; !done {
			return false
		}
		idx++
	}
	return true
}
// ValidateTorrent calls VerifyData on every piece of the torrent, in index
// order.
//
// It first waits for the torrent's GotInfo channel or for ctx to be cancelled.
// Cancellation is also checked before each piece, and ctx.Err() is returned as
// soon as it is non-nil.
func (c *Controller) ValidateTorrent(ctx context.Context) error {
	select {
	case <-c.t.GotInfo():
	case <-ctx.Done():
		return ctx.Err()
	}

	for piece := 0; piece < c.t.NumPieces(); piece++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		c.t.Piece(piece).VerifyData()
	}

	return nil
}
// SetPriority applies priority to every file returned by Files that is not
// marked as excluded.
//
// Only a failure of Files is returned to the caller. Per-file failures are
// logged and the loop moves on, so the method returns nil even when some
// files could not be updated.
func (c *Controller) SetPriority(ctx context.Context, priority types.PiecePriority) error {
	logger := c.log.With(slog.Int("priority", int(priority)))

	files, err := c.Files(ctx)
	if err != nil {
		return err
	}

	for _, fc := range files {
		skip, exclErr := fc.Excluded(ctx)
		if exclErr != nil {
			// Logged only: processing continues with whatever value Excluded
			// returned alongside the error.
			logger.Error(ctx, "failed to get file exclusion status", rlog.Error(exclErr))
		}
		if skip {
			continue
		}

		if setErr := fc.SetPriority(ctx, priority); setErr != nil {
			logger.Error(ctx, "failed to set file priority", rlog.Error(setErr))
		}
	}

	return nil
}
// defaultPriority is the starting value for Controller.Priority's scan; it is
// the result when no file reports a higher priority.
const defaultPriority = types.PiecePriorityNone
// Priority returns the highest priority among the files returned by Files.
//
// The scan starts from defaultPriority, so that value is returned when there
// are no files or none exceeds it. If Files fails, its error is returned with
// a zero priority.
func (c *Controller) Priority(ctx context.Context) (types.PiecePriority, error) {
	files, err := c.Files(ctx)
	if err != nil {
		return 0, err
	}

	highest := defaultPriority
	for _, fc := range files {
		if p := fc.Priority(); p > highest {
			highest = p
		}
	}
	return highest, nil
}
// func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
// err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
// v.Priority = priority
// return v, nil
// })
// if err == kv.ErrKeyNotFound {
// seterr := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority})
// if seterr != nil {
// return seterr
// }
// err = nil
// }
// if err != nil {
// return err
// }
// file.SetPriority(priority)
// return nil
// }
// initializeTorrentPriories applies each file's stored priority to the
// corresponding torrent file.
//
// The call runs inside an "initializeTorrentPriories" tracing span. A failure
// of Files is returned. A failure to load one file's properties is logged and
// that file is skipped. A debug line with the infohash and torrent name is
// emitted once the loop has finished.
func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "initializeTorrentPriories")
	defer span.End()

	logger := c.log

	files, err := c.Files(ctx)
	if err != nil {
		return err
	}

	for _, fc := range files {
		props, propsErr := fc.Properties(ctx)
		if propsErr != nil {
			logger.Error(ctx, "failed to get file properties", rlog.Error(propsErr))
			continue
		}
		fc.file.SetPriority(props.Priority)
	}

	hashAttr := slog.String("infohash", c.InfoHash())
	nameAttr := slog.String("torrent_name", c.Name())
	logger.Debug(ctx, "torrent initialization complete", hashAttr, nameAttr)
	return nil
}