tstor/daemons/torrent/daemon_load.go
royalcat 585f317478
Some checks failed
docker / build-docker (push) Failing after 34s
daemons separation
2024-11-24 20:32:26 +03:00

246 lines
5.8 KiB
Go

package torrent
import (
"bufio"
"context"
"fmt"
"io"
"log/slog"
"os"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/go-git/go-billy/v5/util"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// activityTimeout is a 15 minute inactivity window. In the visible part of
// this file it is only referenced by the disabled (commented-out) goroutine in
// loadTorrent that would drop a torrent after that long without piece state
// changes.
const activityTimeout = 15 * time.Minute
// readInfoHash parses torrent metainfo from f and returns the hash of its
// info bytes.
//
// The read is performed through a context-bound reader, under a tracing span
// named "readInfoHash". On a parse failure the zero hash is returned together
// with the wrapped error. The file is not rewound here; callers that need to
// read f again are expected to seek it themselves.
func readInfoHash(ctx context.Context, f vfs.File) (metainfo.Hash, error) {
	ctx, span := tracer.Start(ctx, "readInfoHash")
	defer span.End()

	reader := ctxio.IoReader(ctx, f)
	meta, loadErr := metainfo.Load(reader)
	if loadErr != nil {
		var zero metainfo.Hash
		return zero, fmt.Errorf("loading metainfo: %w", loadErr)
	}

	infoHash := meta.HashInfoBytes()
	return infoHash, nil
}
// loadTorrent returns a Controller for the torrent described by the .torrent
// file f, registering the torrent with the client first when it is not known
// yet.
//
// The metainfo is read from f starting at its current position. If the client
// already has a torrent with the same info hash, a controller for it is
// returned immediately. Otherwise the torrent is added using the info bytes
// from the file or, when those are missing or invalid, the info bytes cache.
// The call then blocks until the torrent info is available or ctx is done,
// stores the metainfo in the cache (best effort) and initializes the file
// priorities.
//
// It returns an error when the file cannot be stat'ed or parsed, the info
// bytes cache fails with anything other than errNotFound, ctx is cancelled
// while waiting for the torrent info, or priority initialization fails.
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
	ctx, span := tracer.Start(ctx, "loadTorrent")
	defer span.End()

	stat, err := f.Info()
	if err != nil {
		return nil, fmt.Errorf("call stat failed: %w", err)
	}
	span.SetAttributes(attribute.String("filename", stat.Name()))

	mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f)))
	if err != nil {
		return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
	}

	// Hash the info bytes once and reuse the result for logging and lookup.
	infoHash := mi.HashInfoBytes()
	log := s.log.With(slog.String("info-hash", infoHash.HexString()))

	// Fast path: the client already knows this torrent.
	if t, ok := s.client.Torrent(infoHash); ok {
		return s.newController(t), nil
	}

	span.AddEvent("torrent not found, loading from file")
	log.Info(ctx, "torrent not found, loading from file")

	spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
	if err != nil {
		return nil, fmt.Errorf("parse spec from metadata: %w", err)
	}

	infoBytes := spec.InfoBytes
	if !isValidInfoHashBytes(infoBytes) {
		log.Warn(ctx, "info loaded from spec not valid")
		infoBytes = nil
	}
	if len(infoBytes) == 0 {
		log.Info(ctx, "no info loaded from file, try to load from cache")
		infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
		// A cache miss is not fatal: the torrent is then added without info
		// bytes and the wait on GotInfo below covers obtaining them.
		if err != nil && err != errNotFound {
			return nil, fmt.Errorf("get info bytes from database: %w", err)
		}
	}

	// The second result only reports whether the torrent was newly added; it
	// is not an error, so ignoring it is safe.
	t, _ := s.client.AddTorrentOpt(torrent.AddTorrentOpts{
		InfoHash:   spec.InfoHash,
		InfoHashV2: spec.InfoHashV2,
		Storage:    s.Storage,
		InfoBytes:  infoBytes,
		ChunkSize:  spec.ChunkSize,
	})
	t.AllowDataDownload()
	t.AllowDataUpload()
	span.AddEvent("torrent added to client")

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-t.GotInfo():
		// Caching the metainfo is best effort; a failure is logged only.
		// The torrent name is attached here rather than to the logger
		// earlier, because it may not be known before the info arrives.
		if err := s.infoBytes.Set(t.InfoHash(), t.Metainfo()); err != nil {
			log.Error(ctx, "error setting info bytes for torrent",
				slog.String("torrent-name", t.Name()),
				rlog.Error(err),
			)
		}
	}
	span.AddEvent("got info")

	ctl := s.newController(t)
	if err := ctl.initializeTorrentPriories(ctx); err != nil {
		return nil, fmt.Errorf("initialize torrent priorities: %w", err)
	}

	// NOTE: disabled code, kept for reference. It would drop the torrent after
	// activityTimeout without piece state changes.
	//
	// go func() {
	// 	subscr := ctl.t.SubscribePieceStateChanges()
	// 	defer subscr.Close()
	// 	dropTimer := time.NewTimer(activityTimeout)
	// 	defer dropTimer.Stop()
	// 	for {
	// 		select {
	// 		case <-subscr.Values:
	// 			dropTimer.Reset(activityTimeout)
	// 		case <-dropTimer.C:
	// 			log.Info(ctx, "torrent dropped by activity timeout")
	// 			select {
	// 			case <-ctl.t.Closed():
	// 				return
	// 			case <-time.After(time.Second):
	// 				ctl.t.Drop()
	// 			}
	// 		case <-ctl.t.Closed():
	// 			return
	// 		}
	// 	}
	// }()

	return ctl, nil
}
// loadWorkers is the number of goroutines backgroudFileLoad starts to process
// .torrent files concurrently; the path queue feeding them is buffered to five
// times this value.
const loadWorkers = 5
// backgroudFileLoad walks s.sourceFs and loads every ".torrent" file that has
// at least one file property with a positive priority.
//
// Paths found by the walk are queued to loadWorkers goroutines. Failures on
// individual files are logged and do not stop the scan. The function returns
// once the walk has finished and all workers have drained the queue.
//
// It returns the first walk error, or ctx.Err() when ctx is cancelled during
// the walk.
func (s *Daemon) backgroudFileLoad(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes(
		attribute.Int("workers", loadWorkers),
	))
	defer span.End()

	log := s.log

	// loadFile handles a single .torrent file. It is a separate function so
	// that the deferred Close runs as soon as this file is done, instead of
	// piling up until the worker goroutine exits.
	loadFile := func(path string) {
		info, err := s.sourceFs.Stat(path)
		if err != nil {
			log.Error(ctx, "error stat torrent file", slog.String("filename", path), rlog.Error(err))
			return
		}

		file, err := s.sourceFs.Open(path)
		if err != nil {
			log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err))
			return
		}
		defer file.Close()

		vfile := vfs.NewCtxBillyFile(info, ctxbilly.WrapFile(file))

		ih, err := readInfoHash(ctx, vfile)
		if err != nil {
			log.Error(ctx, "error reading info hash", slog.String("filename", path), rlog.Error(err))
			return
		}
		props := storeByTorrent(s.fileProperties, ih)

		// readInfoHash consumed the file; rewind so loadTorrent can parse it
		// again from the start.
		_, err = vfile.Seek(0, io.SeekStart)
		if err != nil {
			log.Error(ctx, "error seeking file", slog.String("filename", path), rlog.Error(err))
			return
		}

		// io.EOF is used purely as a sentinel to stop the iteration at the
		// first prioritized file.
		isPrioritized := false
		err = props.Range(ctx, func(k string, v FileProperties) error {
			if v.Priority > 0 {
				isPrioritized = true
				return io.EOF
			}
			return nil
		})
		if err != nil && err != io.EOF {
			log.Error(ctx, "error checking file priority", slog.String("filename", path), rlog.Error(err))
			return
		}

		if !isPrioritized {
			log.Debug(ctx, "file not prioritized, skipping", slog.String("filename", path))
			return
		}

		_, err = s.loadTorrent(ctx, vfile)
		if err != nil {
			log.Error(ctx, "failed adding torrent", rlog.Error(err))
		}
	}

	loaderPaths := make(chan string, loadWorkers*5)
	var wg sync.WaitGroup
	// Registered after span.End, so it runs first: stop feeding the workers
	// and wait for them before the span is closed.
	defer func() {
		close(loaderPaths)
		wg.Wait()
	}()

	wg.Add(loadWorkers)
	for range loadWorkers {
		go func() {
			// Deferred so the WaitGroup is released even if a file panics.
			defer wg.Done()
			for path := range loaderPaths {
				loadFile(path)
			}
		}()
	}

	return util.Walk(s.sourceFs, "", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return fmt.Errorf("fs walk error: %w", err)
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if info.IsDir() {
			return nil
		}
		if !strings.HasSuffix(path, ".torrent") {
			return nil
		}

		// Do not block on a full queue after the context was cancelled.
		select {
		case loaderPaths <- path:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
}