tstor/daemons/torrent/queue.go
2024-11-24 20:33:44 +03:00

135 lines
2.7 KiB
Go

package torrent
import (
"context"
"fmt"
"git.kmsign.ru/royalcat/tstor/pkg/uuid"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash"
)
type DownloadTask struct {
ID uuid.UUID
InfoHash infohash.T
File string
}
func (s *Daemon) Download(ctx context.Context, task *DownloadTask) error {
t, ok := s.client.Torrent(task.InfoHash)
if !ok {
return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
}
if task.File != "" {
var file *torrent.File
for _, tf := range t.Files() {
if tf.Path() == task.File {
file = tf
break
}
}
if file == nil {
return fmt.Errorf("file %s not found in torrent torrent with IH %s", task.File, task.InfoHash.HexString())
}
file.Download()
} else {
for _, file := range t.Files() {
file.Download()
}
}
return nil
}
// func (s *Service) DownloadAndWait(ctx context.Context, task *TorrentDownloadTask) error {
// t, ok := s.c.Torrent(task.InfoHash)
// if !ok {
// return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
// }
// if task.File != "" {
// var file *torrent.File
// for _, tf := range t.Files() {
// if tf.Path() == task.File {
// file = tf
// break
// }
// }
// if file == nil {
// return fmt.Errorf("file %s not found in torrent torrent with IH %s", task.File, task.InfoHash.HexString())
// }
// file.Download()
// return waitPieceRange(ctx, t, file.BeginPieceIndex(), file.EndPieceIndex())
// }
// t.DownloadAll()
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-t.Complete.On():
// return nil
// }
// }
// func waitPieceRange(ctx context.Context, t *torrent.Torrent, start, end int) error {
// for i := start; i < end; i++ {
// timer := time.NewTimer(time.Millisecond)
// for {
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-timer.C:
// if t.PieceState(i).Complete {
// continue
// }
// }
// }
// }
// return nil
// }
type TorrentProgress struct {
Torrent *Controller
Current int64
Total int64
}
func (s *Daemon) DownloadProgress(ctx context.Context) (<-chan TorrentProgress, error) {
torrents, err := s.ListTorrents(ctx)
if err != nil {
return nil, err
}
out := make(chan TorrentProgress, 1)
go func() {
defer close(out)
for _, t := range torrents {
sub := t.Torrent().SubscribePieceStateChanges()
go func(t *Controller) {
for stateChange := range sub.Values {
if !stateChange.Complete && !stateChange.Partial {
continue
}
out <- TorrentProgress{
Torrent: t,
Current: t.BytesCompleted(),
Total: t.Length(),
}
}
}(t)
defer sub.Close()
}
<-ctx.Done()
}()
return out, nil
}