This commit is contained in:
royalcat 2024-03-18 00:00:34 +03:00
parent 35913e0190
commit 6a1e338af4
34 changed files with 1900 additions and 355 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,9 @@
package model
import "github.com/anacrolix/torrent"
import (
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
)
func MapPeerSource(source torrent.PeerSource) string {
switch source {
@ -22,3 +25,13 @@ func MapPeerSource(source torrent.PeerSource) string {
return "Unknown"
}
}
// MapTorrent converts a controller.Torrent into its GraphQL model
// representation, snapshotting name, infohash and byte counters and
// retaining the controller handle for field resolvers.
func MapTorrent(t *controller.Torrent) *Torrent {
	m := &Torrent{
		T:        t,
		Name:     t.Name(),
		Infohash: t.InfoHash(),
	}
	m.BytesCompleted = t.BytesCompleted()
	m.BytesMissing = t.BytesMissing()
	return m
}

View file

@ -9,6 +9,12 @@ import (
"github.com/anacrolix/torrent"
)
// Progress is implemented by GraphQL payloads that report how far a
// long-running operation has advanced (gqlgen union/interface member).
type Progress interface {
	IsProgress()
	GetCurrent() int64
	GetTotal() int64
}
type BooleanFilter struct {
Eq *bool `json:"eq,omitempty"`
}
@ -21,6 +27,10 @@ type DateTimeFilter struct {
Lte *time.Time `json:"lte,omitempty"`
}
// DownloadTorrentResponse is the payload of the downloadTorrent mutation.
// Task identifies the queued download when one was created; it may be nil.
type DownloadTorrentResponse struct {
	Task *Task `json:"task,omitempty"`
}
type IntFilter struct {
Eq *int64 `json:"eq,omitempty"`
Gt *int64 `json:"gt,omitempty"`
@ -52,6 +62,13 @@ type StringFilter struct {
In []string `json:"in,omitempty"`
}
type Subscription struct {
}
type Task struct {
ID string `json:"id"`
}
type Torrent struct {
Name string `json:"name"`
Infohash string `json:"infohash"`
@ -85,6 +102,16 @@ type TorrentPeer struct {
F *torrent.PeerConn `json:"-"`
}
// TorrentProgress reports download progress for a single torrent.
type TorrentProgress struct {
	Torrent *Torrent `json:"torrent"`
	Current int64    `json:"current"`
	Total   int64    `json:"total"`
}

// IsProgress marks TorrentProgress as a Progress member (gqlgen-generated).
func (TorrentProgress) IsProgress() {}

// GetCurrent returns the number of bytes completed so far.
func (this TorrentProgress) GetCurrent() int64 { return this.Current }

// GetTotal returns the total number of bytes to download.
func (this TorrentProgress) GetTotal() int64 { return this.Total }
type TorrentsFilter struct {
Name *StringFilter `json:"name,omitempty"`
BytesCompleted *IntFilter `json:"bytesCompleted,omitempty"`

View file

@ -7,8 +7,11 @@ package resolver
import (
"context"
"git.kmsign.ru/royalcat/tstor/pkg/uuid"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/host/service"
aih "github.com/anacrolix/torrent/types/infohash"
)
// ValidateTorrents is the resolver for the validateTorrents field.
@ -58,6 +61,25 @@ func (r *mutationResolver) CleanupTorrents(ctx context.Context, files *bool, dry
}
}
// DownloadTorrent is the resolver for the downloadTorrent field.
// It queues a download task for the torrent identified by infohash; when
// file is non-nil only that single file is requested.
func (r *mutationResolver) DownloadTorrent(ctx context.Context, infohash string, file *string) (*model.DownloadTorrentResponse, error) {
	f := ""
	if file != nil {
		f = *file
	}
	err := r.Service.Download(ctx, &service.TorrentDownloadTask{
		ID:       uuid.New(), // task id generated here but never surfaced to the caller
		InfoHash: aih.FromHexString(infohash),
		File:     f,
	})
	if err != nil {
		return nil, err
	}
	// NOTE(review): the response's Task field is left nil even though a task
	// ID was just generated above — confirm whether clients need it populated.
	return &model.DownloadTorrentResponse{}, nil
}
// Mutation returns graph.MutationResolver implementation.
func (r *Resolver) Mutation() graph.MutationResolver { return &mutationResolver{r} }

View file

@ -52,13 +52,7 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
tr := []*model.Torrent{}
for _, t := range torrents {
d := &model.Torrent{
Infohash: t.InfoHash(),
Name: t.Name(),
BytesCompleted: t.BytesCompleted(),
BytesMissing: t.BytesMissing(),
T: t,
}
d := model.MapTorrent(t)
if !filterFunc(d) {
continue

View file

@ -0,0 +1,54 @@
package resolver
// This file will be automatically regenerated based on the schema, any resolver implementations
// will be copied through when generating and any unknown code will be moved to the end.
// Code generated by github.com/99designs/gqlgen version v0.17.43
import (
"context"
"fmt"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
)
// TaskProgress is the resolver for the taskProgress field.
// NOTE(review): not implemented — any subscription to taskProgress panics.
func (r *subscriptionResolver) TaskProgress(ctx context.Context, taskID string) (<-chan model.Progress, error) {
	panic(fmt.Errorf("not implemented: TaskProgress - taskProgress"))
}
// TorrentDownloadUpdates is the resolver for the torrentDownloadUpdates field.
// It bridges the service-level progress channel into GraphQL model values;
// the returned channel is closed when the source drains or ctx is cancelled.
func (r *subscriptionResolver) TorrentDownloadUpdates(ctx context.Context) (<-chan *model.TorrentProgress, error) {
	out := make(chan *model.TorrentProgress)
	progress, err := r.Service.DownloadProgress(ctx)
	if err != nil {
		return nil, err
	}
	go func() {
		defer close(out)
		for p := range progress {
			if p.Torrent == nil {
				fmt.Println("nil torrent") // TODO(review): debug print — route through a logger
				continue
			}
			po := &model.TorrentProgress{
				Torrent: model.MapTorrent(p.Torrent),
				Current: p.Current,
				Total:   p.Total,
			}
			// Stop forwarding as soon as the subscriber disconnects.
			select {
			case <-ctx.Done():
				return
			case out <- po:
			}
		}
	}()
	return out, nil
}
// Subscription returns graph.SubscriptionResolver implementation.
func (r *Resolver) Subscription() graph.SubscriptionResolver { return &subscriptionResolver{r} }
type subscriptionResolver struct{ *Resolver }

View file

@ -19,7 +19,7 @@ func (r *torrentResolver) Name(ctx context.Context, obj *model.Torrent) (string,
// Files is the resolver for the files field.
func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
out := []*model.TorrentFile{}
files, err := obj.T.Files()
files, err := obj.T.Files(ctx)
if err != nil {
return nil, err
}
@ -37,17 +37,17 @@ func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*mod
// ExcludedFiles is the resolver for the excludedFiles field.
func (r *torrentResolver) ExcludedFiles(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
out := []*model.TorrentFile{}
files, err := obj.T.ExcludedFiles()
if err != nil {
return nil, err
}
for _, f := range files {
out = append(out, &model.TorrentFile{
Filename: f.DisplayPath(),
Size: f.Length(),
F: f,
})
}
// files, err := obj.T.ExcludedFiles()
// if err != nil {
// return nil, err
// }
// for _, f := range files {
// out = append(out, &model.TorrentFile{
// Filename: f.DisplayPath(),
// Size: f.Length(),
// F: f,
// })
// }
return out, nil
}

197
src/export/nfs/cache.go Normal file
View file

@ -0,0 +1,197 @@
package nfs
import (
"crypto/sha256"
"encoding/binary"
"io/fs"
"reflect"
"slices"
"github.com/willscott/go-nfs"
"github.com/go-git/go-billy/v5"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
)
// NewCachingHandler wraps a handler to provide a basic to/from-file handle cache.
// The directory-listing verifier cache shares the same size limit as the
// handle cache.
func NewCachingHandler(h nfs.Handler, limit int) nfs.Handler {
	return NewCachingHandlerWithVerifierLimit(h, limit, limit)
}
// NewCachingHandlerWithVerifierLimit provides a basic to/from-file handle
// cache whose directory-listing verifier cache can be sized independently
// of the handle cache.
func NewCachingHandlerWithVerifierLimit(h nfs.Handler, limit int, verifierLimit int) nfs.Handler {
	if limit < 2 || verifierLimit < 2 {
		nfs.Log.Warnf("Caching handler created with insufficient cache to support directory listing", "size", limit, "verifiers", verifierLimit)
	}

	handles, _ := lru.New[uuid.UUID, entry](limit)
	verifiers, _ := lru.New[uint64, verifier](verifierLimit)

	return &CachingHandler{
		Handler:         h,
		activeHandles:   handles,
		reverseHandles:  map[string][]uuid.UUID{},
		activeVerifiers: verifiers,
		cacheLimit:      limit,
	}
}
// CachingHandler implements to/from handle via an LRU cache.
type CachingHandler struct {
nfs.Handler
activeHandles *lru.Cache[uuid.UUID, entry]
reverseHandles map[string][]uuid.UUID
activeVerifiers *lru.Cache[uint64, verifier]
cacheLimit int
}
type entry struct {
f billy.Filesystem
p []string
}
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
//
// NOTE(review): no locking is visible here although activeHandles and the
// reverseHandles map are shared state — confirm calls are serialized upstream.
func (c *CachingHandler) ToHandle(f billy.Filesystem, path []string) []byte {
	joinedPath := f.Join(path...)
	// Reuse an existing handle for the same filesystem+path when possible.
	if handle := c.searchReverseCache(f, joinedPath); handle != nil {
		return handle
	}
	id := uuid.New()
	// Defensive copy: callers may mutate their path slice after we return.
	newPath := make([]string, len(path))
	copy(newPath, path)
	// Adding may evict the LRU's oldest entry; mirror that eviction into the
	// reverse-lookup map so it cannot grow without bound.
	evictedKey, evictedPath, ok := c.activeHandles.GetOldest()
	if evicted := c.activeHandles.Add(id, entry{f, newPath}); evicted && ok {
		rk := evictedPath.f.Join(evictedPath.p...)
		c.evictReverseCache(rk, evictedKey)
	}
	if _, ok := c.reverseHandles[joinedPath]; !ok {
		c.reverseHandles[joinedPath] = []uuid.UUID{}
	}
	c.reverseHandles[joinedPath] = append(c.reverseHandles[joinedPath], id)
	b, _ := id.MarshalBinary()
	return b
}
// FromHandle converts from an opaque handle to the file it represents.
// Unknown or evicted handles yield NFSStatusStale so clients re-resolve.
func (c *CachingHandler) FromHandle(fh []byte) (billy.Filesystem, []string, error) {
	id, err := uuid.FromBytes(fh)
	if err != nil {
		return nil, []string{}, err
	}
	if f, ok := c.activeHandles.Get(id); ok {
		// Touch every cached handle whose path is a prefix of this one so
		// ancestor-directory handles stay warm in the LRU.
		for _, k := range c.activeHandles.Keys() {
			candidate, _ := c.activeHandles.Peek(k)
			if hasPrefix(f.p, candidate.p) {
				_, _ = c.activeHandles.Get(k)
			}
		}
		// Clone the path so callers cannot mutate the cached entry.
		return f.f, slices.Clone(f.p), nil
	}
	return nil, []string{}, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
// searchReverseCache returns a previously handed-out handle for path on the
// same filesystem, or nil when no live cache entry matches.
func (c *CachingHandler) searchReverseCache(f billy.Filesystem, path string) []byte {
	for _, id := range c.reverseHandles[path] {
		candidate, ok := c.activeHandles.Get(id)
		if !ok {
			continue
		}
		if reflect.DeepEqual(candidate.f, f) {
			return id[:]
		}
	}
	return nil
}
// evictReverseCache drops a single handle from the reverse-lookup list for
// path, keeping any other handles that still reference the same path.
func (c *CachingHandler) evictReverseCache(path string, handle uuid.UUID) {
	uuids, ok := c.reverseHandles[path]
	if !ok {
		return
	}
	for i := range uuids {
		if uuids[i] != handle {
			continue
		}
		c.reverseHandles[path] = append(uuids[:i], uuids[i+1:]...)
		return
	}
}
// InvalidateHandle removes handle from both the handle cache and the
// reverse path lookup. Unknown or malformed handles are ignored.
func (c *CachingHandler) InvalidateHandle(fs billy.Filesystem, handle []byte) error {
	//Remove from cache
	id, _ := uuid.FromBytes(handle)
	entry, ok := c.activeHandles.Get(id)
	if ok {
		rk := entry.f.Join(entry.p...)
		c.evictReverseCache(rk, id)
	}
	c.activeHandles.Remove(id)
	return nil
}
// HandleLimit exports how many file handles can be safely stored by this cache.
// This is the limit the handle LRU was created with.
func (c *CachingHandler) HandleLimit() int {
	return c.cacheLimit
}
// hasPrefix reports whether prefix is an element-wise prefix of path.
// An empty prefix is a prefix of every path.
func hasPrefix(path, prefix []string) bool {
	if len(path) < len(prefix) {
		return false
	}
	return slices.Equal(path[:len(prefix)], prefix)
}
// verifier pairs a directory path with the listing snapshot its cookie
// verifier was computed from.
type verifier struct {
	path     string
	contents []fs.FileInfo
}
func hashPathAndContents(path string, contents []fs.FileInfo) uint64 {
//calculate a cookie-verifier.
vHash := sha256.New()
// Add the path to avoid collisions of directories with the same content
vHash.Write(binary.BigEndian.AppendUint64([]byte{}, uint64(len(path))))
vHash.Write([]byte(path))
for _, c := range contents {
vHash.Write([]byte(c.Name())) // Never fails according to the docs
}
verify := vHash.Sum(nil)[0:8]
return binary.BigEndian.Uint64(verify)
}
// VerifierFor computes a cookie verifier for the directory at path with the
// given listing, caches the listing under it, and returns the verifier.
func (c *CachingHandler) VerifierFor(path string, contents []fs.FileInfo) uint64 {
	id := hashPathAndContents(path, contents)
	c.activeVerifiers.Add(id, verifier{path, contents})
	return id
}
// DataForVerifier returns the directory listing previously cached under the
// cookie verifier id, or nil when it has been evicted. The path argument is
// not consulted here.
func (c *CachingHandler) DataForVerifier(path string, id uint64) []fs.FileInfo {
	if cache, ok := c.activeVerifiers.Get(id); ok {
		return cache.contents
	}
	return nil
}

View file

@ -17,7 +17,7 @@ func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
bfs := &billyFsWrapper{fs: fs, log: nfslog}
handler := nfshelper.NewNullAuthHandler(bfs)
cacheHelper := nfshelper.NewCachingHandler(handler, 1024*16)
cacheHelper := nfshelper.NewCachingHandler(handler, 1024)
// cacheHelper := NewCachingHandler(handler)

View file

@ -1,6 +1,7 @@
package controller
import (
"context"
"slices"
"strings"
@ -11,10 +12,10 @@ import (
type Torrent struct {
torrentFilePath string
t *torrent.Torrent
rep *store.ExlcudedFiles
rep *store.FilesMappings
}
func NewTorrent(t *torrent.Torrent, rep *store.ExlcudedFiles) *Torrent {
func NewTorrent(t *torrent.Torrent, rep *store.FilesMappings) *Torrent {
return &Torrent{t: t, rep: rep}
}
@ -26,13 +27,13 @@ func (s *Torrent) Torrent() *torrent.Torrent {
return s.t
}
func (s *Torrent) Name() string {
<-s.t.GotInfo()
if name := s.t.Name(); name != "" {
func (c *Torrent) Name() string {
<-c.t.GotInfo()
if name := c.t.Name(); name != "" {
return name
}
return s.InfoHash()
return c.InfoHash()
}
func (s *Torrent) InfoHash() string {
@ -50,8 +51,13 @@ func (s *Torrent) BytesMissing() int64 {
return s.t.BytesMissing()
}
func (s *Torrent) Files() ([]*torrent.File, error) {
excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash())
func (s *Torrent) Length() int64 {
<-s.t.GotInfo()
return s.t.Length()
}
func (s *Torrent) Files(ctx context.Context) ([]*torrent.File, error) {
fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash())
if err != nil {
return nil, err
}
@ -60,25 +66,30 @@ func (s *Torrent) Files() ([]*torrent.File, error) {
files := s.t.Files()
files = slices.DeleteFunc(files, func(file *torrent.File) bool {
p := file.Path()
if strings.Contains(p, "/.pad/") {
return false
return true
}
if !slices.Contains(excludedFiles, p) {
return false
if target, ok := fileMappings[p]; ok && target == "" {
return true
}
return true
return false
})
for _, tf := range files {
s.isFileComplete(tf.BeginPieceIndex(), tf.EndPieceIndex())
}
return files, nil
}
// Map applies f to every element of ts and returns the results in order.
func Map[T, U any](ts []T, f func(T) U) []U {
	out := make([]U, 0, len(ts))
	for _, t := range ts {
		out = append(out, f(t))
	}
	return out
}
// ExcludeFile records f as excluded in the mappings store so it is hidden
// from future file listings.
func (s *Torrent) ExcludeFile(ctx context.Context, f *torrent.File) error {
	return s.rep.ExcludeFile(ctx, f)
}
func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool {
for i := startIndex; i < endIndex; i++ {
if !s.t.Piece(i).State().Complete {
@ -88,35 +99,6 @@ func (s *Torrent) isFileComplete(startIndex int, endIndex int) bool {
return true
}
func (s *Torrent) ExcludedFiles() ([]*torrent.File, error) {
excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash())
if err != nil {
return nil, err
}
<-s.t.GotInfo()
files := s.t.Files()
files = slices.DeleteFunc(files, func(file *torrent.File) bool {
p := file.Path()
if strings.Contains(p, "/.pad/") {
return false
}
if slices.Contains(excludedFiles, p) {
return false
}
return true
})
return files, nil
}
func (s *Torrent) ExcludeFile(f *torrent.File) error {
return s.rep.ExcludeFile(f)
}
func (s *Torrent) ValidateTorrent() error {
<-s.t.GotInfo()
s.t.VerifyData()

View file

@ -0,0 +1,172 @@
package datastorage
import (
"context"
"fmt"
"io"
"os"
"path"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/hashicorp/go-multierror"
"github.com/royalcat/kv"
)
// PieceStorage stores torrent data as one file per piece, named by piece
// hash under basePath, with completion state tracked separately.
type PieceStorage struct {
	basePath   string
	completion storage.PieceCompletion
	// dirInfohash maps directory names to infohashes.
	// NOTE(review): never assigned in this file — confirm it is wired up elsewhere.
	dirInfohash kv.Store[string, infohash.T]
}

// NewPieceStorage creates a PieceStorage rooted at path using the given
// piece-completion tracker. dirInfohash is left nil.
func NewPieceStorage(path string, completion storage.PieceCompletion) *PieceStorage {
	return &PieceStorage{
		basePath:   path,
		completion: completion,
	}
}
var _ DataStorage = (*PieceStorage)(nil)
// OpenTorrent implements FileStorageDeleter.
// Each piece is materialized as its own file at
// <basePath>/<infohash>/<hash[:2]>/<hash>; failures to create or open a
// piece file are deferred into an errPiece so every operation on it errors.
//
// NOTE(review): descriptors is never appended to, so the Flush and Close
// callbacks below always iterate an empty slice and opened piece files are
// never synced or closed through them — confirm whether this is intentional.
func (p *PieceStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
	torrentPath := path.Join(p.basePath, infoHash.HexString())
	descriptors := []*os.File{}
	return storage.TorrentImpl{
		Piece: func(piece metainfo.Piece) storage.PieceImpl {
			hash := piece.Hash().HexString()
			// Two-character fan-out directory keeps per-directory entry counts low.
			piecePrefixDir := path.Join(torrentPath, hash[:2])
			err := os.MkdirAll(piecePrefixDir, os.ModePerm|os.ModeDir)
			if err != nil {
				return &errPiece{err: err}
			}
			piecePath := path.Join(torrentPath, hash[:2], hash)
			file, err := os.OpenFile(piecePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
			if err != nil {
				return &errPiece{err: err}
			}
			pk := metainfo.PieceKey{
				InfoHash: infoHash,
				Index:    piece.Index(),
			}
			return newPieceFile(pk, file, p.completion)
			// file, err os.OpenFile(piecePath)
		},
		Flush: func() error {
			var res error
			for _, f := range descriptors {
				if err := f.Sync(); err != nil {
					res = multierror.Append(res, err)
				}
			}
			return res
		},
		Close: func() error {
			var res error
			for _, f := range descriptors {
				if err := f.Close(); err != nil {
					res = multierror.Append(res, err)
				}
			}
			return res
		},
	}, nil
}
// Close implements FileStorageDeleter. PieceStorage holds no long-lived
// resources of its own here, so this is a no-op.
func (p *PieceStorage) Close() error {
	return nil
}

// DeleteFile implements FileStorageDeleter.
// NOTE(review): unimplemented — always returns an error.
func (p *PieceStorage) DeleteFile(file *torrent.File) error {
	return fmt.Errorf("not implemented")
}

// CleanupDirs implements DataStorage.
// NOTE(review): stubbed out; always reports zero removed entries.
func (p *PieceStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
	return 0, nil // TODO
}

// CleanupFiles implements DataStorage.
// NOTE(review): stubbed out; always reports zero removed entries.
func (p *PieceStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
	return 0, nil // TODO
}
// newPieceFile wraps an open piece file with its piece key and the shared
// completion tracker.
func newPieceFile(pk metainfo.PieceKey, file *os.File, completion storage.PieceCompletionGetSetter) *piece {
	return &piece{
		pk:         pk,
		File:       file,
		completion: completion,
	}
}

// piece is a storage.PieceImpl backed by a single on-disk file; reads and
// writes are delegated to the embedded *os.File.
type piece struct {
	*os.File
	pk         metainfo.PieceKey
	completion storage.PieceCompletionGetSetter
}
// Completion implements storage.PieceImpl.
// Lookup failures surface through the Err field with Ok=false.
func (p *piece) Completion() storage.Completion {
	compl, err := p.completion.Get(p.pk)
	if err != nil {
		return storage.Completion{Complete: false, Ok: false, Err: err}
	}
	return compl
}

// MarkComplete implements storage.PieceImpl by recording completion in the
// shared tracker.
func (p *piece) MarkComplete() error {
	return p.completion.Set(p.pk, true)
}

// MarkNotComplete implements storage.PieceImpl by clearing the completion
// flag in the shared tracker.
func (p *piece) MarkNotComplete() error {
	return p.completion.Set(p.pk, false)
}
var _ storage.PieceImpl = (*piece)(nil)
var _ io.WriterTo = (*piece)(nil)
// errPiece is a storage.PieceImpl that fails every operation with a fixed
// error; OpenTorrent returns it when a piece file cannot be created/opened.
type errPiece struct {
	err error
}

// WriteTo implements io.WriterTo.
func (p *errPiece) WriteTo(io.Writer) (int64, error) {
	return 0, p.err
}

// ReadAt implements storage.PieceImpl.
func (p *errPiece) ReadAt([]byte, int64) (int, error) {
	return 0, p.err
}

// WriteAt implements storage.PieceImpl.
func (p *errPiece) WriteAt([]byte, int64) (int, error) {
	return 0, p.err
}

// Completion implements storage.PieceImpl; the stored error is propagated.
func (p *errPiece) Completion() storage.Completion {
	return storage.Completion{Complete: false, Ok: false, Err: p.err}
}

// MarkComplete implements storage.PieceImpl.
func (p *errPiece) MarkComplete() error {
	return p.err
}

// MarkNotComplete implements storage.PieceImpl.
func (p *errPiece) MarkNotComplete() error {
	return p.err
}
var _ storage.PieceImpl = (*errPiece)(nil)
var _ io.WriterTo = (*errPiece)(nil)

View file

@ -1,4 +1,4 @@
package filestorage
package datastorage
import (
"fmt"
@ -10,7 +10,7 @@ import (
"github.com/anacrolix/torrent/storage"
)
func Setup(cfg config.TorrentClient) (*FileStorage, storage.PieceCompletion, error) {
func Setup(cfg config.TorrentClient) (DataStorage, storage.PieceCompletion, error) {
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)

View file

@ -1,4 +1,4 @@
package filestorage
package datastorage
import (
"context"
@ -14,9 +14,11 @@ import (
"github.com/anacrolix/torrent/storage"
)
type FileStorageDeleter interface {
type DataStorage interface {
storage.ClientImplCloser
DeleteFile(file *torrent.File) error
CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
}
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
@ -137,7 +139,7 @@ func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.
expectedEntries := []string{}
{
for _, e := range expected {
files, err := e.Files()
files, err := e.Files(ctx)
if err != nil {
return 0, err
}

130
src/host/service/queue.go Normal file
View file

@ -0,0 +1,130 @@
package service
import (
"context"
"fmt"
"git.kmsign.ru/royalcat/tstor/pkg/uuid"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash"
)
// TorrentDownloadTask describes a request to download a torrent, or a
// single file within it when File is non-empty.
type TorrentDownloadTask struct {
	ID       uuid.UUID // task identifier, assigned by the caller
	InfoHash infohash.T
	File     string // path of one file to download; empty means the whole torrent
}
// Download flags the torrent identified by task.InfoHash as wanted. When
// task.File is non-empty only that file is requested; otherwise the whole
// torrent is scheduled. Data transfer proceeds asynchronously — this only
// marks the data for download. Returns an error when the torrent, or the
// named file within it, is not known to the client.
func (s *Service) Download(ctx context.Context, task *TorrentDownloadTask) error {
	t, ok := s.c.Torrent(task.InfoHash)
	if !ok {
		return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
	}

	if task.File != "" {
		var file *torrent.File
		for _, tf := range t.Files() {
			if tf.Path() == task.File {
				file = tf
				break
			}
		}
		if file == nil {
			// Fixed duplicated word in the original message ("in torrent torrent").
			return fmt.Errorf("file %s not found in torrent with IH %s", task.File, task.InfoHash.HexString())
		}
		file.Download()
		return nil
	}

	t.DownloadAll()
	return nil
}
// func (s *Service) DownloadAndWait(ctx context.Context, task *TorrentDownloadTask) error {
// t, ok := s.c.Torrent(task.InfoHash)
// if !ok {
// return fmt.Errorf("torrent with IH %s not found", task.InfoHash.HexString())
// }
// if task.File != "" {
// var file *torrent.File
// for _, tf := range t.Files() {
// if tf.Path() == task.File {
// file = tf
// break
// }
// }
// if file == nil {
// return fmt.Errorf("file %s not found in torrent torrent with IH %s", task.File, task.InfoHash.HexString())
// }
// file.Download()
// return waitPieceRange(ctx, t, file.BeginPieceIndex(), file.EndPieceIndex())
// }
// t.DownloadAll()
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-t.Complete.On():
// return nil
// }
// }
// func waitPieceRange(ctx context.Context, t *torrent.Torrent, start, end int) error {
// for i := start; i < end; i++ {
// timer := time.NewTimer(time.Millisecond)
// for {
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-timer.C:
// if t.PieceState(i).Complete {
// continue
// }
// }
// }
// }
// return nil
// }
// TorrentProgress is a point-in-time snapshot of a torrent's download state.
type TorrentProgress struct {
	Torrent *controller.Torrent
	Current int64 // bytes completed
	Total   int64 // total torrent length in bytes
}
// DownloadProgress streams progress snapshots for every torrent currently
// known to the service. One snapshot is emitted per piece-state change. The
// returned channel is closed after ctx is cancelled. Torrents added after
// this call are not picked up.
func (s *Service) DownloadProgress(ctx context.Context) (<-chan TorrentProgress, error) {
	torrents, err := s.ListTorrents(ctx)
	if err != nil {
		return nil, err
	}

	out := make(chan TorrentProgress, 1)
	go func() {
		defer close(out)
		for _, t := range torrents {
			t := t // per-iteration copy for the goroutine below (pre-Go 1.22 loop semantics)
			sub := t.Torrent().SubscribePieceStateChanges()
			// Subscriptions are released when ctx is done and this goroutine returns.
			defer sub.Close()
			go func() {
				for range sub.Values {
					p := TorrentProgress{
						Torrent: t,
						Current: t.BytesCompleted(),
						Total:   t.Length(),
					}
					select {
					case out <- p:
					case <-ctx.Done():
						// Consumer is gone: stop instead of blocking forever
						// on a send nobody will receive.
						return
					}
				}
			}()
		}
		<-ctx.Done()
	}()

	return out, nil
}

View file

@ -11,9 +11,10 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/host/filestorage"
"git.kmsign.ru/royalcat/tstor/src/host/datastorage"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"go.uber.org/multierr"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
@ -24,21 +25,21 @@ import (
type Service struct {
c *torrent.Client
excludedFiles *store.ExlcudedFiles
excludedFiles *store.FilesMappings
infoBytes *store.InfoBytes
torrentLoaded chan struct{}
// stats *Stats
DefaultPriority types.PiecePriority
Storage *filestorage.FileStorage
Storage datastorage.DataStorage
SourceDir string
log *slog.Logger
addTimeout, readTimeout int
}
func NewService(sourceDir string, c *torrent.Client, storage *filestorage.FileStorage, excludedFiles *store.ExlcudedFiles, infoBytes *store.InfoBytes, addTimeout, readTimeout int) *Service {
func NewService(sourceDir string, c *torrent.Client, storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes, addTimeout, readTimeout int) *Service {
s := &Service{
log: slog.With("component", "torrent-service"),
c: c,
@ -66,6 +67,12 @@ func NewService(sourceDir string, c *torrent.Client, storage *filestorage.FileSt
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
// Close shuts down the torrent client and then the data storage, combining
// any errors from both into a single returned error.
func (s *Service) Close() error {
	err := multierr.Combine(s.c.Close()...)
	err = multierr.Append(err, s.Storage.Close())
	return err
}
func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
defer f.Close()
@ -102,17 +109,17 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
if err != nil {
infoBytes = nil
} else {
for _, t := range s.c.Torrents() {
if t.Name() == info.BestName() && t.InfoHash() != spec.InfoHash {
<-t.GotInfo()
if !isTorrentCompatable(*t.Info(), info) {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
}
}
// for _, t := range s.c.Torrents() {
// if t.Name() == info.BestName() && t.InfoHash() != spec.InfoHash {
// <-t.GotInfo()
// if !isTorrentCompatable(*t.Info(), info) {
// return nil, fmt.Errorf(
// "torrent with name '%s' not compatable existing infohash: %s, new: %s",
// t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
// )
// }
// }
// }
}
t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{
@ -123,7 +130,6 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
})
t.AllowDataDownload()
t.AllowDataUpload()
t.DownloadAll()
select {
case <-ctx.Done():

View file

@ -1,94 +0,0 @@
package store
import (
"errors"
"path/filepath"
"sync"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/philippgille/gokv"
"github.com/philippgille/gokv/badgerdb"
"github.com/philippgille/gokv/encoding"
)
func NewExcludedFiles(metaDir string, storage TorrentFileDeleter) (*ExlcudedFiles, error) {
excludedFilesStore, err := badgerdb.NewStore(badgerdb.Options{
Dir: filepath.Join(metaDir, "excluded-files"),
Codec: encoding.JSON,
})
if err != nil {
return nil, err
}
r := &ExlcudedFiles{
excludedFiles: excludedFilesStore,
storage: storage,
}
return r, nil
}
type ExlcudedFiles struct {
m sync.RWMutex
excludedFiles gokv.Store
storage TorrentFileDeleter
}
var ErrNotFound = errors.New("not found")
type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error
}
func (r *ExlcudedFiles) ExcludeFile(file *torrent.File) error {
r.m.Lock()
defer r.m.Unlock()
hash := file.Torrent().InfoHash()
var excludedFiles []string
found, err := r.excludedFiles.Get(hash.AsString(), &excludedFiles)
if err != nil {
return err
}
if !found {
excludedFiles = []string{}
}
excludedFiles = unique(append(excludedFiles, file.Path()))
err = r.storage.DeleteFile(file)
if err != nil {
return err
}
return r.excludedFiles.Set(hash.AsString(), excludedFiles)
}
func (r *ExlcudedFiles) ExcludedFiles(hash metainfo.Hash) ([]string, error) {
r.m.Lock()
defer r.m.Unlock()
var excludedFiles []string
found, err := r.excludedFiles.Get(hash.AsString(), &excludedFiles)
if err != nil {
return nil, err
}
if !found {
return nil, nil
}
return excludedFiles, nil
}
// unique returns the elements of intSlice with duplicates removed,
// preserving first-occurrence order.
func unique[C comparable](intSlice []C) []C {
	seen := make(map[C]bool, len(intSlice))
	out := []C{}
	for _, v := range intSlice {
		if seen[v] {
			continue
		}
		seen[v] = true
		out = append(out, v)
	}
	return out
}

View file

@ -0,0 +1,57 @@
package store
import (
	"context"
	"errors"
	"path/filepath"
	"strings"

	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/types/infohash"
	"github.com/royalcat/kv"
)
// NewFileMappings opens (or creates) the badger-backed "file-mappings"
// store under metaDir and returns a FilesMappings using it.
func NewFileMappings(metaDir string, storage TorrentFileDeleter) (*FilesMappings, error) {
	str, err := kv.NewBadgerKVBytes[string, string](filepath.Join(metaDir, "file-mappings"))
	if err != nil {
		return nil, err
	}
	r := &FilesMappings{
		mappings: str,
		storage:  storage,
	}
	return r, nil
}
// FilesMappings persists per-torrent-file target mappings, keyed by
// "<infohash-hex>/<file-path>". An empty target marks a file as excluded.
type FilesMappings struct {
	mappings kv.Store[string, string]
	// storage can delete file data for excluded files.
	// NOTE(review): stored but not used in this file — confirm deletion is wired elsewhere.
	storage TorrentFileDeleter
}

// ErrNotFound is returned when a requested mapping does not exist.
var ErrNotFound = errors.New("not found")

// TorrentFileDeleter removes a torrent file's data from backing storage.
type TorrentFileDeleter interface {
	DeleteFile(file *torrent.File) error
}
// fileKey builds the store key for file: "<infohash-hex>/<path-in-torrent>".
func fileKey(file *torrent.File) string {
	return file.Torrent().InfoHash().HexString() + "/" + file.Path()
}
// MapFile records that file should be materialized at target.
func (r *FilesMappings) MapFile(ctx context.Context, file *torrent.File, target string) error {
	return r.mappings.Set(ctx, fileKey(file), target)
}

// ExcludeFile marks file as excluded by storing an empty target for it.
func (r *FilesMappings) ExcludeFile(ctx context.Context, file *torrent.File) error {
	return r.mappings.Set(ctx, fileKey(file), "")
}
// FileMappings returns the mappings recorded for torrent ih, keyed by the
// file's path inside the torrent (an empty value marks the file excluded).
// The "<infohash>/" prefix that fileKey adds is stripped from each key so
// callers can look entries up by the bare torrent-file path — previously
// the raw prefixed keys were returned and consumer lookups by file path
// could never match.
func (r *FilesMappings) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) {
	prefix := ih.HexString() + "/"
	out := map[string]string{}
	err := r.mappings.RangeWithPrefix(ctx, prefix, func(k, v string) bool {
		out[strings.TrimPrefix(k, prefix)] = v
		return true
	})
	return out, err
}

View file

@ -21,7 +21,7 @@ func TestFileinfo(t *testing.T) {
require.Zero(fi.Type() & fs.ModeDir)
require.Zero(fi.Mode() & fs.ModeDir)
require.Equal(fs.FileMode(0555), fi.Mode())
require.Equal(nil, fi.Sys())
require.Nil(fi.Sys())
}
func TestDirInfo(t *testing.T) {
@ -38,6 +38,6 @@ func TestDirInfo(t *testing.T) {
require.NotZero(fi.Type() & fs.ModeDir)
require.NotZero(fi.Mode() & fs.ModeDir)
require.Equal(defaultMode|fs.ModeDir, fi.Mode())
require.Equal(nil, fi.Sys())
require.Nil(fi.Sys())
}

View file

@ -12,6 +12,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
"golang.org/x/exp/maps"
@ -46,17 +47,14 @@ func (fs *TorrentFs) files() (map[string]File, error) {
return fs.filesCache, nil
}
files, err := fs.c.Files()
files, err := fs.c.Files(context.Background())
if err != nil {
return nil, err
}
fs.filesCache = make(map[string]File)
for _, file := range files {
if file.BytesCompleted() == 0 {
continue
}
file.Download()
p := AbsPath(file.Path())
fs.filesCache[p] = &torrentFile{
@ -107,6 +105,24 @@ DEFAULT_DIR:
return fs.filesCache, nil
}
// anyPeerHasFiles reports whether at least one connected peer advertises
// every piece of file.
func anyPeerHasFiles(file *torrent.File) bool {
	for _, conn := range file.Torrent().PeerConns() {
		if bitmapHaveFile(conn.PeerPieces(), file) {
			return true
		}
	}
	return false
}

// bitmapHaveFile reports whether bitmap contains every piece index in
// [BeginPieceIndex, EndPieceIndex) of file.
func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool {
	for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
		if !bitmap.ContainsInt(i) {
			return false
		}
	}
	return true
}
func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) {
out := make(map[string]File, 0)
entries, err := vfs.ReadDir(start)
@ -222,7 +238,7 @@ func (fs *TorrentFs) Unlink(name string) error {
return ErrNotImplemented
}
return fs.c.ExcludeFile(tfile.file)
return fs.c.ExcludeFile(context.Background(), tfile.file)
}
type reader interface {

View file

@ -0,0 +1,21 @@
package virtdir
// SourceType identifies the backend that materializes a virtual directory.
type SourceType string

const (
	// VirtDirYtDlp sources directory contents via yt-dlp.
	VirtDirYtDlp SourceType = "yt-dlp"
)

// VirtDirSource describes the origin of a virtual directory's contents.
type VirtDirSource interface {
	SourceType() SourceType
}

var _ VirtDirSource = (*VirtDirSourceYtDlp)(nil)

// VirtDirSourceYtDlp configures a yt-dlp-backed virtual directory.
type VirtDirSourceYtDlp struct {
	URL string `json:"url"`
}

// SourceType implements VirtDirSource.
func (VirtDirSourceYtDlp) SourceType() SourceType {
	return VirtDirYtDlp
}

View file

@ -11,13 +11,14 @@ import (
var _ nfs.Logger = (*NFSLog)(nil)
type NFSLog struct {
level nfs.LogLevel
// r *slog.Logger
l *slog.Logger
}
func NewNFSLog(r *slog.Logger) nfs.Logger {
return &NFSLog{
// r: r,
level: nfs.DebugLevel,
// l: r.Level(zerolog.DebugLevel),
l: r,
}
@ -25,43 +26,75 @@ func NewNFSLog(r *slog.Logger) nfs.Logger {
// Debug implements nfs.Logger; emitted only when the configured level
// admits debug output.
func (l *NFSLog) Debug(args ...interface{}) {
	if l.level >= nfs.DebugLevel {
		l.l.Debug(fmt.Sprint(args...))
	}
}

// Debugf implements nfs.Logger.
func (l *NFSLog) Debugf(format string, args ...interface{}) {
	if l.level >= nfs.DebugLevel {
		l.l.Debug(fmt.Sprintf(format, args...))
	}
}

// Error implements nfs.Logger.
func (l *NFSLog) Error(args ...interface{}) {
	if l.level >= nfs.ErrorLevel {
		l.l.Error(fmt.Sprint(args...))
	}
}

// Errorf implements nfs.Logger.
func (l *NFSLog) Errorf(format string, args ...interface{}) {
	if l.level >= nfs.ErrorLevel {
		l.l.Error(fmt.Sprintf(format, args...))
	}
}

// Fatal implements nfs.Logger. When the level admits fatal output, the
// message is logged at error level and the process exits via log.Fatal.
func (l *NFSLog) Fatal(args ...interface{}) {
	if l.level < nfs.FatalLevel {
		return
	}
	l.l.Error(fmt.Sprint(args...))
	log.Fatal(args...)
}

// Fatalf implements nfs.Logger; see Fatal.
func (l *NFSLog) Fatalf(format string, args ...interface{}) {
	if l.level < nfs.FatalLevel {
		return
	}
	l.l.Error(fmt.Sprintf(format, args...))
	log.Fatalf(format, args...)
}

// Info implements nfs.Logger.
func (l *NFSLog) Info(args ...interface{}) {
	if l.level >= nfs.InfoLevel {
		l.l.Info(fmt.Sprint(args...))
	}
}

// Infof implements nfs.Logger.
func (l *NFSLog) Infof(format string, args ...interface{}) {
	if l.level >= nfs.InfoLevel {
		l.l.Info(fmt.Sprintf(format, args...))
	}
}
@ -79,102 +112,85 @@ func (l *NFSLog) Panicf(format string, args ...interface{}) {
// Print implements nfs.Logger; treated as info-level output.
func (l *NFSLog) Print(args ...interface{}) {
	if l.level >= nfs.InfoLevel {
		l.l.Info(fmt.Sprint(args...))
	}
}

// Printf implements nfs.Logger; treated as info-level output.
func (l *NFSLog) Printf(format string, args ...interface{}) {
	if l.level >= nfs.InfoLevel {
		l.l.Info(fmt.Sprintf(format, args...))
	}
}

// Trace implements nfs.Logger; emitted through the debug sink.
func (l *NFSLog) Trace(args ...interface{}) {
	if l.level >= nfs.TraceLevel {
		l.l.Debug(fmt.Sprint(args...))
	}
}

// Tracef implements nfs.Logger; emitted through the debug sink.
func (l *NFSLog) Tracef(format string, args ...interface{}) {
	if l.level >= nfs.TraceLevel {
		l.l.Debug(fmt.Sprintf(format, args...))
	}
}

// Warn implements nfs.Logger.
func (l *NFSLog) Warn(args ...interface{}) {
	if l.level >= nfs.WarnLevel {
		l.l.Warn(fmt.Sprint(args...))
	}
}

// Warnf implements nfs.Logger.
func (l *NFSLog) Warnf(format string, args ...interface{}) {
	if l.level >= nfs.WarnLevel {
		l.l.Warn(fmt.Sprintf(format, args...))
	}
}
// GetLevel implements nfs.Logger.
func (l *NFSLog) GetLevel() nfs.LogLevel {
// zl := l.l.Handler()
// switch zl {
// case zerolog.PanicLevel, zerolog.Disabled:
// return nfs.PanicLevel
// case zerolog.FatalLevel:
// return nfs.FatalLevel
// case zerolog.ErrorLevel:
// return nfs.ErrorLevel
// case zerolog.WarnLevel:
// return nfs.WarnLevel
// case zerolog.InfoLevel:
// return nfs.InfoLevel
// case zerolog.DebugLevel:
// return nfs.DebugLevel
// case zerolog.TraceLevel:
// return nfs.TraceLevel
// }
return nfs.TraceLevel
return l.level
}
// ParseLevel implements nfs.Logger.
func (l *NFSLog) ParseLevel(level string) (nfs.LogLevel, error) {
// switch level {
// case "panic":
// return nfs.PanicLevel, nil
// case "fatal":
// return nfs.FatalLevel, nil
// case "error":
// return nfs.ErrorLevel, nil
// case "warn":
// return nfs.WarnLevel, nil
// case "info":
// return nfs.InfoLevel, nil
// case "debug":
// return nfs.DebugLevel, nil
// case "trace":
// return nfs.TraceLevel, nil
// }
// var ll nfs.LogLevel
// return ll, fmt.Errorf("invalid log level %q", level)
return nfs.TraceLevel, fmt.Errorf("level change not supported")
switch level {
case "panic":
return nfs.PanicLevel, nil
case "fatal":
return nfs.FatalLevel, nil
case "error":
return nfs.ErrorLevel, nil
case "warn":
return nfs.WarnLevel, nil
case "info":
return nfs.InfoLevel, nil
case "debug":
return nfs.DebugLevel, nil
case "trace":
return nfs.TraceLevel, nil
}
return 0, fmt.Errorf("invalid log level %q", level)
}
// SetLevel implements nfs.Logger.
func (l *NFSLog) SetLevel(level nfs.LogLevel) {
// switch level {
// case nfs.PanicLevel:
// l.l = l.r.Level(zerolog.PanicLevel)
// return
// case nfs.FatalLevel:
// l.l = l.r.Level(zerolog.FatalLevel)
// return
// case nfs.ErrorLevel:
// l.l = l.r.Level(zerolog.ErrorLevel)
// return
// case nfs.WarnLevel:
// l.l = l.r.Level(zerolog.WarnLevel)
// return
// case nfs.InfoLevel:
// l.l = l.r.Level(zerolog.InfoLevel)
// return
// case nfs.DebugLevel:
// l.l = l.r.Level(zerolog.DebugLevel)
// return
// case nfs.TraceLevel:
// l.l = l.r.Level(zerolog.TraceLevel)
// return
// }
l.level = level
}