refactor torrent
Some checks failed
docker / build-docker (linux/amd64) (push) Successful in 1m33s
docker / build-docker (linux/386) (push) Successful in 1m39s
docker / build-docker (linux/arm64) (push) Successful in 7m6s
docker / build-docker (linux/arm/v7) (push) Successful in 7m31s
docker / build-docker (linux/arm64/v8) (push) Has been cancelled

This commit is contained in:
royalcat 2024-05-20 00:24:09 +03:00
parent 99cdd5471e
commit 991c15fdef
33 changed files with 223 additions and 275 deletions

View file

@ -33,7 +33,7 @@ models:
resolver: true resolver: true
extraFields: extraFields:
T: T:
type: "*git.kmsign.ru/royalcat/tstor/src/host/controller.Torrent" type: "*git.kmsign.ru/royalcat/tstor/src/host/torrent.Controller"
TorrentFile: TorrentFile:
extraFields: extraFields:
F: F:
@ -57,7 +57,7 @@ models:
resolver: true resolver: true
extraFields: extraFields:
FS: FS:
type: "*git.kmsign.ru/royalcat/tstor/src/host/vfs.TorrentFS" type: "*git.kmsign.ru/royalcat/tstor/src/host/torrent.TorrentFS"
ResolverFS: ResolverFS:
fields: fields:
entries: entries:

View file

@ -19,7 +19,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/delivery" "git.kmsign.ru/royalcat/tstor/src/delivery"
"git.kmsign.ru/royalcat/tstor/src/host" "git.kmsign.ru/royalcat/tstor/src/host"
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/telemetry" "git.kmsign.ru/royalcat/tstor/src/telemetry"
"github.com/go-git/go-billy/v5/osfs" "github.com/go-git/go-billy/v5/osfs"
@ -90,12 +90,12 @@ func run(configPath string) error {
} }
sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS()) sourceFs := osfs.New(conf.SourceDir, osfs.WithBoundOS())
srv, err := service.New(sourceFs, conf.TorrentClient) srv, err := torrent.NewService(sourceFs, conf.TorrentClient)
if err != nil { if err != nil {
return fmt.Errorf("error creating service: %w", err) return fmt.Errorf("error creating service: %w", err)
} }
sfs := host.NewTorrentStorage( sfs := host.NewHostedFS(
vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)), vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)),
srv, srv,
) )
@ -174,7 +174,7 @@ func run(configPath string) error {
go func() { go func() {
logFilename := filepath.Join(conf.Log.Path, "logs") logFilename := filepath.Join(conf.Log.Path, "logs")
err := delivery.New(nil, service.NewStats(), srv, sfs, logFilename, conf) err := delivery.New(nil, srv, sfs, logFilename, conf)
if err != nil { if err != nil {
log.Error(ctx, "error initializing HTTP server", rlog.Error(err)) log.Error(ctx, "error initializing HTTP server", rlog.Error(err))
} }

View file

@ -7,12 +7,12 @@ import (
"net/http" "net/http"
"os" "os"
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var apiStatusHandler = func(fc *filecache.Cache, ss *service.Stats) gin.HandlerFunc { var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) { return func(ctx *gin.Context) {
stat := gin.H{ stat := gin.H{
"torrentStats": ss.GlobalStats(), "torrentStats": ss.GlobalStats(),

View file

@ -3,6 +3,7 @@ package model
import ( import (
"context" "context"
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
) )
@ -26,8 +27,8 @@ func FillFsEntry(ctx context.Context, e FsElem, fs vfs.Filesystem, path string)
Name: e.Name(), Name: e.Name(),
FS: e, FS: e,
}, nil }, nil
case *vfs.TorrentFS: case *torrent.TorrentFS:
e := e.(*vfs.TorrentFS) e := e.(*torrent.TorrentFS)
torrent, err := MapTorrent(ctx, e.Torrent) torrent, err := MapTorrent(ctx, e.Torrent)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -3,39 +3,39 @@ package model
import ( import (
"context" "context"
"git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/anacrolix/torrent" atorrent "github.com/anacrolix/torrent"
) )
func MapPeerSource(source torrent.PeerSource) string { func MapPeerSource(source atorrent.PeerSource) string {
switch source { switch source {
case torrent.PeerSourceDirect: case atorrent.PeerSourceDirect:
return "Direct" return "Direct"
case torrent.PeerSourceUtHolepunch: case atorrent.PeerSourceUtHolepunch:
return "Ut Holepunch" return "Ut Holepunch"
case torrent.PeerSourceDhtAnnouncePeer: case atorrent.PeerSourceDhtAnnouncePeer:
return "DHT Announce" return "DHT Announce"
case torrent.PeerSourceDhtGetPeers: case atorrent.PeerSourceDhtGetPeers:
return "DHT" return "DHT"
case torrent.PeerSourceIncoming: case atorrent.PeerSourceIncoming:
return "Incoming" return "Incoming"
case torrent.PeerSourceTracker: case atorrent.PeerSourceTracker:
return "Tracker" return "Tracker"
case torrent.PeerSourcePex: case atorrent.PeerSourcePex:
return "PEX" return "PEX"
default: default:
return "Unknown" return "Unknown"
} }
} }
func MapTorrent(ctx context.Context, t *controller.Torrent) (*Torrent, error) { func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
downloading := false downloading := false
files, err := t.Files(ctx) files, err := t.Files(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, file := range files { for _, file := range files {
if file.Priority() > torrent.PiecePriorityNone && file.BytesCompleted() < file.Length() { if file.Priority() > atorrent.PiecePriorityNone && file.BytesCompleted() < file.Length() {
downloading = true downloading = true
break break
} }

View file

@ -5,9 +5,9 @@ package model
import ( import (
"time" "time"
"git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/anacrolix/torrent" torrent1 "github.com/anacrolix/torrent"
) )
type Dir interface { type Dir interface {
@ -180,14 +180,14 @@ type Torrent struct {
ExcludedFiles []*TorrentFile `json:"excludedFiles"` ExcludedFiles []*TorrentFile `json:"excludedFiles"`
Peers []*TorrentPeer `json:"peers"` Peers []*TorrentPeer `json:"peers"`
Downloading bool `json:"downloading"` Downloading bool `json:"downloading"`
T *controller.Torrent `json:"-"` T *torrent.Controller `json:"-"`
} }
type TorrentFs struct { type TorrentFs struct {
Name string `json:"name"` Name string `json:"name"`
Torrent *Torrent `json:"torrent"` Torrent *Torrent `json:"torrent"`
Entries []FsEntry `json:"entries"` Entries []FsEntry `json:"entries"`
FS *vfs.TorrentFS `json:"-"` FS *torrent.TorrentFS `json:"-"`
} }
func (TorrentFs) IsDir() {} func (TorrentFs) IsDir() {}
@ -206,10 +206,10 @@ func (this TorrentFs) GetEntries() []FsEntry {
func (TorrentFs) IsFsEntry() {} func (TorrentFs) IsFsEntry() {}
type TorrentFile struct { type TorrentFile struct {
Filename string `json:"filename"` Filename string `json:"filename"`
Size int64 `json:"size"` Size int64 `json:"size"`
BytesCompleted int64 `json:"bytesCompleted"` BytesCompleted int64 `json:"bytesCompleted"`
F *torrent.File `json:"-"` F *torrent1.File `json:"-"`
} }
type TorrentFileEntry struct { type TorrentFileEntry struct {
@ -230,12 +230,12 @@ type TorrentFilter struct {
} }
type TorrentPeer struct { type TorrentPeer struct {
IP string `json:"ip"` IP string `json:"ip"`
DownloadRate float64 `json:"downloadRate"` DownloadRate float64 `json:"downloadRate"`
Discovery string `json:"discovery"` Discovery string `json:"discovery"`
Port int64 `json:"port"` Port int64 `json:"port"`
ClientName string `json:"clientName"` ClientName string `json:"clientName"`
F *torrent.PeerConn `json:"-"` F *torrent1.PeerConn `json:"-"`
} }
type TorrentProgress struct { type TorrentProgress struct {

View file

@ -14,7 +14,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/uuid" "git.kmsign.ru/royalcat/tstor/pkg/uuid"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql"
aih "github.com/anacrolix/torrent/types/infohash" aih "github.com/anacrolix/torrent/types/infohash"
) )
@ -79,7 +79,7 @@ func (r *mutationResolver) DownloadTorrent(ctx context.Context, infohash string,
f = *file f = *file
} }
err := r.Service.Download(ctx, &service.TorrentDownloadTask{ err := r.Service.Download(ctx, &torrent.DownloadTask{
ID: uuid.New(), ID: uuid.New(),
InfoHash: aih.FromHexString(infohash), InfoHash: aih.FromHexString(infohash),
File: f, File: f,

View file

@ -11,7 +11,7 @@ import (
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
) )
// Torrents is the resolver for the torrents field. // Torrents is the resolver for the torrents field.
@ -80,7 +80,7 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
tr = append(tr, d) tr = append(tr, d)
} }
slices.SortStableFunc(torrents, func(t1, t2 *controller.Torrent) int { slices.SortStableFunc(torrents, func(t1, t2 *torrent.Controller) int {
return strings.Compare(t1.Name(), t2.Name()) return strings.Compare(t1.Name(), t2.Name())
}) })

View file

@ -1,7 +1,7 @@
package resolver package resolver
import ( import (
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/go-git/go-billy/v5" "github.com/go-git/go-billy/v5"
) )
@ -11,7 +11,7 @@ import (
// It serves as dependency injection for your app, add any dependencies you require here. // It serves as dependency injection for your app, add any dependencies you require here.
type Resolver struct { type Resolver struct {
Service *service.Service Service *torrent.Service
VFS vfs.Filesystem VFS vfs.Filesystem
SourceFS billy.Filesystem SourceFS billy.Filesystem
} }

View file

@ -7,7 +7,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/missinggo/v2/filecache"
echopprof "github.com/labstack/echo-contrib/pprof" echopprof "github.com/labstack/echo-contrib/pprof"
@ -15,7 +15,7 @@ import (
"github.com/labstack/echo/v4/middleware" "github.com/labstack/echo/v4/middleware"
) )
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error { func New(fc *filecache.Cache, s *torrent.Service, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
log := slog.With() log := slog.With()
r := echo.New() r := echo.New()

View file

@ -8,7 +8,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/pkg/rlog"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql"
"github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler"
@ -18,7 +18,7 @@ import (
"github.com/ravilushqa/otelgqlgen" "github.com/ravilushqa/otelgqlgen"
) )
func GraphQLHandler(service *service.Service, vfs vfs.Filesystem) http.Handler { func GraphQLHandler(service *torrent.Service, vfs vfs.Filesystem) http.Handler {
graphqlHandler := handler.NewDefaultServer( graphqlHandler := handler.NewDefaultServer(
graph.NewExecutableSchema( graph.NewExecutableSchema(
graph.Config{ graph.Config{

View file

@ -97,7 +97,7 @@ func newFile(ctx context.Context, name string, f vfs.File, df func() ([]os.FileI
return &webDAVFile{ return &webDAVFile{
ctx: ctx, ctx: ctx,
f: f, f: f,
fi: newFileInfo(name, f.Size(), f.IsDir()), fi: NewFileInfo(name, f.Size(), f.IsDir()),
dirFunc: df, dirFunc: df,
} }
} }
@ -185,7 +185,7 @@ type webDAVFileInfo struct {
isDir bool isDir bool
} }
func newFileInfo(name string, size int64, isDir bool) *webDAVFileInfo { func NewFileInfo(name string, size int64, isDir bool) *webDAVFileInfo {
return &webDAVFileInfo{ return &webDAVFileInfo{
name: name, name: name,
size: size, size: size,

View file

@ -1,11 +1,11 @@
package host package host
import ( import (
"git.kmsign.ru/royalcat/tstor/src/host/service" "git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
) )
func NewTorrentStorage(sourceFS vfs.Filesystem, tsrv *service.Service) vfs.Filesystem { func NewHostedFS(sourceFS vfs.Filesystem, tsrv *torrent.Service) vfs.Filesystem {
factories := map[string]vfs.FsFactory{ factories := map[string]vfs.FsFactory{
".torrent": tsrv.NewTorrentFs, ".torrent": tsrv.NewTorrentFs,
} }

View file

@ -1,33 +1,32 @@
package controller package torrent
import ( import (
"context" "context"
"slices" "slices"
"strings" "strings"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
) )
type Torrent struct { type Controller struct {
torrentFilePath string torrentFilePath string
t *torrent.Torrent t *torrent.Torrent
rep *store.FilesMappings rep *filesMappingsStore
} }
func NewTorrent(t *torrent.Torrent, rep *store.FilesMappings) *Torrent { func newController(t *torrent.Torrent, rep *filesMappingsStore) *Controller {
return &Torrent{t: t, rep: rep} return &Controller{t: t, rep: rep}
} }
func (s *Torrent) TorrentFilePath() string { func (s *Controller) TorrentFilePath() string {
return s.torrentFilePath return s.torrentFilePath
} }
func (s *Torrent) Torrent() *torrent.Torrent { func (s *Controller) Torrent() *torrent.Torrent {
return s.t return s.t
} }
func (c *Torrent) Name() string { func (c *Controller) Name() string {
<-c.t.GotInfo() <-c.t.GotInfo()
if name := c.t.Name(); name != "" { if name := c.t.Name(); name != "" {
return name return name
@ -36,27 +35,27 @@ func (c *Torrent) Name() string {
return c.InfoHash() return c.InfoHash()
} }
func (s *Torrent) InfoHash() string { func (s *Controller) InfoHash() string {
<-s.t.GotInfo() <-s.t.GotInfo()
return s.t.InfoHash().HexString() return s.t.InfoHash().HexString()
} }
func (s *Torrent) BytesCompleted() int64 { func (s *Controller) BytesCompleted() int64 {
<-s.t.GotInfo() <-s.t.GotInfo()
return s.t.BytesCompleted() return s.t.BytesCompleted()
} }
func (s *Torrent) BytesMissing() int64 { func (s *Controller) BytesMissing() int64 {
<-s.t.GotInfo() <-s.t.GotInfo()
return s.t.BytesMissing() return s.t.BytesMissing()
} }
func (s *Torrent) Length() int64 { func (s *Controller) Length() int64 {
<-s.t.GotInfo() <-s.t.GotInfo()
return s.t.Length() return s.t.Length()
} }
func (s *Torrent) Files(ctx context.Context) ([]*torrent.File, error) { func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash()) fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash())
if err != nil { if err != nil {
return nil, err return nil, err
@ -95,11 +94,11 @@ func Map[T, U any](ts []T, f func(T) U) []U {
return us return us
} }
func (s *Torrent) ExcludeFile(ctx context.Context, f *torrent.File) error { func (s *Controller) ExcludeFile(ctx context.Context, f *torrent.File) error {
return s.rep.ExcludeFile(ctx, f) return s.rep.ExcludeFile(ctx, f)
} }
func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool { func (s *Controller) isFileComplete(startIndex int, endIndex int) bool {
for i := startIndex; i < endIndex; i++ { for i := startIndex; i < endIndex; i++ {
if !s.t.Piece(i).State().Complete { if !s.t.Piece(i).State().Complete {
return false return false
@ -108,7 +107,7 @@ func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool {
return true return true
} }
func (s *Torrent) ValidateTorrent() error { func (s *Controller) ValidateTorrent() error {
<-s.t.GotInfo() <-s.t.GotInfo()
s.t.VerifyData() s.t.VerifyData()
return nil return nil

View file

@ -1,8 +1,7 @@
package store package torrent
import ( import (
"context" "context"
"errors"
"path/filepath" "path/filepath"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
@ -10,13 +9,13 @@ import (
"github.com/royalcat/kv" "github.com/royalcat/kv"
) )
func NewFileMappings(metaDir string, storage TorrentFileDeleter) (*FilesMappings, error) { func newFileMappingsStore(metaDir string, storage TorrentFileDeleter) (*filesMappingsStore, error) {
str, err := kv.NewBadgerKVBytes[string, string](filepath.Join(metaDir, "file-mappings")) str, err := kv.NewBadgerKVBytes[string, string](filepath.Join(metaDir, "file-mappings"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := &FilesMappings{ r := &filesMappingsStore{
mappings: str, mappings: str,
storage: storage, storage: storage,
} }
@ -24,13 +23,11 @@ func NewFileMappings(metaDir string, storage TorrentFileDeleter) (*FilesMappings
return r, nil return r, nil
} }
type FilesMappings struct { type filesMappingsStore struct {
mappings kv.Store[string, string] mappings kv.Store[string, string]
storage TorrentFileDeleter storage TorrentFileDeleter
} }
var ErrNotFound = errors.New("not found")
type TorrentFileDeleter interface { type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error DeleteFile(file *torrent.File) error
} }
@ -39,15 +36,15 @@ func fileKey(file *torrent.File) string {
return file.Torrent().InfoHash().HexString() + "/" + file.Path() return file.Torrent().InfoHash().HexString() + "/" + file.Path()
} }
func (r *FilesMappings) MapFile(ctx context.Context, file *torrent.File, target string) error { func (r *filesMappingsStore) MapFile(ctx context.Context, file *torrent.File, target string) error {
return r.mappings.Set(ctx, fileKey(file), target) return r.mappings.Set(ctx, fileKey(file), target)
} }
func (r *FilesMappings) ExcludeFile(ctx context.Context, file *torrent.File) error { func (r *filesMappingsStore) ExcludeFile(ctx context.Context, file *torrent.File) error {
return r.mappings.Set(ctx, fileKey(file), "") return r.mappings.Set(ctx, fileKey(file), "")
} }
func (r *FilesMappings) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) { func (r *filesMappingsStore) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) {
out := map[string]string{} out := map[string]string{}
err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) bool { err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) bool {
out[k] = v out[k] = v
@ -56,6 +53,6 @@ func (r *FilesMappings) FileMappings(ctx context.Context, ih infohash.T) (map[st
return out, err return out, err
} }
func (r *FilesMappings) Close(ctx context.Context) error { func (r *filesMappingsStore) Close(ctx context.Context) error {
return r.mappings.Close(ctx) return r.mappings.Close(ctx)
} }

View file

@ -1,4 +1,4 @@
package vfs package torrent
import ( import (
"context" "context"
@ -12,7 +12,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -23,23 +23,35 @@ type TorrentFS struct {
name string name string
mu sync.Mutex mu sync.Mutex
Torrent *controller.Torrent Torrent *Controller
filesCache map[string]File filesCache map[string]vfs.File
lastAccessTimeout atomic.Pointer[time.Time] lastAccessTimeout atomic.Pointer[time.Time]
resolver *resolver resolver *vfs.Resolver
} }
var _ Filesystem = (*TorrentFS)(nil) var _ vfs.Filesystem = (*TorrentFS)(nil)
func NewTorrentFs(name string, c *controller.Torrent) *TorrentFS { func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
return &TorrentFS{ defer f.Close(ctx)
name: name,
Torrent: c, info, err := f.Info()
resolver: newResolver(ArchiveFactories), if err != nil {
return nil, err
} }
c, err := s.LoadTorrent(ctx, f)
if err != nil {
return nil, err
}
return &TorrentFS{
name: info.Name(),
Torrent: c,
resolver: vfs.NewResolver(vfs.ArchiveFactories),
}, nil
} }
var _ fs.DirEntry = (*TorrentFS)(nil) var _ fs.DirEntry = (*TorrentFS)(nil)
@ -89,7 +101,7 @@ func (tfs *TorrentFS) FsName() string {
return "torrentfs" return "torrentfs"
} }
func (fs *TorrentFS) files(ctx context.Context) (map[string]File, error) { func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
@ -105,10 +117,10 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]File, error) {
return nil, err return nil, err
} }
fs.filesCache = make(map[string]File) fs.filesCache = make(map[string]vfs.File)
for _, file := range files { for _, file := range files {
file.SetPriority(torrent.PiecePriorityNormal) file.SetPriority(torrent.PiecePriorityNormal)
p := AbsPath(file.Path()) p := vfs.AbsPath(file.Path())
tf, err := openTorrentFile(ctx, path.Base(p), file) tf, err := openTorrentFile(ctx, path.Base(p), file)
if err != nil { if err != nil {
return nil, err return nil, err
@ -117,17 +129,17 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]File, error) {
} }
// TODO optional // TODO optional
if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.Torrent.Name()) { if len(fs.filesCache) == 1 && fs.resolver.IsNestedFs(fs.Torrent.Name()) {
filepath := "/" + fs.Torrent.Name() filepath := "/" + fs.Torrent.Name()
if file, ok := fs.filesCache[filepath]; ok { if file, ok := fs.filesCache[filepath]; ok {
nestedFs, err := fs.resolver.nestedFs(ctx, filepath, file) nestedFs, err := fs.resolver.NestedFs(ctx, filepath, file)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nestedFs == nil { if nestedFs == nil {
goto DEFAULT_DIR // FIXME goto DEFAULT_DIR // FIXME
} }
fs.filesCache, err = fs.listFilesRecursive(ctx, nestedFs, "/") fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -149,7 +161,7 @@ DEFAULT_DIR:
for k, f := range fs.filesCache { for k, f := range fs.filesCache {
delete(fs.filesCache, k) delete(fs.filesCache, k)
k, _ = strings.CutPrefix(k, rootDir) k, _ = strings.CutPrefix(k, rootDir)
k = AbsPath(k) k = vfs.AbsPath(k)
fs.filesCache[k] = f fs.filesCache[k] = f
} }
} }
@ -157,45 +169,22 @@ DEFAULT_DIR:
return fs.filesCache, nil return fs.filesCache, nil
} }
// func anyPeerHasFiles(file *torrent.File) bool { func listFilesRecursive(ctx context.Context, fs vfs.Filesystem, start string) (map[string]vfs.File, error) {
// for _, conn := range file.Torrent().PeerConns() { out := make(map[string]vfs.File, 0)
// if bitmapHaveFile(conn.PeerPieces(), file) { entries, err := fs.ReadDir(ctx, start)
// return true
// }
// }
// return false
// }
// func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool {
// for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
// if !bitmap.ContainsInt(i) {
// return false
// }
// }
// return true
// }
func (fs *TorrentFS) listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[string]File, error) {
ctx, span := tracer.Start(ctx, "listFilesRecursive",
fs.traceAttrs(attribute.String("start", start)),
)
defer span.End()
out := make(map[string]File, 0)
entries, err := vfs.ReadDir(ctx, start)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, entry := range entries { for _, entry := range entries {
filename := path.Join(start, entry.Name()) filename := path.Join(start, entry.Name())
if entry.IsDir() { if entry.IsDir() {
rec, err := fs.listFilesRecursive(ctx, vfs, filename) rec, err := listFilesRecursive(ctx, fs, filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
maps.Copy(out, rec) maps.Copy(out, rec)
} else { } else {
file, err := vfs.Open(ctx, filename) file, err := fs.Open(ctx, filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -206,7 +195,7 @@ func (fs *TorrentFS) listFilesRecursive(ctx context.Context, vfs Filesystem, sta
return out, nil return out, nil
} }
func (fs *TorrentFS) rawOpen(ctx context.Context, filename string) (file File, err error) { func (fs *TorrentFS) rawOpen(ctx context.Context, filename string) (file vfs.File, err error) {
ctx, span := tracer.Start(ctx, "rawOpen", ctx, span := tracer.Start(ctx, "rawOpen",
fs.traceAttrs(attribute.String("filename", filename)), fs.traceAttrs(attribute.String("filename", filename)),
) )
@ -221,7 +210,7 @@ func (fs *TorrentFS) rawOpen(ctx context.Context, filename string) (file File, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
file, err = getFile(files, filename) file, err = vfs.GetFile(files, filename)
return file, err return file, err
} }
@ -236,7 +225,7 @@ func (fs *TorrentFS) rawStat(ctx context.Context, filename string) (fs.FileInfo,
return nil, err return nil, err
} }
file, err := getFile(files, filename) file, err := vfs.GetFile(files, filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -258,11 +247,11 @@ func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
) )
defer span.End() defer span.End()
if isRoot(filename) { if vfs.IsRoot(filename) {
return tfs, nil return tfs, nil
} }
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -287,17 +276,17 @@ func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
return tfs.rawStat(ctx, fsPath) return tfs.rawStat(ctx, fsPath)
} }
func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file File, err error) { func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File, err error) {
ctx, span := tracer.Start(ctx, "Open", ctx, span := tracer.Start(ctx, "Open",
tfs.traceAttrs(attribute.String("filename", filename)), tfs.traceAttrs(attribute.String("filename", filename)),
) )
defer span.End() defer span.End()
if isRoot(filename) { if vfs.IsRoot(filename) {
return newDirFile(tfs.name), nil return vfs.NewDirFile(tfs.name), nil
} }
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, filename, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -328,7 +317,7 @@ func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry,
) )
defer span.End() defer span.End()
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, name, tfs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.ResolvePath(ctx, name, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -354,7 +343,7 @@ func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry,
return nil, err return nil, err
} }
return listDirFromFiles(files, fsPath) return vfs.ListDirFromFiles(files, fsPath)
} }
func (fs *TorrentFS) Unlink(ctx context.Context, name string) error { func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
@ -363,7 +352,7 @@ func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
) )
defer span.End() defer span.End()
name = AbsPath(name) name = vfs.AbsPath(name)
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
@ -374,7 +363,7 @@ func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
} }
if !slices.Contains(maps.Keys(files), name) { if !slices.Contains(maps.Keys(files), name) {
return ErrNotExist return vfs.ErrNotExist
} }
file := files[name] file := files[name]
@ -382,13 +371,13 @@ func (fs *TorrentFS) Unlink(ctx context.Context, name string) error {
tfile, ok := file.(*torrentFile) tfile, ok := file.(*torrentFile)
if !ok { if !ok {
return ErrNotImplemented return vfs.ErrNotImplemented
} }
return fs.Torrent.ExcludeFile(ctx, tfile.file) return fs.Torrent.ExcludeFile(ctx, tfile.file)
} }
var _ File = (*torrentFile)(nil) var _ vfs.File = (*torrentFile)(nil)
type torrentFile struct { type torrentFile struct {
name string name string
@ -437,11 +426,11 @@ func (tf *torrentFile) Name() string {
// Type implements File. // Type implements File.
func (tf *torrentFile) Type() fs.FileMode { func (tf *torrentFile) Type() fs.FileMode {
return roMode | fs.ModeDir return vfs.ROMode | fs.ModeDir
} }
func (tf *torrentFile) Info() (fs.FileInfo, error) { func (tf *torrentFile) Info() (fs.FileInfo, error) {
return newFileInfo(tf.name, tf.file.Length()), nil return vfs.NewFileInfo(tf.name, tf.file.Length()), nil
} }
func (tf *torrentFile) Size() int64 { func (tf *torrentFile) Size() int64 {

View file

@ -1,4 +1,4 @@
package vfs package torrent
import ( import (
"os" "os"

View file

@ -1,7 +1,8 @@
package store package torrent
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"path/filepath" "path/filepath"
@ -12,11 +13,13 @@ import (
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
) )
type InfoBytes struct { var errNotFound = errors.New("not found")
type infoBytesStore struct {
db *badger.DB db *badger.DB
} }
func NewInfoBytes(metaDir string) (*InfoBytes, error) { func newInfoBytesStore(metaDir string) (*infoBytesStore, error) {
l := slog.With("component", "badger", "db", "info-bytes") l := slog.With("component", "badger", "db", "info-bytes")
opts := badger. opts := badger.
@ -26,16 +29,16 @@ func NewInfoBytes(metaDir string) (*InfoBytes, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &InfoBytes{db}, nil return &infoBytesStore{db}, nil
} }
func (k *InfoBytes) GetBytes(ih infohash.T) ([]byte, error) { func (k *infoBytesStore) GetBytes(ih infohash.T) ([]byte, error) {
var data []byte var data []byte
err := k.db.View(func(tx *badger.Txn) error { err := k.db.View(func(tx *badger.Txn) error {
item, err := tx.Get(ih.Bytes()) item, err := tx.Get(ih.Bytes())
if err != nil { if err != nil {
if err == badger.ErrKeyNotFound { if err == badger.ErrKeyNotFound {
return ErrNotFound return errNotFound
} }
return fmt.Errorf("error getting value: %w", err) return fmt.Errorf("error getting value: %w", err)
@ -47,7 +50,7 @@ func (k *InfoBytes) GetBytes(ih infohash.T) ([]byte, error) {
return data, err return data, err
} }
func (k *InfoBytes) Get(ih infohash.T) (*metainfo.MetaInfo, error) { func (k *infoBytesStore) Get(ih infohash.T) (*metainfo.MetaInfo, error) {
data, err := k.GetBytes(ih) data, err := k.GetBytes(ih)
if err != nil { if err != nil {
return nil, err return nil, err
@ -56,7 +59,7 @@ func (k *InfoBytes) Get(ih infohash.T) (*metainfo.MetaInfo, error) {
return metainfo.Load(bytes.NewReader(data)) return metainfo.Load(bytes.NewReader(data))
} }
func (me *InfoBytes) SetBytes(ih infohash.T, data []byte) error { func (me *infoBytesStore) SetBytes(ih infohash.T, data []byte) error {
return me.db.Update(func(txn *badger.Txn) error { return me.db.Update(func(txn *badger.Txn) error {
item, err := txn.Get(ih.Bytes()) item, err := txn.Get(ih.Bytes())
if err != nil { if err != nil {
@ -75,16 +78,16 @@ func (me *InfoBytes) SetBytes(ih infohash.T, data []byte) error {
}) })
} }
func (me *InfoBytes) Set(ih infohash.T, info metainfo.MetaInfo) error { func (me *infoBytesStore) Set(ih infohash.T, info metainfo.MetaInfo) error {
return me.SetBytes(ih, info.InfoBytes) return me.SetBytes(ih, info.InfoBytes)
} }
func (k *InfoBytes) Delete(ih infohash.T) error { func (k *infoBytesStore) Delete(ih infohash.T) error {
return k.db.Update(func(txn *badger.Txn) error { return k.db.Update(func(txn *badger.Txn) error {
return txn.Delete(ih.Bytes()) return txn.Delete(ih.Bytes())
}) })
} }
func (me *InfoBytes) Close() error { func (me *infoBytesStore) Close() error {
return me.db.Close() return me.db.Close()
} }

View file

@ -1,4 +1,4 @@
package datastorage package torrent
import ( import (
"context" "context"
@ -7,8 +7,7 @@ import (
"os" "os"
"path" "path"
"git.kmsign.ru/royalcat/tstor/src/host/controller" atorrent "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash" "github.com/anacrolix/torrent/types/infohash"
@ -81,17 +80,17 @@ func (p *PieceStorage) Close() error {
} }
// DeleteFile implements FileStorageDeleter. // DeleteFile implements FileStorageDeleter.
func (p *PieceStorage) DeleteFile(file *torrent.File) error { func (p *PieceStorage) DeleteFile(file *atorrent.File) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
// CleanupDirs implements DataStorage. // CleanupDirs implements DataStorage.
func (p *PieceStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) { func (p *PieceStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) (int, error) {
return 0, nil // TODO return 0, nil // TODO
} }
// CleanupFiles implements DataStorage. // CleanupFiles implements DataStorage.
func (p *PieceStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) { func (p *PieceStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) (int, error) {
return 0, nil // TODO return 0, nil // TODO
} }

View file

@ -1,22 +1,21 @@
package service package torrent
import ( import (
"context" "context"
"fmt" "fmt"
"git.kmsign.ru/royalcat/tstor/pkg/uuid" "git.kmsign.ru/royalcat/tstor/pkg/uuid"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash" "github.com/anacrolix/torrent/types/infohash"
) )
type TorrentDownloadTask struct { type DownloadTask struct {
ID uuid.UUID ID uuid.UUID
InfoHash infohash.T InfoHash infohash.T
File string File string
} }
func (s *Service) Download(ctx context.Context, task *TorrentDownloadTask) error { func (s *Service) Download(ctx context.Context, task *DownloadTask) error {
t, ok := s.client.Torrent(task.InfoHash) t, ok := s.client.Torrent(task.InfoHash)
if !ok { if !ok {
return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString()) return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
@ -97,7 +96,7 @@ func (s *Service) Download(ctx context.Context, task *TorrentDownloadTask) error
// } // }
type TorrentProgress struct { type TorrentProgress struct {
Torrent *controller.Torrent Torrent *Controller
Current int64 Current int64
Total int64 Total int64
} }
@ -113,7 +112,7 @@ func (s *Service) DownloadProgress(ctx context.Context) (<-chan TorrentProgress,
defer close(out) defer close(out)
for _, t := range torrents { for _, t := range torrents {
sub := t.Torrent().SubscribePieceStateChanges() sub := t.Torrent().SubscribePieceStateChanges()
go func(t *controller.Torrent) { go func(t *Controller) {
for stateChange := range sub.Values { for stateChange := range sub.Values {
if !stateChange.Complete && !stateChange.Partial { if !stateChange.Complete && !stateChange.Partial {
continue continue

View file

@ -1,4 +1,4 @@
package service package torrent
import ( import (
"bufio" "bufio"
@ -16,8 +16,6 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/host/datastorage"
"git.kmsign.ru/royalcat/tstor/src/host/store" "git.kmsign.ru/royalcat/tstor/src/host/store"
"git.kmsign.ru/royalcat/tstor/src/host/tkv" "git.kmsign.ru/royalcat/tstor/src/host/tkv"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
@ -35,7 +33,7 @@ import (
"github.com/royalcat/kv" "github.com/royalcat/kv"
) )
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/service") var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/host/torrent")
type DirAquire struct { type DirAquire struct {
Name string Name string
@ -44,9 +42,9 @@ type DirAquire struct {
type Service struct { type Service struct {
client *torrent.Client client *torrent.Client
excludedFiles *store.FilesMappings excludedFiles *filesMappingsStore
infoBytes *store.InfoBytes infoBytes *infoBytesStore
Storage *datastorage.DataStorage Storage *DataStorage
fis *store.FileItemStore fis *store.FileItemStore
dirsAquire kv.Store[string, DirAquire] dirsAquire kv.Store[string, DirAquire]
@ -58,7 +56,7 @@ type Service struct {
log *rlog.Logger log *rlog.Logger
} }
func New(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, error) { func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, error) {
s := &Service{ s := &Service{
log: rlog.Component("torrent-service"), log: rlog.Component("torrent-service"),
sourceFs: sourceFs, sourceFs: sourceFs,
@ -76,17 +74,17 @@ func New(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, error)
return nil, fmt.Errorf("error starting item store: %w", err) return nil, fmt.Errorf("error starting item store: %w", err)
} }
s.Storage, _, err = datastorage.Setup(conf) s.Storage, _, err = setupStorage(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.excludedFiles, err = store.NewFileMappings(conf.MetadataFolder, s.Storage) s.excludedFiles, err = newFileMappingsStore(conf.MetadataFolder, s.Storage)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.infoBytes, err = store.NewInfoBytes(conf.MetadataFolder) s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -132,7 +130,7 @@ func (s *Service) Close(ctx context.Context) error {
)...) )...)
} }
func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) { func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent") ctx, span := tracer.Start(ctx, "LoadTorrent")
defer span.End() defer span.End()
log := s.log log := s.log
@ -171,7 +169,7 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
if len(infoBytes) == 0 { if len(infoBytes) == 0 {
log.Info(ctx, "no info loaded from file, try to load from cache") log.Info(ctx, "no info loaded from file, try to load from cache")
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash) infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
if err != nil && err != store.ErrNotFound { if err != nil && err != errNotFound {
return nil, fmt.Errorf("get info bytes from database: %w", err) return nil, fmt.Errorf("get info bytes from database: %w", err)
} }
} }
@ -218,7 +216,7 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
} }
} }
return t, nil return newController(t, s.excludedFiles), nil
} }
func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) { func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) {
@ -335,38 +333,12 @@ func (s *Service) checkTorrentFilesCompatable(ctx context.Context, aq DirAquire,
return true return true
} }
// func (s *Service) getTorrentsByName(name string) []*torrent.Torrent {
// out := []*torrent.Torrent{}
// for _, t := range s.c.Torrents() {
// if t.Name() == name {
// out = append(out, t)
// }
// }
// return out
// }
func isValidInfoHashBytes(d []byte) bool { func isValidInfoHashBytes(d []byte) bool {
var info metainfo.Info var info metainfo.Info
err := bencode.Unmarshal(d, &info) err := bencode.Unmarshal(d, &info)
return err == nil return err == nil
} }
func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
defer f.Close(ctx)
info, err := f.Info()
if err != nil {
return nil, err
}
t, err := s.LoadTorrent(ctx, f)
if err != nil {
return nil, err
}
return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles)), nil
}
func (s *Service) Stats() (*Stats, error) { func (s *Service) Stats() (*Stats, error) {
return &Stats{}, nil return &Stats{}, nil
} }
@ -435,17 +407,17 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
}) })
} }
func (s *Service) ListTorrents(ctx context.Context) ([]*controller.Torrent, error) { func (s *Service) ListTorrents(ctx context.Context) ([]*Controller, error) {
<-s.torrentLoaded <-s.torrentLoaded
out := []*controller.Torrent{} out := []*Controller{}
for _, v := range s.client.Torrents() { for _, v := range s.client.Torrents() {
out = append(out, controller.NewTorrent(v, s.excludedFiles)) out = append(out, newController(v, s.excludedFiles))
} }
return out, nil return out, nil
} }
func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) { func (s *Service) GetTorrent(infohashHex string) (*Controller, error) {
<-s.torrentLoaded <-s.torrentLoaded
t, ok := s.client.Torrent(infohash.FromHexString(infohashHex)) t, ok := s.client.Torrent(infohash.FromHexString(infohashHex))
@ -453,7 +425,7 @@ func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) {
return nil, nil return nil, nil
} }
return controller.NewTorrent(t, s.excludedFiles), nil return newController(t, s.excludedFiles), nil
} }
func slicesUnique[S ~[]E, E comparable](in S) S { func slicesUnique[S ~[]E, E comparable](in S) S {

View file

@ -1,4 +1,4 @@
package datastorage package torrent
import ( import (
"fmt" "fmt"
@ -10,7 +10,7 @@ import (
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
func Setup(cfg config.TorrentClient) (*DataStorage, storage.PieceCompletion, error) { func setupStorage(cfg config.TorrentClient) (*DataStorage, storage.PieceCompletion, error) {
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion") pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
if err := os.MkdirAll(pcp, 0744); err != nil { if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err) return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)

View file

@ -1,4 +1,4 @@
package service package torrent
import ( import (
"errors" "errors"

View file

@ -1,4 +1,4 @@
package datastorage package torrent
import ( import (
"context" "context"
@ -12,27 +12,16 @@ import (
"path/filepath" "path/filepath"
"slices" "slices"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
// type DataStorage interface {
// storage.ClientImplCloser
// DeleteFile(file *torrent.File) error
// CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// }
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/datastorage")
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem. // NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage { func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage {
return &DataStorage{ return &DataStorage{
@ -100,7 +89,7 @@ func (fs *DataStorage) DeleteFile(file *torrent.File) error {
return os.Remove(filePath) return os.Remove(filePath)
} }
func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) { func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun) log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{} expectedEntries := []string{}
@ -139,7 +128,7 @@ func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*controller.T
return toDelete, nil return toDelete, nil
} }
func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) { func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun) log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{} expectedEntries := []string{}

View file

@ -122,11 +122,11 @@ func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error {
} }
func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) { func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) {
return getFile(a.files, filename) return GetFile(a.files, filename)
} }
func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
return listDirFromFiles(a.files, path) return ListDirFromFiles(a.files, path)
} }
// Stat implements Filesystem. // Stat implements Filesystem.
@ -198,11 +198,11 @@ func (d *archiveFile) Name() string {
// Type implements File. // Type implements File.
func (d *archiveFile) Type() fs.FileMode { func (d *archiveFile) Type() fs.FileMode {
return roMode return ROMode
} }
func (d *archiveFile) Info() (fs.FileInfo, error) { func (d *archiveFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, d.size), nil return NewFileInfo(d.name, d.size), nil
} }
func (d *archiveFile) Size() int64 { func (d *archiveFile) Size() int64 {

View file

@ -8,7 +8,7 @@ import (
var _ File = &dirFile{} var _ File = &dirFile{}
func newDirFile(name string) File { func NewDirFile(name string) File {
return &dirFile{ return &dirFile{
name: path.Base(name), name: path.Base(name),
} }
@ -55,5 +55,5 @@ func (d *dirFile) Size() int64 {
// Type implements File. // Type implements File.
func (d *dirFile) Type() fs.FileMode { func (d *dirFile) Type() fs.FileMode {
return roMode | fs.ModeDir return ROMode | fs.ModeDir
} }

View file

@ -41,7 +41,7 @@ func (d *DummyFs) FsName() string {
// Stat implements Filesystem. // Stat implements Filesystem.
func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return newFileInfo(path.Base(filename), 0), nil // TODO return NewFileInfo(path.Base(filename), 0), nil // TODO
} }
func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) { func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) {
@ -55,8 +55,8 @@ func (d *DummyFs) Unlink(ctx context.Context, filename string) error {
func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
if path == "/dir/here" { if path == "/dir/here" {
return []fs.DirEntry{ return []fs.DirEntry{
newFileInfo("file1.txt", 0), NewFileInfo("file1.txt", 0),
newFileInfo("file2.txt", 0), NewFileInfo("file2.txt", 0),
}, nil }, nil
} }
@ -101,7 +101,7 @@ func (d *DummyFile) Type() fs.FileMode {
// Stat implements File. // Stat implements File.
func (d *DummyFile) Info() (fs.FileInfo, error) { func (d *DummyFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, 0), nil return NewFileInfo(d.name, 0), nil
} }
func (d *DummyFile) Size() int64 { func (d *DummyFile) Size() int64 {

View file

@ -44,7 +44,7 @@ type Filesystem interface {
} }
// readonly // readonly
const roMode = fs.FileMode(0555) const ROMode = fs.FileMode(0555)
type fileInfo struct { type fileInfo struct {
name string name string
@ -63,7 +63,7 @@ func newDirInfo(name string) *fileInfo {
} }
} }
func newFileInfo(name string, size int64) *fileInfo { func NewFileInfo(name string, size int64) *fileInfo {
return &fileInfo{ return &fileInfo{
name: path.Base(name), name: path.Base(name),
size: size, size: size,
@ -93,10 +93,10 @@ func (fi *fileInfo) Size() int64 {
func (fi *fileInfo) Mode() fs.FileMode { func (fi *fileInfo) Mode() fs.FileMode {
if fi.isDir { if fi.isDir {
return roMode | fs.ModeDir return ROMode | fs.ModeDir
} }
return roMode return ROMode
} }
func (fi *fileInfo) ModTime() time.Time { func (fi *fileInfo) ModTime() time.Time {

View file

@ -12,7 +12,7 @@ func TestFileinfo(t *testing.T) {
require := require.New(t) require := require.New(t)
fi := newFileInfo("abc/name", 42) fi := NewFileInfo("abc/name", 42)
require.Equal("name", fi.Name()) require.Equal("name", fi.Name())
require.False(fi.IsDir()) require.False(fi.IsDir())
@ -37,7 +37,7 @@ func TestDirInfo(t *testing.T) {
require.NotNil(fi.ModTime()) require.NotNil(fi.ModTime())
require.NotZero(fi.Type() & fs.ModeDir) require.NotZero(fi.Type() & fs.ModeDir)
require.NotZero(fi.Mode() & fs.ModeDir) require.NotZero(fi.Mode() & fs.ModeDir)
require.Equal(roMode|fs.ModeDir, fi.Mode()) require.Equal(ROMode|fs.ModeDir, fi.Mode())
require.Nil(fi.Sys()) require.Nil(fi.Sys())
} }

View file

@ -68,11 +68,11 @@ func NewMemoryFS(name string, files map[string]*MemoryFile) *MemoryFs {
} }
func (m *MemoryFs) Open(ctx context.Context, filename string) (File, error) { func (m *MemoryFs) Open(ctx context.Context, filename string) (File, error) {
return getFile(m.files, filename) return GetFile(m.files, filename)
} }
func (fs *MemoryFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { func (fs *MemoryFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
return listDirFromFiles(fs.files, path) return ListDirFromFiles(fs.files, path)
} }
// Stat implements Filesystem. // Stat implements Filesystem.
@ -81,7 +81,7 @@ func (mfs *MemoryFs) Stat(ctx context.Context, filename string) (fs.FileInfo, er
if !ok { if !ok {
return nil, ErrNotExist return nil, ErrNotExist
} }
return newFileInfo(path.Base(filename), file.Size()), nil return NewFileInfo(path.Base(filename), file.Size()), nil
} }
// Unlink implements Filesystem. // Unlink implements Filesystem.
@ -110,11 +110,11 @@ func (d *MemoryFile) Name() string {
// Type implements File. // Type implements File.
func (d *MemoryFile) Type() fs.FileMode { func (d *MemoryFile) Type() fs.FileMode {
return roMode return ROMode
} }
func (d *MemoryFile) Info() (fs.FileInfo, error) { func (d *MemoryFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, int64(d.data.Len())), nil return NewFileInfo(d.name, int64(d.data.Len())), nil
} }
func (d *MemoryFile) Size() int64 { func (d *MemoryFile) Size() int64 {

View file

@ -34,8 +34,8 @@ func (fs *OsFS) Unlink(ctx context.Context, filename string) error {
// Open implements Filesystem. // Open implements Filesystem.
func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) { func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) {
if isRoot(filename) { if IsRoot(filename) {
return newDirFile(fs.Name()), nil return NewDirFile(fs.Name()), nil
} }
return NewLazyOsFile(path.Join(fs.hostDir, filename)) return NewLazyOsFile(path.Join(fs.hostDir, filename))

View file

@ -21,7 +21,7 @@ import (
type ResolverFS struct { type ResolverFS struct {
rootFS Filesystem rootFS Filesystem
resolver *resolver resolver *Resolver
log *rlog.Logger log *rlog.Logger
} }
@ -29,7 +29,7 @@ type ResolverFS struct {
func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS { func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS {
return &ResolverFS{ return &ResolverFS{
rootFS: rootFs, rootFS: rootFs,
resolver: newResolver(factories), resolver: NewResolver(factories),
log: rlog.Component("fs.resolverfs"), log: rlog.Component("fs.resolverfs"),
} }
} }
@ -77,10 +77,10 @@ func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) {
defer span.End() defer span.End()
if path.Clean(filename) == Separator { if path.Clean(filename) == Separator {
return newDirFile(r.Name()), nil return NewDirFile(r.Name()), nil
} }
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, filename, r.rootFS.Open)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,7 +98,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
) )
defer span.End() defer span.End()
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, dir, r.rootFS.Open) fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, dir, r.rootFS.Open)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -112,7 +112,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
} }
out := make([]fs.DirEntry, 0, len(entries)) out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries { for _, e := range entries {
if r.resolver.isNestedFs(e.Name()) { if r.resolver.IsNestedFs(e.Name()) {
filepath := path.Join("/", dir, e.Name()) filepath := path.Join("/", dir, e.Name())
file, err := r.Open(ctx, filepath) file, err := r.Open(ctx, filepath)
if err != nil { if err != nil {
@ -123,7 +123,7 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
err = func() error { err = func() error {
factoryCtx, cancel := subTimeout(ctx) factoryCtx, cancel := subTimeout(ctx)
defer cancel() defer cancel()
nestedfs, err := r.resolver.nestedFs(factoryCtx, filepath, file) nestedfs, err := r.resolver.NestedFs(factoryCtx, filepath, file)
if err != nil { if err != nil {
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
r.log.Error(ctx, "creating fs timed out", r.log.Error(ctx, "creating fs timed out",
@ -155,11 +155,11 @@ func (r *ResolverFS) Stat(ctx context.Context, filename string) (fs.FileInfo, er
) )
defer span.End() defer span.End()
if isRoot(filename) { if IsRoot(filename) {
return r, nil return r, nil
} }
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, filename, r.rootFS.Open)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -175,7 +175,7 @@ func (r *ResolverFS) Stat(ctx context.Context, filename string) (fs.FileInfo, er
// Unlink implements Filesystem. // Unlink implements Filesystem.
func (r *ResolverFS) Unlink(ctx context.Context, filename string) error { func (r *ResolverFS) Unlink(ctx context.Context, filename string) error {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open) fsPath, nestedFs, nestedFsPath, err := r.resolver.ResolvePath(ctx, filename, r.rootFS.Open)
if err != nil { if err != nil {
return err return err
} }
@ -210,14 +210,14 @@ var _ Filesystem = &ResolverFS{}
type FsFactory func(ctx context.Context, f File) (Filesystem, error) type FsFactory func(ctx context.Context, f File) (Filesystem, error)
func newResolver(factories map[string]FsFactory) *resolver { func NewResolver(factories map[string]FsFactory) *Resolver {
return &resolver{ return &Resolver{
factories: factories, factories: factories,
fsmap: map[string]Filesystem{}, fsmap: map[string]Filesystem{},
} }
} }
type resolver struct { type Resolver struct {
m sync.Mutex m sync.Mutex
factories map[string]FsFactory factories map[string]FsFactory
fsmap map[string]Filesystem // filesystem cache fsmap map[string]Filesystem // filesystem cache
@ -226,7 +226,7 @@ type resolver struct {
type openFile func(ctx context.Context, path string) (File, error) type openFile func(ctx context.Context, path string) (File, error)
func (r *resolver) isNestedFs(f string) bool { func (r *Resolver) IsNestedFs(f string) bool {
for ext := range r.factories { for ext := range r.factories {
if strings.HasSuffix(f, ext) { if strings.HasSuffix(f, ext) {
return true return true
@ -235,7 +235,7 @@ func (r *resolver) isNestedFs(f string) bool {
return false return false
} }
func (r *resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) { func (r *Resolver) NestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
for ext, nestFactory := range r.factories { for ext, nestFactory := range r.factories {
if !strings.HasSuffix(fsPath, ext) { if !strings.HasSuffix(fsPath, ext) {
continue continue
@ -258,7 +258,7 @@ func (r *resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
} }
// open requeue raw open, without resolver call // open requeue raw open, without resolver call
func (r *resolver) resolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) { func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
ctx, span := tracer.Start(ctx, "resolvePath") ctx, span := tracer.Start(ctx, "resolvePath")
defer span.End() defer span.End()
@ -321,9 +321,9 @@ PARTS_LOOP:
var ErrNotExist = fs.ErrNotExist var ErrNotExist = fs.ErrNotExist
func getFile[F File](m map[string]F, name string) (File, error) { func GetFile[F File](m map[string]F, name string) (File, error) {
if name == Separator { if name == Separator {
return newDirFile(name), nil return NewDirFile(name), nil
} }
f, ok := m[name] f, ok := m[name]
@ -333,21 +333,21 @@ func getFile[F File](m map[string]F, name string) (File, error) {
for p := range m { for p := range m {
if strings.HasPrefix(p, name) { if strings.HasPrefix(p, name) {
return newDirFile(name), nil return NewDirFile(name), nil
} }
} }
return nil, ErrNotExist return nil, ErrNotExist
} }
func listDirFromFiles[F File](m map[string]F, name string) ([]fs.DirEntry, error) { func ListDirFromFiles[F File](m map[string]F, name string) ([]fs.DirEntry, error) {
out := make([]fs.DirEntry, 0, len(m)) out := make([]fs.DirEntry, 0, len(m))
name = AddTrailSlash(path.Clean(name)) name = AddTrailSlash(path.Clean(name))
for p, f := range m { for p, f := range m {
if strings.HasPrefix(p, name) { if strings.HasPrefix(p, name) {
parts := strings.Split(trimRelPath(p, name), Separator) parts := strings.Split(trimRelPath(p, name), Separator)
if len(parts) == 1 { if len(parts) == 1 {
out = append(out, newFileInfo(parts[0], f.Size())) out = append(out, NewFileInfo(parts[0], f.Size()))
} else { } else {
out = append(out, newDirInfo(parts[0])) out = append(out, newDirInfo(parts[0]))
} }

View file

@ -10,7 +10,7 @@ import (
const Separator = "/" const Separator = "/"
func isRoot(filename string) bool { func IsRoot(filename string) bool {
return path.Clean(filename) == Separator return path.Clean(filename) == Separator
} }