file delete on exclude
This commit is contained in:
parent
5f8d497de1
commit
49444bd70d
19 changed files with 481 additions and 429 deletions
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -6,7 +6,7 @@ import (
|
|||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/repository"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/storage"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
|
@ -15,7 +15,7 @@ import (
|
|||
|
||||
type Service struct {
|
||||
c *torrent.Client
|
||||
rep repository.TorrentsRepository
|
||||
rep storage.TorrentsRepository
|
||||
|
||||
// stats *Stats
|
||||
DefaultPriority types.PiecePriority
|
||||
|
@ -24,7 +24,7 @@ type Service struct {
|
|||
addTimeout, readTimeout int
|
||||
}
|
||||
|
||||
func NewService(c *torrent.Client, rep repository.TorrentsRepository, addTimeout, readTimeout int) *Service {
|
||||
func NewService(c *torrent.Client, rep storage.TorrentsRepository, addTimeout, readTimeout int) *Service {
|
||||
l := slog.With("component", "torrent-service")
|
||||
return &Service{
|
||||
log: l,
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package service
|
||||
|
||||
import (
|
||||
"errors"
|
|
@ -1,11 +1,11 @@
|
|||
package host
|
||||
|
||||
import (
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
)
|
||||
|
||||
func NewStorage(dataPath string, tsrv *torrent.Service) vfs.Filesystem {
|
||||
func NewStorage(dataPath string, tsrv *service.Service) vfs.Filesystem {
|
||||
factories := map[string]vfs.FsFactory{
|
||||
".torrent": tsrv.NewTorrentFs,
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package storage
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
@ -20,20 +20,6 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
|
|||
torrentCfg.PeerID = string(id[:])
|
||||
torrentCfg.DefaultStorage = st
|
||||
|
||||
// torrentCfg.DisableIPv6 = cfg.DisableIPv6
|
||||
// torrentCfg.DropDuplicatePeerIds = true
|
||||
// torrentCfg.TorrentPeersLowWater = 10
|
||||
// torrentCfg.TorrentPeersHighWater = 100
|
||||
// torrentCfg.DisableWebtorrent = true
|
||||
// torrentCfg.DisableAggressiveUpload = true
|
||||
// torrentCfg.DisableWebseeds = true
|
||||
// torrentCfg.DisableUTP = false
|
||||
// torrentCfg.NoDefaultPortForwarding = true
|
||||
// torrentCfg.AlwaysWantConns = false
|
||||
// torrentCfg.ClientDhtConfig = torrent.ClientDhtConfig{
|
||||
// NoDHT: true,
|
||||
// }
|
||||
|
||||
l := log.Logger.With().Str("component", "torrent-client").Logger()
|
||||
|
||||
tl := tlog.NewLogger()
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package storage
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
|
@ -1,10 +1,11 @@
|
|||
package repository
|
||||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/philippgille/gokv"
|
||||
"github.com/philippgille/gokv/badgerdb"
|
||||
|
@ -12,13 +13,13 @@ import (
|
|||
)
|
||||
|
||||
type TorrentsRepository interface {
|
||||
ExcludeFile(hash metainfo.Hash, file ...string) error
|
||||
ExcludeFile(file *torrent.File) error
|
||||
ExcludedFiles(hash metainfo.Hash) ([]string, error)
|
||||
}
|
||||
|
||||
func NewTorrentMetaRepository(dir string) (TorrentsRepository, error) {
|
||||
func NewTorrentMetaRepository(metaDir string, storage *FileStorage) (TorrentsRepository, error) {
|
||||
excludedFilesStore, err := badgerdb.NewStore(badgerdb.Options{
|
||||
Dir: filepath.Join(dir, "excluded-files"),
|
||||
Dir: filepath.Join(metaDir, "excluded-files"),
|
||||
Codec: encoding.JSON,
|
||||
})
|
||||
|
||||
|
@ -28,6 +29,7 @@ func NewTorrentMetaRepository(dir string) (TorrentsRepository, error) {
|
|||
|
||||
r := &torrentRepositoryImpl{
|
||||
excludedFiles: excludedFilesStore,
|
||||
storage: storage,
|
||||
}
|
||||
|
||||
return r, nil
|
||||
|
@ -36,14 +38,16 @@ func NewTorrentMetaRepository(dir string) (TorrentsRepository, error) {
|
|||
type torrentRepositoryImpl struct {
|
||||
m sync.RWMutex
|
||||
excludedFiles gokv.Store
|
||||
storage *FileStorage
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
func (r *torrentRepositoryImpl) ExcludeFile(hash metainfo.Hash, file ...string) error {
|
||||
func (r *torrentRepositoryImpl) ExcludeFile(file *torrent.File) error {
|
||||
r.m.Lock()
|
||||
defer r.m.Unlock()
|
||||
|
||||
hash := file.Torrent().InfoHash()
|
||||
var excludedFiles []string
|
||||
found, err := r.excludedFiles.Get(hash.AsString(), &excludedFiles)
|
||||
if err != nil {
|
||||
|
@ -52,7 +56,12 @@ func (r *torrentRepositoryImpl) ExcludeFile(hash metainfo.Hash, file ...string)
|
|||
if !found {
|
||||
excludedFiles = []string{}
|
||||
}
|
||||
excludedFiles = unique(append(excludedFiles, file...))
|
||||
excludedFiles = unique(append(excludedFiles, file.Path()))
|
||||
|
||||
err = r.storage.DeleteFile(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.excludedFiles.Set(hash.AsString(), excludedFiles)
|
||||
}
|
51
src/host/storage/storage.go
Normal file
51
src/host/storage/storage.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
)
|
||||
|
||||
func SetupStorage(cfg config.TorrentClient) (*FileStorage, storage.PieceCompletion, error) {
|
||||
pcp := filepath.Join(cfg.DataFolder, "piece-completion")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
pc, err := storage.NewBoltPieceCompletion(pcp)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
}
|
||||
|
||||
// pc, err := NewBadgerPieceCompletion(pcp)
|
||||
// if err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
// }
|
||||
|
||||
// TODO implement cache/storage switching
|
||||
// cacheDir := filepath.Join(tcfg.DataFolder, "cache")
|
||||
// if err := os.MkdirAll(cacheDir, 0744); err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
// }
|
||||
// fc, err := filecache.NewCache(cacheDir)
|
||||
// if err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating cache: %w", err)
|
||||
// }
|
||||
// log.Info().Msg(fmt.Sprintf("setting cache size to %d MB", 1024))
|
||||
// fc.SetCapacity(1024 * 1024 * 1024)
|
||||
|
||||
// rp := storage.NewResourcePieces(fc.AsResourceProvider())
|
||||
// st := &stc{rp}
|
||||
|
||||
filesDir := filepath.Join(cfg.DataFolder, "files")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
|
||||
// st := storage.NewMMapWithCompletion(filesDir, pc)
|
||||
st := NewFileStorage(filesDir, pc)
|
||||
|
||||
return st, pc, nil
|
||||
}
|
302
src/host/storage/storage_files.go
Normal file
302
src/host/storage/storage_files.go
Normal file
|
@ -0,0 +1,302 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/anacrolix/missinggo"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/common"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/segments"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
)
|
||||
|
||||
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
|
||||
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *FileStorage {
|
||||
return &FileStorage{baseDir: baseDir, pieceCompletion: pc}
|
||||
}
|
||||
|
||||
// File-based storage for torrents, that isn't yet bound to a particular torrent.
|
||||
type FileStorage struct {
|
||||
baseDir string
|
||||
pieceCompletion storage.PieceCompletion
|
||||
}
|
||||
|
||||
func (me *FileStorage) Close() error {
|
||||
return me.pieceCompletion.Close()
|
||||
}
|
||||
|
||||
func (me *FileStorage) torrentDir(info *metainfo.Info, infoHash metainfo.Hash) string {
|
||||
return filepath.Join(me.baseDir, info.Name)
|
||||
}
|
||||
|
||||
func (me *FileStorage) filePath(file metainfo.FileInfo) string {
|
||||
return filepath.Join(file.Path...)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) DeleteFile(file *torrent.File) error {
|
||||
info := file.Torrent().Info()
|
||||
infoHash := file.Torrent().InfoHash()
|
||||
torrentDir := fs.torrentDir(info, infoHash)
|
||||
relFilePath := fs.filePath(file.FileInfo())
|
||||
filePath := path.Join(torrentDir, relFilePath)
|
||||
return os.Remove(filePath)
|
||||
}
|
||||
|
||||
func (fs FileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
|
||||
dir := fs.torrentDir(info, infoHash)
|
||||
upvertedFiles := info.UpvertedFiles()
|
||||
files := make([]file, 0, len(upvertedFiles))
|
||||
for i, fileInfo := range upvertedFiles {
|
||||
filePath := filepath.Join(dir, fs.filePath(fileInfo))
|
||||
if !isSubFilepath(dir, filePath) {
|
||||
return storage.TorrentImpl{}, fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, fs.baseDir)
|
||||
}
|
||||
|
||||
f := file{
|
||||
path: filePath,
|
||||
length: fileInfo.Length,
|
||||
}
|
||||
if f.length == 0 {
|
||||
err := CreateNativeZeroLengthFile(f.path)
|
||||
if err != nil {
|
||||
return storage.TorrentImpl{}, fmt.Errorf("creating zero length file: %w", err)
|
||||
}
|
||||
}
|
||||
files = append(files, f)
|
||||
}
|
||||
t := &fileTorrentImpl{
|
||||
files: files,
|
||||
segmentLocater: segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
|
||||
infoHash: infoHash,
|
||||
completion: fs.pieceCompletion,
|
||||
}
|
||||
return storage.TorrentImpl{
|
||||
Piece: t.Piece,
|
||||
Close: t.Close,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type file struct {
|
||||
// The safe, OS-local file path.
|
||||
path string
|
||||
length int64
|
||||
}
|
||||
|
||||
type fileTorrentImpl struct {
|
||||
files []file
|
||||
segmentLocater segments.Index
|
||||
infoHash metainfo.Hash
|
||||
completion storage.PieceCompletion
|
||||
}
|
||||
|
||||
func (fts *fileTorrentImpl) Piece(p metainfo.Piece) storage.PieceImpl {
|
||||
// Create a view onto the file-based torrent storage.
|
||||
_io := fileTorrentImplIO{fts}
|
||||
// Return the appropriate segments of this.
|
||||
return &filePieceImpl{
|
||||
fileTorrentImpl: fts,
|
||||
p: p,
|
||||
WriterAt: missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
|
||||
ReaderAt: io.NewSectionReader(_io, p.Offset(), p.Length()),
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *fileTorrentImpl) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// A helper to create zero-length files which won't appear for file-orientated storage since no
|
||||
// writes will ever occur to them (no torrent data is associated with a zero-length file). The
|
||||
// caller should make sure the file name provided is safe/sanitized.
|
||||
func CreateNativeZeroLengthFile(name string) error {
|
||||
err := os.MkdirAll(filepath.Dir(name), 0o777)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f, err := os.Create(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// Exposes file-based storage of a torrent, as one big ReadWriterAt.
|
||||
type fileTorrentImplIO struct {
|
||||
fts *fileTorrentImpl
|
||||
}
|
||||
|
||||
// Returns EOF on short or missing file.
|
||||
func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
|
||||
f, err := os.Open(file.path)
|
||||
if os.IsNotExist(err) {
|
||||
// File missing is treated the same as a short file.
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
// Limit the read to within the expected bounds of this file.
|
||||
if int64(len(b)) > file.length-off {
|
||||
b = b[:file.length-off]
|
||||
}
|
||||
for off < file.length && len(b) != 0 {
|
||||
n1, err1 := f.ReadAt(b, off)
|
||||
b = b[n1:]
|
||||
n += n1
|
||||
off += int64(n1)
|
||||
if n1 == 0 {
|
||||
err = err1
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
|
||||
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
fst.fts.segmentLocater.Locate(
|
||||
segments.Extent{Start: off, Length: int64(len(b))},
|
||||
func(i int, e segments.Extent) bool {
|
||||
n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start)
|
||||
n += n1
|
||||
b = b[n1:]
|
||||
err = err1
|
||||
return err == nil // && int64(n1) == e.Length
|
||||
},
|
||||
)
|
||||
if len(b) != 0 && err == nil {
|
||||
err = io.EOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
// log.Printf("write at %v: %v bytes", off, len(p))
|
||||
fst.fts.segmentLocater.Locate(
|
||||
segments.Extent{Start: off, Length: int64(len(p))},
|
||||
func(i int, e segments.Extent) bool {
|
||||
name := fst.fts.files[i].path
|
||||
err = os.MkdirAll(filepath.Dir(name), 0o777)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var f *os.File
|
||||
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var n1 int
|
||||
n1, err = f.WriteAt(p[:e.Length], e.Start)
|
||||
// log.Printf("%v %v wrote %v: %v", i, e, n1, err)
|
||||
closeErr := f.Close()
|
||||
n += n1
|
||||
p = p[n1:]
|
||||
if err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
if err == nil && int64(n1) != e.Length {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
return err == nil
|
||||
},
|
||||
)
|
||||
return n, err
|
||||
}
|
||||
|
||||
type filePieceImpl struct {
|
||||
*fileTorrentImpl
|
||||
p metainfo.Piece
|
||||
io.WriterAt
|
||||
io.ReaderAt
|
||||
}
|
||||
|
||||
var _ storage.PieceImpl = (*filePieceImpl)(nil)
|
||||
|
||||
func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
|
||||
return metainfo.PieceKey{InfoHash: me.infoHash, Index: me.p.Index()}
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) Completion() storage.Completion {
|
||||
c, err := fs.completion.Get(fs.pieceKey())
|
||||
if err != nil {
|
||||
log.Printf("error getting piece completion: %s", err)
|
||||
c.Ok = false
|
||||
return c
|
||||
}
|
||||
|
||||
verified := true
|
||||
if c.Complete {
|
||||
// If it's allegedly complete, check that its constituent files have the necessary length.
|
||||
for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
|
||||
s, err := os.Stat(fs.files[fi.fileIndex].path)
|
||||
if err != nil || s.Size() < fi.length {
|
||||
verified = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !verified {
|
||||
// The completion was wrong, fix it.
|
||||
c.Complete = false
|
||||
fs.completion.Set(fs.pieceKey(), false)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) MarkComplete() error {
|
||||
return fs.completion.Set(fs.pieceKey(), true)
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) MarkNotComplete() error {
|
||||
return fs.completion.Set(fs.pieceKey(), false)
|
||||
}
|
||||
|
||||
type requiredLength struct {
|
||||
fileIndex int
|
||||
length int64
|
||||
}
|
||||
|
||||
func isSubFilepath(base, sub string) bool {
|
||||
rel, err := filepath.Rel(base, sub)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return rel != ".." && !strings.HasPrefix(rel, ".."+string(os.PathSeparator))
|
||||
}
|
||||
|
||||
func extentCompleteRequiredLengths(info *metainfo.Info, off, n int64) (ret []requiredLength) {
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
for i, fi := range info.UpvertedFiles() {
|
||||
if off >= fi.Length {
|
||||
off -= fi.Length
|
||||
continue
|
||||
}
|
||||
n1 := n
|
||||
if off+n1 > fi.Length {
|
||||
n1 = fi.Length - off
|
||||
}
|
||||
ret = append(ret, requiredLength{
|
||||
fileIndex: i,
|
||||
length: off + n1,
|
||||
})
|
||||
n -= n1
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
off = 0
|
||||
}
|
||||
panic("extent exceeds torrent bounds")
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package torrent
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
|
@ -1,306 +0,0 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"github.com/anacrolix/missinggo"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/mmap_span"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/edsrzf/mmap-go"
|
||||
)
|
||||
|
||||
type Torrent struct {
|
||||
client *torrent.Client
|
||||
data storage.ClientImplCloser
|
||||
pc storage.PieceCompletion
|
||||
}
|
||||
|
||||
func SetupStorage(cfg config.TorrentClient) (storage.ClientImplCloser, storage.PieceCompletion, error) {
|
||||
pcp := filepath.Join(cfg.DataFolder, "piece-completion")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
pc, err := storage.NewBoltPieceCompletion(pcp)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
}
|
||||
|
||||
// pc, err := NewBadgerPieceCompletion(pcp)
|
||||
// if err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
// }
|
||||
|
||||
// TODO implement cache/storage switching
|
||||
// cacheDir := filepath.Join(tcfg.DataFolder, "cache")
|
||||
// if err := os.MkdirAll(cacheDir, 0744); err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
// }
|
||||
// fc, err := filecache.NewCache(cacheDir)
|
||||
// if err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating cache: %w", err)
|
||||
// }
|
||||
// log.Info().Msg(fmt.Sprintf("setting cache size to %d MB", 1024))
|
||||
// fc.SetCapacity(1024 * 1024 * 1024)
|
||||
|
||||
// rp := storage.NewResourcePieces(fc.AsResourceProvider())
|
||||
// st := &stc{rp}
|
||||
|
||||
filesDir := filepath.Join(cfg.DataFolder, "files")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
|
||||
// st := storage.NewMMapWithCompletion(filesDir, pc)
|
||||
st := storage.NewFileOpts(storage.NewFileClientOpts{
|
||||
ClientBaseDir: filesDir,
|
||||
PieceCompletion: pc,
|
||||
})
|
||||
|
||||
return st, pc, nil
|
||||
}
|
||||
|
||||
func (s Torrent) Remove(f *torrent.File) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// type dupePieces struct {
|
||||
// }
|
||||
|
||||
// func (s Torrent) dedupe(f1, f2 *os.File) error {
|
||||
// for _, t := range s.client.Torrents() {
|
||||
// for i := 0; i < t.NumPieces(); i++ {
|
||||
// p := t.Piece(i)
|
||||
// p.Info().Hash()
|
||||
// }
|
||||
// }
|
||||
|
||||
// // https://go-review.googlesource.com/c/sys/+/284352/10/unix/syscall_linux_test.go#856
|
||||
// // dedupe := unix.FileDedupeRange{
|
||||
// // Src_offset: uint64(0),
|
||||
// // Src_length: uint64(4096),
|
||||
// // Info: []unix.FileDedupeRangeInfo{
|
||||
// // unix.FileDedupeRangeInfo{
|
||||
// // Dest_fd: int64(f2.Fd()),
|
||||
// // Dest_offset: uint64(0),
|
||||
// // },
|
||||
// // unix.FileDedupeRangeInfo{
|
||||
// // Dest_fd: int64(f2.Fd()),
|
||||
// // Dest_offset: uint64(4096),
|
||||
// // },
|
||||
// // }}
|
||||
// // err := unix.IoctlFileDedupeRange(int(f1.Fd()), &dedupe)
|
||||
// // if err == unix.EOPNOTSUPP || err == unix.EINVAL {
|
||||
// // t.Skip("deduplication not supported on this filesystem")
|
||||
// // } else if err != nil {
|
||||
// // t.Fatal(err)
|
||||
// // }
|
||||
|
||||
// return nil
|
||||
// }
|
||||
|
||||
type mmapClientImpl struct {
|
||||
baseDir string
|
||||
pc storage.PieceCompletion
|
||||
}
|
||||
|
||||
func NewMMapWithCompletion(baseDir string, completion storage.PieceCompletion) *mmapClientImpl {
|
||||
return &mmapClientImpl{
|
||||
baseDir: baseDir,
|
||||
pc: completion,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ storage.TorrentImpl, err error) {
|
||||
t, err := newMMapTorrent(info, infoHash, s.baseDir, s.pc)
|
||||
if err != nil {
|
||||
return storage.TorrentImpl{}, err
|
||||
}
|
||||
return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, nil
|
||||
}
|
||||
|
||||
func (s *mmapClientImpl) Close() error {
|
||||
return s.pc.Close()
|
||||
}
|
||||
|
||||
func newMMapTorrent(md *metainfo.Info, infoHash metainfo.Hash, location string, pc storage.PieceCompletionGetSetter) (*mmapTorrent, error) {
|
||||
span := &mmap_span.MMapSpan{}
|
||||
basePath, err := storage.ToSafeFilePath(md.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
basePath = filepath.Join(location, basePath)
|
||||
|
||||
for _, miFile := range md.UpvertedFiles() {
|
||||
var safeName string
|
||||
safeName, err = storage.ToSafeFilePath(miFile.Path...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileName := filepath.Join(basePath, safeName)
|
||||
var mm FileMapping
|
||||
mm, err = mmapFile(fileName, miFile.Length)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("file %q: %s", miFile.DisplayPath(md), err)
|
||||
return nil, err
|
||||
}
|
||||
span.Append(mm)
|
||||
}
|
||||
span.InitIndex()
|
||||
|
||||
return &mmapTorrent{
|
||||
infoHash: infoHash,
|
||||
span: span,
|
||||
pc: pc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type mmapTorrent struct {
|
||||
infoHash metainfo.Hash
|
||||
span *mmap_span.MMapSpan
|
||||
pc storage.PieceCompletionGetSetter
|
||||
}
|
||||
|
||||
func (ts *mmapTorrent) Piece(p metainfo.Piece) storage.PieceImpl {
|
||||
return mmapPiece{
|
||||
pc: ts.pc,
|
||||
p: p,
|
||||
ih: ts.infoHash,
|
||||
ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()),
|
||||
WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()),
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *mmapTorrent) Close() error {
|
||||
errs := ts.span.Close()
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ts *mmapTorrent) Flush() error {
|
||||
errs := ts.span.Flush()
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type mmapPiece struct {
|
||||
pc storage.PieceCompletionGetSetter
|
||||
p metainfo.Piece
|
||||
ih metainfo.Hash
|
||||
io.ReaderAt
|
||||
io.WriterAt
|
||||
}
|
||||
|
||||
func (me mmapPiece) pieceKey() metainfo.PieceKey {
|
||||
return metainfo.PieceKey{InfoHash: me.ih, Index: me.p.Index()}
|
||||
}
|
||||
|
||||
func (sp mmapPiece) Completion() storage.Completion {
|
||||
c, err := sp.pc.Get(sp.pieceKey())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (sp mmapPiece) MarkComplete() error {
|
||||
return sp.pc.Set(sp.pieceKey(), true)
|
||||
}
|
||||
|
||||
func (sp mmapPiece) MarkNotComplete() error {
|
||||
return sp.pc.Set(sp.pieceKey(), false)
|
||||
}
|
||||
|
||||
func mmapFile(name string, size int64) (_ FileMapping, err error) {
|
||||
dir := filepath.Dir(name)
|
||||
err = os.MkdirAll(dir, 0o750)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("making directory %q: %s", dir, err)
|
||||
}
|
||||
var file *os.File
|
||||
file, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
file.Close()
|
||||
}
|
||||
}()
|
||||
var fi os.FileInfo
|
||||
fi, err = file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fi.Size() < size {
|
||||
// I think this is necessary on HFS+. Maybe Linux will SIGBUS too if
|
||||
// you overmap a file but I'm not sure.
|
||||
err = file.Truncate(size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return func() (ret mmapWithFile, err error) {
|
||||
ret.f = file
|
||||
if size == 0 {
|
||||
// Can't mmap() regions with length 0.
|
||||
return
|
||||
}
|
||||
intLen := int(size)
|
||||
if int64(intLen) != size {
|
||||
err = errors.New("size too large for system")
|
||||
return
|
||||
}
|
||||
ret.mmap, err = mmap.MapRegion(file, intLen, mmap.RDWR, 0, 0)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error mapping region: %s", err)
|
||||
return
|
||||
}
|
||||
if int64(len(ret.mmap)) != size {
|
||||
panic(len(ret.mmap))
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
|
||||
type FileMapping = mmap_span.Mmap
|
||||
|
||||
// Handles closing the mmap's file handle (needed for Windows). Could be implemented differently by
|
||||
// OS.
|
||||
type mmapWithFile struct {
|
||||
f *os.File
|
||||
mmap mmap.MMap
|
||||
}
|
||||
|
||||
func (m mmapWithFile) Flush() error {
|
||||
return m.mmap.Flush()
|
||||
}
|
||||
|
||||
func (m mmapWithFile) Unmap() (err error) {
|
||||
if m.mmap != nil {
|
||||
err = m.mmap.Unmap()
|
||||
}
|
||||
fileErr := m.f.Close()
|
||||
if err == nil {
|
||||
err = fileErr
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m mmapWithFile) Bytes() []byte {
|
||||
if m.mmap == nil {
|
||||
return nil
|
||||
}
|
||||
return m.mmap
|
||||
}
|
|
@ -9,7 +9,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/repository"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/storage"
|
||||
"git.kmsign.ru/royalcat/tstor/src/iio"
|
||||
"github.com/anacrolix/missinggo/v2"
|
||||
"github.com/anacrolix/torrent"
|
||||
|
@ -21,7 +21,7 @@ var _ Filesystem = &TorrentFs{}
|
|||
type TorrentFs struct {
|
||||
mu sync.Mutex
|
||||
t *torrent.Torrent
|
||||
rep repository.TorrentsRepository
|
||||
rep storage.TorrentsRepository
|
||||
|
||||
readTimeout int
|
||||
|
||||
|
@ -31,7 +31,7 @@ type TorrentFs struct {
|
|||
resolver *resolver
|
||||
}
|
||||
|
||||
func NewTorrentFs(t *torrent.Torrent, rep repository.TorrentsRepository, readTimeout int) *TorrentFs {
|
||||
func NewTorrentFs(t *torrent.Torrent, rep storage.TorrentsRepository, readTimeout int) *TorrentFs {
|
||||
return &TorrentFs{
|
||||
t: t,
|
||||
rep: rep,
|
||||
|
@ -53,17 +53,17 @@ func (fs *TorrentFs) files() (map[string]*torrentFile, error) {
|
|||
|
||||
fs.filesCache = make(map[string]*torrentFile)
|
||||
for _, file := range files {
|
||||
p := AbsPath(file.Path())
|
||||
|
||||
if slices.Contains(excludedFiles, p) {
|
||||
if slices.Contains(excludedFiles, file.Path()) {
|
||||
continue
|
||||
}
|
||||
|
||||
p := AbsPath(file.Path())
|
||||
|
||||
fs.filesCache[p] = &torrentFile{
|
||||
name: path.Base(p),
|
||||
readerFunc: file.NewReader,
|
||||
len: file.Length(),
|
||||
timeout: fs.readTimeout,
|
||||
name: path.Base(p),
|
||||
timeout: fs.readTimeout,
|
||||
file: file,
|
||||
}
|
||||
}
|
||||
fs.mu.Unlock()
|
||||
|
@ -144,6 +144,8 @@ func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) {
|
|||
}
|
||||
|
||||
func (fs *TorrentFs) Unlink(name string) error {
|
||||
name = AbsPath(name)
|
||||
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
|
@ -151,14 +153,15 @@ func (fs *TorrentFs) Unlink(name string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
file := AbsPath(name)
|
||||
|
||||
if !slices.Contains(maps.Keys(files), file) {
|
||||
if !slices.Contains(maps.Keys(files), name) {
|
||||
return ErrNotExist
|
||||
}
|
||||
fs.filesCache = nil
|
||||
|
||||
return fs.rep.ExcludeFile(fs.t.InfoHash(), file)
|
||||
file := files[name]
|
||||
delete(fs.filesCache, name)
|
||||
|
||||
return fs.rep.ExcludeFile(file.file)
|
||||
}
|
||||
|
||||
type reader interface {
|
||||
|
@ -224,25 +227,25 @@ var _ File = &torrentFile{}
|
|||
type torrentFile struct {
|
||||
name string
|
||||
|
||||
readerFunc func() torrent.Reader
|
||||
reader reader
|
||||
len int64
|
||||
timeout int
|
||||
reader reader
|
||||
timeout int
|
||||
|
||||
file *torrent.File
|
||||
}
|
||||
|
||||
func (d *torrentFile) Stat() (fs.FileInfo, error) {
|
||||
return newFileInfo(d.name, d.len), nil
|
||||
return newFileInfo(d.name, d.file.Length()), nil
|
||||
}
|
||||
|
||||
func (d *torrentFile) load() {
|
||||
if d.reader != nil {
|
||||
return
|
||||
}
|
||||
d.reader = newReadAtWrapper(d.readerFunc(), d.timeout)
|
||||
d.reader = newReadAtWrapper(d.file.NewReader(), d.timeout)
|
||||
}
|
||||
|
||||
func (d *torrentFile) Size() int64 {
|
||||
return d.len
|
||||
return d.file.Length()
|
||||
}
|
||||
|
||||
func (d *torrentFile) IsDir() bool {
|
||||
|
|
|
@ -96,9 +96,8 @@ func TestReadAtTorrent(t *testing.T) {
|
|||
torrFile := to.Files()[0]
|
||||
|
||||
tf := torrentFile{
|
||||
readerFunc: torrFile.NewReader,
|
||||
len: torrFile.Length(),
|
||||
timeout: 500,
|
||||
file: torrFile,
|
||||
timeout: 500,
|
||||
}
|
||||
|
||||
defer tf.Close()
|
||||
|
|
|
@ -7,12 +7,12 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
"github.com/anacrolix/missinggo/v2/filecache"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
|
||||
var apiStatusHandler = func(fc *filecache.Cache, ss *service.Stats) gin.HandlerFunc {
|
||||
return func(ctx *gin.Context) {
|
||||
stat := gin.H{
|
||||
"torrentStats": ss.GlobalStats(),
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
|
||||
"git.kmsign.ru/royalcat/tstor"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
"github.com/anacrolix/missinggo/v2/filecache"
|
||||
"github.com/gin-contrib/pprof"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/shurcooL/httpfs/html/vfstemplate"
|
||||
)
|
||||
|
||||
func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, logPath string, cfg *config.Config) error {
|
||||
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath string, cfg *config.Config) error {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.New()
|
||||
r.Use(gin.Recovery())
|
||||
|
|
|
@ -26,7 +26,7 @@ func TestReadData(t *testing.T) {
|
|||
require.Equal(5, n)
|
||||
require.Equal("World", string(toRead))
|
||||
|
||||
r.ReadAt(toRead, 0)
|
||||
n, err = r.ReadAt(toRead, 0)
|
||||
require.NoError(err)
|
||||
require.Equal(5, n)
|
||||
require.Equal("Hello", string(toRead))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue