WIP
This commit is contained in:
parent
78704bee78
commit
2cefb9db98
19 changed files with 691 additions and 627 deletions
src
|
@ -1,7 +1,7 @@
|
|||
package config
|
||||
|
||||
var defaultConfig = Config{
|
||||
DataFolder: "./data",
|
||||
SourceDir: "./data",
|
||||
WebUi: WebUi{
|
||||
Port: 4444,
|
||||
IP: "0.0.0.0",
|
||||
|
|
|
@ -7,7 +7,7 @@ type Config struct {
|
|||
Mounts Mounts `koanf:"mounts"`
|
||||
Log Log `koanf:"log"`
|
||||
|
||||
DataFolder string `koanf:"dataFolder"`
|
||||
SourceDir string `koanf:"source_dir"`
|
||||
}
|
||||
|
||||
type WebUi struct {
|
||||
|
|
|
@ -2,14 +2,22 @@ package nfs
|
|||
|
||||
import (
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/log"
|
||||
zlog "github.com/rs/zerolog/log"
|
||||
nfs "github.com/willscott/go-nfs"
|
||||
nfshelper "github.com/willscott/go-nfs/helpers"
|
||||
)
|
||||
|
||||
func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
|
||||
bfs := &billyFsWrapper{fs: fs}
|
||||
nfslog := zlog.Logger.With().Str("component", "nfs").Logger()
|
||||
nfs.SetLogger(log.NewNFSLog(nfslog))
|
||||
nfs.Log.SetLevel(nfs.InfoLevel)
|
||||
|
||||
bfs := &billyFsWrapper{fs: fs, log: nfslog}
|
||||
handler := nfshelper.NewNullAuthHandler(bfs)
|
||||
|
||||
cacheHelper := nfshelper.NewCachingHandler(handler, 1024*16)
|
||||
|
||||
// cacheHelper := NewCachingHandler(handler)
|
||||
|
||||
return cacheHelper, nil
|
||||
|
|
|
@ -1,15 +1,18 @@
|
|||
package nfs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"path/filepath"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/go-git/go-billy/v5"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type billyFsWrapper struct {
|
||||
fs vfs.Filesystem
|
||||
fs vfs.Filesystem
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
var _ billy.Filesystem = (*billyFsWrapper)(nil)
|
||||
|
@ -34,7 +37,7 @@ func (*billyFsWrapper) Join(elem ...string) string {
|
|||
func (fs *billyFsWrapper) Lstat(filename string) (fs.FileInfo, error) {
|
||||
info, err := fs.fs.Stat(filename)
|
||||
if err != nil {
|
||||
return nil, billyErr(err)
|
||||
return nil, billyErr(err, fs.log)
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
@ -45,26 +48,28 @@ func (*billyFsWrapper) MkdirAll(filename string, perm fs.FileMode) error {
|
|||
}
|
||||
|
||||
// Open implements billy.Filesystem.
|
||||
func (f *billyFsWrapper) Open(filename string) (billy.File, error) {
|
||||
file, err := f.fs.Open(filename)
|
||||
func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
|
||||
file, err := fs.fs.Open(filename)
|
||||
if err != nil {
|
||||
return nil, billyErr(err)
|
||||
return nil, billyErr(err, fs.log)
|
||||
}
|
||||
return &billyFile{
|
||||
name: filename,
|
||||
file: file,
|
||||
log: fs.log.With().Str("filename", filename).Logger(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// OpenFile implements billy.Filesystem.
|
||||
func (f *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (billy.File, error) {
|
||||
file, err := f.fs.Open(filename)
|
||||
func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (billy.File, error) {
|
||||
file, err := fs.fs.Open(filename)
|
||||
if err != nil {
|
||||
return nil, billyErr(err)
|
||||
return nil, billyErr(err, fs.log)
|
||||
}
|
||||
return &billyFile{
|
||||
name: filename,
|
||||
file: file,
|
||||
log: fs.log.With().Str("filename", filename).Int("flag", flag).Str("perm", perm.String()).Logger(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -72,7 +77,7 @@ func (f *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (
|
|||
func (bfs *billyFsWrapper) ReadDir(path string) ([]fs.FileInfo, error) {
|
||||
ffs, err := bfs.fs.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, billyErr(err)
|
||||
return nil, billyErr(err, bfs.log)
|
||||
}
|
||||
|
||||
out := make([]fs.FileInfo, 0, len(ffs))
|
||||
|
@ -112,27 +117,28 @@ func (*billyFsWrapper) Root() string {
|
|||
}
|
||||
|
||||
// Stat implements billy.Filesystem.
|
||||
func (f *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
|
||||
info, err := f.fs.Stat(filename)
|
||||
func (fs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
|
||||
info, err := fs.fs.Stat(filename)
|
||||
if err != nil {
|
||||
return nil, billyErr(err)
|
||||
return nil, billyErr(err, fs.log)
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// Symlink implements billy.Filesystem.
|
||||
func (*billyFsWrapper) Symlink(target string, link string) error {
|
||||
return billyErr(vfs.ErrNotImplemented)
|
||||
func (fs *billyFsWrapper) Symlink(target string, link string) error {
|
||||
return billyErr(vfs.ErrNotImplemented, fs.log)
|
||||
}
|
||||
|
||||
// TempFile implements billy.Filesystem.
|
||||
func (*billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error) {
|
||||
return nil, billyErr(vfs.ErrNotImplemented)
|
||||
func (fs *billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error) {
|
||||
return nil, billyErr(vfs.ErrNotImplemented, fs.log)
|
||||
}
|
||||
|
||||
type billyFile struct {
|
||||
name string
|
||||
file vfs.File
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
var _ billy.File = (*billyFile)(nil)
|
||||
|
@ -149,27 +155,27 @@ func (f *billyFile) Name() string {
|
|||
|
||||
// Read implements billy.File.
|
||||
func (f *billyFile) Read(p []byte) (n int, err error) {
|
||||
return f.Read(p)
|
||||
return f.file.Read(p)
|
||||
}
|
||||
|
||||
// ReadAt implements billy.File.
|
||||
func (f *billyFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
return f.ReadAt(p, off)
|
||||
return f.file.ReadAt(p, off)
|
||||
}
|
||||
|
||||
// Seek implements billy.File.
|
||||
func (*billyFile) Seek(offset int64, whence int) (int64, error) {
|
||||
return 0, billyErr(vfs.ErrNotImplemented)
|
||||
func (f *billyFile) Seek(offset int64, whence int) (int64, error) {
|
||||
return 0, billyErr(vfs.ErrNotImplemented, f.log)
|
||||
}
|
||||
|
||||
// Truncate implements billy.File.
|
||||
func (*billyFile) Truncate(size int64) error {
|
||||
return billyErr(vfs.ErrNotImplemented)
|
||||
func (f *billyFile) Truncate(size int64) error {
|
||||
return billyErr(vfs.ErrNotImplemented, f.log)
|
||||
}
|
||||
|
||||
// Write implements billy.File.
|
||||
func (*billyFile) Write(p []byte) (n int, err error) {
|
||||
return 0, billyErr(vfs.ErrNotImplemented)
|
||||
func (f *billyFile) Write(p []byte) (n int, err error) {
|
||||
return 0, billyErr(vfs.ErrNotImplemented, f.log)
|
||||
}
|
||||
|
||||
// Lock implements billy.File.
|
||||
|
@ -182,9 +188,19 @@ func (*billyFile) Unlock() error {
|
|||
return nil // TODO
|
||||
}
|
||||
|
||||
func billyErr(err error) error {
|
||||
if err == vfs.ErrNotImplemented {
|
||||
func billyErr(err error, log zerolog.Logger) error {
|
||||
if errors.Is(err, vfs.ErrNotImplemented) {
|
||||
return billy.ErrNotSupported
|
||||
}
|
||||
if errors.Is(err, vfs.ErrNotExist) {
|
||||
if err, ok := asErr[*fs.PathError](err); ok {
|
||||
log.Error().Err(err.Err).Str("op", err.Op).Str("path", err.Path).Msg("file not found")
|
||||
}
|
||||
return fs.ErrNotExist
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func asErr[E error](err error) (e E, ok bool) {
|
||||
return e, errors.As(err, &e)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
func NewWebDAVServer(fs vfs.Filesystem, port int, user, pass string) error {
|
||||
|
@ -36,3 +37,42 @@ func NewWebDAVServer(fs vfs.Filesystem, port int, user, pass string) error {
|
|||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
func NewDirServer(dir string, port int, user, pass string) error {
|
||||
|
||||
l := log.Logger.With().Str("component", "webDAV").Logger()
|
||||
srv := &webdav.Handler{
|
||||
Prefix: "/",
|
||||
FileSystem: webdav.Dir(dir),
|
||||
LockSystem: webdav.NewMemLS(),
|
||||
Logger: func(req *http.Request, err error) {
|
||||
if err != nil {
|
||||
l.Error().Err(err).Str("path", req.RequestURI).Msg("webDAV error")
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
|
||||
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
username, password, _ := r.BasicAuth()
|
||||
if username == user && password == pass {
|
||||
srv.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("WWW-Authenticate", `Basic realm="BASIC WebDAV REALM"`)
|
||||
w.WriteHeader(401)
|
||||
_, _ = w.Write([]byte("401 Unauthorized\n"))
|
||||
})
|
||||
|
||||
//nolint:exhaustruct
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf("0.0.0.0:%d", port),
|
||||
Handler: serveMux,
|
||||
}
|
||||
|
||||
log.Info().Str("host", httpServer.Addr).Msg("starting webDAV server")
|
||||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
|
||||
type Service struct {
|
||||
c *torrent.Client
|
||||
rep storage.TorrentsRepository
|
||||
rep storage.ExlcudedFiles
|
||||
|
||||
// stats *Stats
|
||||
DefaultPriority types.PiecePriority
|
||||
|
@ -24,7 +24,7 @@ type Service struct {
|
|||
addTimeout, readTimeout int
|
||||
}
|
||||
|
||||
func NewService(c *torrent.Client, rep storage.TorrentsRepository, addTimeout, readTimeout int) *Service {
|
||||
func NewService(c *torrent.Client, rep storage.ExlcudedFiles, addTimeout, readTimeout int) *Service {
|
||||
l := slog.With("component", "torrent-service")
|
||||
return &Service{
|
||||
log: l,
|
||||
|
@ -39,6 +39,36 @@ func NewService(c *torrent.Client, rep storage.TorrentsRepository, addTimeout, r
|
|||
|
||||
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
|
||||
|
||||
func (s *Service) NewTorrent(f vfs.File) (*torrent.Torrent, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout))
|
||||
defer cancel()
|
||||
defer f.Close()
|
||||
|
||||
mi, err := metainfo.Load(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t, ok := s.c.Torrent(mi.HashInfoBytes())
|
||||
if !ok {
|
||||
t, err = s.c.AddTorrent(mi)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("creating torrent fs timed out")
|
||||
case <-t.GotInfo():
|
||||
}
|
||||
for _, f := range t.Files() {
|
||||
f.SetPriority(s.DefaultPriority)
|
||||
}
|
||||
t.AllowDataDownload()
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout))
|
||||
defer cancel()
|
||||
|
@ -72,3 +102,7 @@ func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
|
|||
func (s *Service) Stats() (*Stats, error) {
|
||||
return &Stats{}, nil
|
||||
}
|
||||
|
||||
func (s *Service) GetStats() torrent.ConnStats {
|
||||
return s.c.ConnStats()
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
)
|
||||
|
||||
func NewStorage(dataPath string, tsrv *service.Service) vfs.Filesystem {
|
||||
func NewTorrentStorage(dataPath string, tsrv *service.Service) vfs.Filesystem {
|
||||
factories := map[string]vfs.FsFactory{
|
||||
".torrent": tsrv.NewTorrentFs,
|
||||
}
|
||||
|
|
|
@ -13,12 +13,12 @@ import (
|
|||
"github.com/philippgille/gokv/encoding"
|
||||
)
|
||||
|
||||
type TorrentsRepository interface {
|
||||
type ExlcudedFiles interface {
|
||||
ExcludeFile(file *torrent.File) error
|
||||
ExcludedFiles(hash metainfo.Hash) ([]string, error)
|
||||
}
|
||||
|
||||
func NewTorrentMetaRepository(metaDir string, storage atstorage.ClientImplCloser) (TorrentsRepository, error) {
|
||||
func NewExcludedFiles(metaDir string, storage atstorage.ClientImplCloser) (ExlcudedFiles, error) {
|
||||
excludedFilesStore, err := badgerdb.NewStore(badgerdb.Options{
|
||||
Dir: filepath.Join(metaDir, "excluded-files"),
|
||||
Codec: encoding.JSON,
|
|
@ -5,9 +5,11 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
dlog "git.kmsign.ru/royalcat/tstor/src/log"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type PieceCompletionState byte
|
||||
|
@ -32,9 +34,11 @@ type badgerPieceCompletion struct {
|
|||
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
|
||||
|
||||
func NewBadgerPieceCompletion(dir string) (storage.PieceCompletion, error) {
|
||||
l := log.Logger.With().Str("component", "badger").Str("db", "piece-completion").Logger()
|
||||
|
||||
opts := badger.
|
||||
DefaultOptions(dir).
|
||||
WithLogger(badgerSlog{slog: slog.With("component", "piece-completion")})
|
||||
WithLogger(&dlog.Badger{L: l})
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -10,20 +10,15 @@ import (
|
|||
)
|
||||
|
||||
func SetupStorage(cfg config.TorrentClient) (storage.ClientImplCloser, storage.PieceCompletion, error) {
|
||||
pcp := filepath.Join(cfg.DataFolder, "piece-completion")
|
||||
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
pc, err := storage.NewBoltPieceCompletion(pcp)
|
||||
pc, err := NewBadgerPieceCompletion(pcp)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
}
|
||||
|
||||
// pc, err := NewBadgerPieceCompletion(pcp)
|
||||
// if err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
// }
|
||||
|
||||
// TODO implement cache/storage switching
|
||||
// cacheDir := filepath.Join(tcfg.DataFolder, "cache")
|
||||
// if err := os.MkdirAll(cacheDir, 0744); err != nil {
|
||||
|
@ -39,19 +34,17 @@ func SetupStorage(cfg config.TorrentClient) (storage.ClientImplCloser, storage.P
|
|||
// rp := storage.NewResourcePieces(fc.AsResourceProvider())
|
||||
// st := &stc{rp}
|
||||
|
||||
// filesDir := filepath.Join(cfg.DataFolder, "files")
|
||||
// if err := os.MkdirAll(filesDir, 0744); err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
// }
|
||||
|
||||
//st := NewFileStorage(filesDir, pc)
|
||||
|
||||
piecesDir := filepath.Join(cfg.DataFolder, "pieces")
|
||||
if err := os.MkdirAll(piecesDir, 0744); err != nil {
|
||||
filesDir := cfg.DataFolder
|
||||
if err := os.MkdirAll(filesDir, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
st := NewFileStorage(filesDir, pc)
|
||||
|
||||
st := storage.NewMMapWithCompletion(piecesDir, pc)
|
||||
// piecesDir := filepath.Join(cfg.DataFolder, ".pieces")
|
||||
// if err := os.MkdirAll(piecesDir, 0744); err != nil {
|
||||
// return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
// }
|
||||
// st := storage.NewMMapWithCompletion(piecesDir, pc)
|
||||
|
||||
return st, pc, nil
|
||||
}
|
||||
|
|
|
@ -1,55 +1,68 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"slices"
|
||||
|
||||
"github.com/anacrolix/missinggo"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/common"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/segments"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type FileStorageDeleter interface {
|
||||
storage.ClientImplCloser
|
||||
DeleteFile(file *torrent.File) error
|
||||
Cleanup(expected []*torrent.Torrent) error
|
||||
}
|
||||
|
||||
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
|
||||
func NewFileStorage(baseDir string, pc storage.PieceCompletion) FileStorageDeleter {
|
||||
return &FileStorage{baseDir: baseDir, pieceCompletion: pc}
|
||||
|
||||
return &FileStorage{
|
||||
baseDir: baseDir,
|
||||
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
|
||||
ClientBaseDir: baseDir,
|
||||
PieceCompletion: pc,
|
||||
TorrentDirMaker: torrentDir,
|
||||
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
|
||||
return filePath(opts.File)
|
||||
},
|
||||
}),
|
||||
pieceCompletion: pc,
|
||||
log: log.Logger.With().Str("component", "torrent-client").Logger(),
|
||||
}
|
||||
}
|
||||
|
||||
// File-based storage for torrents, that isn't yet bound to a particular torrent.
|
||||
type FileStorage struct {
|
||||
baseDir string
|
||||
baseDir string
|
||||
storage.ClientImplCloser
|
||||
pieceCompletion storage.PieceCompletion
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func (me *FileStorage) Close() error {
|
||||
return me.pieceCompletion.Close()
|
||||
}
|
||||
|
||||
func (me *FileStorage) torrentDir(info *metainfo.Info, infoHash metainfo.Hash) string {
|
||||
return filepath.Join(me.baseDir, info.Name)
|
||||
func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
|
||||
return filepath.Join(baseDir, info.Name)
|
||||
}
|
||||
|
||||
func (me *FileStorage) filePath(file metainfo.FileInfo) string {
|
||||
func filePath(file *metainfo.FileInfo) string {
|
||||
return filepath.Join(file.Path...)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) DeleteFile(file *torrent.File) error {
|
||||
info := file.Torrent().Info()
|
||||
infoHash := file.Torrent().InfoHash()
|
||||
torrentDir := fs.torrentDir(info, infoHash)
|
||||
relFilePath := fs.filePath(file.FileInfo())
|
||||
torrentDir := torrentDir(fs.baseDir, info, infoHash)
|
||||
fileInfo := file.FileInfo()
|
||||
relFilePath := filePath(&fileInfo)
|
||||
filePath := path.Join(torrentDir, relFilePath)
|
||||
for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
|
||||
pk := metainfo.PieceKey{InfoHash: infoHash, Index: i}
|
||||
|
@ -61,254 +74,32 @@ func (fs *FileStorage) DeleteFile(file *torrent.File) error {
|
|||
return os.Remove(filePath)
|
||||
}
|
||||
|
||||
func (fs FileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
|
||||
dir := fs.torrentDir(info, infoHash)
|
||||
upvertedFiles := info.UpvertedFiles()
|
||||
files := make([]file, 0, len(upvertedFiles))
|
||||
for i, fileInfo := range upvertedFiles {
|
||||
filePath := filepath.Join(dir, fs.filePath(fileInfo))
|
||||
if !isSubFilepath(dir, filePath) {
|
||||
return storage.TorrentImpl{}, fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, fs.baseDir)
|
||||
}
|
||||
|
||||
f := file{
|
||||
path: filePath,
|
||||
length: fileInfo.Length,
|
||||
}
|
||||
if f.length == 0 {
|
||||
err := CreateNativeZeroLengthFile(f.path)
|
||||
if err != nil {
|
||||
return storage.TorrentImpl{}, fmt.Errorf("creating zero length file: %w", err)
|
||||
}
|
||||
}
|
||||
files = append(files, f)
|
||||
func (fs *FileStorage) Cleanup(expected []*torrent.Torrent) error {
|
||||
expectedEntries := []string{}
|
||||
for _, e := range expected {
|
||||
expectedEntries = append(expectedEntries, e.Name())
|
||||
}
|
||||
t := &fileTorrentImpl{
|
||||
files: files,
|
||||
segmentLocater: segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
|
||||
infoHash: infoHash,
|
||||
completion: fs.pieceCompletion,
|
||||
|
||||
entries, err := os.ReadDir(fs.baseDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return storage.TorrentImpl{
|
||||
Piece: t.Piece,
|
||||
Close: t.Close,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type file struct {
|
||||
// The safe, OS-local file path.
|
||||
path string
|
||||
length int64
|
||||
}
|
||||
|
||||
type fileTorrentImpl struct {
|
||||
files []file
|
||||
segmentLocater segments.Index
|
||||
infoHash metainfo.Hash
|
||||
completion storage.PieceCompletion
|
||||
}
|
||||
|
||||
func (fts *fileTorrentImpl) Piece(p metainfo.Piece) storage.PieceImpl {
|
||||
// Create a view onto the file-based torrent storage.
|
||||
_io := fileTorrentImplIO{fts}
|
||||
// Return the appropriate segments of this.
|
||||
return &filePieceImpl{
|
||||
fileTorrentImpl: fts,
|
||||
p: p,
|
||||
WriterAt: missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
|
||||
ReaderAt: io.NewSectionReader(_io, p.Offset(), p.Length()),
|
||||
toDelete := []string{}
|
||||
for _, v := range entries {
|
||||
if !slices.Contains(expectedEntries, v.Name()) {
|
||||
toDelete = append(toDelete, v.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *fileTorrentImpl) Close() error {
|
||||
fs.log.Info().Int("count", len(toDelete)).Msg("start deleting trash data")
|
||||
for _, name := range toDelete {
|
||||
p := path.Join(fs.baseDir, name)
|
||||
fs.log.Info().Str("path", p).Msg("deleting trash data")
|
||||
err := os.RemoveAll(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// A helper to create zero-length files which won't appear for file-orientated storage since no
|
||||
// writes will ever occur to them (no torrent data is associated with a zero-length file). The
|
||||
// caller should make sure the file name provided is safe/sanitized.
|
||||
func CreateNativeZeroLengthFile(name string) error {
|
||||
err := os.MkdirAll(filepath.Dir(name), 0o777)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f, err := os.Create(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// Exposes file-based storage of a torrent, as one big ReadWriterAt.
|
||||
type fileTorrentImplIO struct {
|
||||
fts *fileTorrentImpl
|
||||
}
|
||||
|
||||
// Returns EOF on short or missing file.
|
||||
func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
|
||||
f, err := os.Open(file.path)
|
||||
if os.IsNotExist(err) {
|
||||
// File missing is treated the same as a short file.
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
// Limit the read to within the expected bounds of this file.
|
||||
if int64(len(b)) > file.length-off {
|
||||
b = b[:file.length-off]
|
||||
}
|
||||
for off < file.length && len(b) != 0 {
|
||||
n1, err1 := f.ReadAt(b, off)
|
||||
b = b[n1:]
|
||||
n += n1
|
||||
off += int64(n1)
|
||||
if n1 == 0 {
|
||||
err = err1
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
|
||||
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
fst.fts.segmentLocater.Locate(
|
||||
segments.Extent{Start: off, Length: int64(len(b))},
|
||||
func(i int, e segments.Extent) bool {
|
||||
n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start)
|
||||
n += n1
|
||||
b = b[n1:]
|
||||
err = err1
|
||||
return err == nil // && int64(n1) == e.Length
|
||||
},
|
||||
)
|
||||
if len(b) != 0 && err == nil {
|
||||
err = io.EOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
// log.Printf("write at %v: %v bytes", off, len(p))
|
||||
fst.fts.segmentLocater.Locate(
|
||||
segments.Extent{Start: off, Length: int64(len(p))},
|
||||
func(i int, e segments.Extent) bool {
|
||||
name := fst.fts.files[i].path
|
||||
err = os.MkdirAll(filepath.Dir(name), 0o777)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var f *os.File
|
||||
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var n1 int
|
||||
n1, err = f.WriteAt(p[:e.Length], e.Start)
|
||||
// log.Printf("%v %v wrote %v: %v", i, e, n1, err)
|
||||
closeErr := f.Close()
|
||||
n += n1
|
||||
p = p[n1:]
|
||||
if err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
if err == nil && int64(n1) != e.Length {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
return err == nil
|
||||
},
|
||||
)
|
||||
return n, err
|
||||
}
|
||||
|
||||
type filePieceImpl struct {
|
||||
*fileTorrentImpl
|
||||
p metainfo.Piece
|
||||
io.WriterAt
|
||||
io.ReaderAt
|
||||
}
|
||||
|
||||
var _ storage.PieceImpl = (*filePieceImpl)(nil)
|
||||
|
||||
func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
|
||||
return metainfo.PieceKey{InfoHash: me.infoHash, Index: me.p.Index()}
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) Completion() storage.Completion {
|
||||
c, err := fs.completion.Get(fs.pieceKey())
|
||||
if err != nil {
|
||||
log.Printf("error getting piece completion: %s", err)
|
||||
c.Ok = false
|
||||
return c
|
||||
}
|
||||
|
||||
verified := true
|
||||
if c.Complete {
|
||||
// If it's allegedly complete, check that its constituent files have the necessary length.
|
||||
for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
|
||||
s, err := os.Stat(fs.files[fi.fileIndex].path)
|
||||
if err != nil || s.Size() < fi.length {
|
||||
verified = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !verified {
|
||||
// The completion was wrong, fix it.
|
||||
c.Complete = false
|
||||
fs.completion.Set(fs.pieceKey(), false)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) MarkComplete() error {
|
||||
return fs.completion.Set(fs.pieceKey(), true)
|
||||
}
|
||||
|
||||
func (fs *filePieceImpl) MarkNotComplete() error {
|
||||
return fs.completion.Set(fs.pieceKey(), false)
|
||||
}
|
||||
|
||||
type requiredLength struct {
|
||||
fileIndex int
|
||||
length int64
|
||||
}
|
||||
|
||||
func isSubFilepath(base, sub string) bool {
|
||||
rel, err := filepath.Rel(base, sub)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return rel != ".." && !strings.HasPrefix(rel, ".."+string(os.PathSeparator))
|
||||
}
|
||||
|
||||
func extentCompleteRequiredLengths(info *metainfo.Info, off, n int64) (ret []requiredLength) {
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
for i, fi := range info.UpvertedFiles() {
|
||||
if off >= fi.Length {
|
||||
off -= fi.Length
|
||||
continue
|
||||
}
|
||||
n1 := n
|
||||
if off+n1 > fi.Length {
|
||||
n1 = fi.Length - off
|
||||
}
|
||||
ret = append(ret, requiredLength{
|
||||
fileIndex: i,
|
||||
length: off + n1,
|
||||
})
|
||||
n -= n1
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
off = 0
|
||||
}
|
||||
panic("extent exceeds torrent bounds")
|
||||
}
|
||||
|
|
|
@ -11,11 +11,6 @@ type OsFS struct {
|
|||
hostDir string
|
||||
}
|
||||
|
||||
// Unlink implements Filesystem.
|
||||
func (fs *OsFS) Unlink(filename string) error {
|
||||
return fs.Unlink(filename)
|
||||
}
|
||||
|
||||
// Stat implements Filesystem.
|
||||
func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) {
|
||||
if path.Clean(filename) == Separator {
|
||||
|
@ -25,6 +20,11 @@ func (fs *OsFS) Stat(filename string) (fs.FileInfo, error) {
|
|||
return os.Stat(path.Join(fs.hostDir, filename))
|
||||
}
|
||||
|
||||
// Unlink implements Filesystem.
|
||||
func (fs *OsFS) Unlink(filename string) error {
|
||||
return os.RemoveAll(path.Join(fs.hostDir, filename))
|
||||
}
|
||||
|
||||
// Open implements Filesystem.
|
||||
func (fs *OsFS) Open(filename string) (File, error) {
|
||||
if path.Clean(filename) == Separator {
|
||||
|
|
|
@ -22,7 +22,7 @@ var _ Filesystem = &TorrentFs{}
|
|||
type TorrentFs struct {
|
||||
mu sync.Mutex
|
||||
t *torrent.Torrent
|
||||
rep storage.TorrentsRepository
|
||||
rep storage.ExlcudedFiles
|
||||
|
||||
readTimeout int
|
||||
|
||||
|
@ -32,7 +32,7 @@ type TorrentFs struct {
|
|||
resolver *resolver
|
||||
}
|
||||
|
||||
func NewTorrentFs(t *torrent.Torrent, rep storage.TorrentsRepository, readTimeout int) *TorrentFs {
|
||||
func NewTorrentFs(t *torrent.Torrent, rep storage.ExlcudedFiles, readTimeout int) *TorrentFs {
|
||||
return &TorrentFs{
|
||||
t: t,
|
||||
rep: rep,
|
||||
|
@ -54,29 +54,41 @@ func (fs *TorrentFs) files() (map[string]*torrentFile, error) {
|
|||
|
||||
fs.filesCache = make(map[string]*torrentFile)
|
||||
for _, file := range files {
|
||||
|
||||
p := file.Path()
|
||||
|
||||
if slices.Contains(excludedFiles, p) {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.Contains(p, "/.pad/") {
|
||||
continue
|
||||
}
|
||||
|
||||
p = AbsPath(file.Path())
|
||||
|
||||
// TODO make optional
|
||||
// removing the torrent root directory of same name as torrent
|
||||
p, _ = strings.CutPrefix(p, "/"+fs.t.Name()+"/")
|
||||
p = AbsPath(p)
|
||||
|
||||
fs.filesCache[p] = &torrentFile{
|
||||
name: path.Base(p),
|
||||
timeout: fs.readTimeout,
|
||||
file: file,
|
||||
}
|
||||
}
|
||||
|
||||
rootDir := "/" + fs.t.Name() + "/"
|
||||
singleDir := true
|
||||
for k, _ := range fs.filesCache {
|
||||
if !strings.HasPrefix(k, rootDir) {
|
||||
singleDir = false
|
||||
}
|
||||
}
|
||||
if singleDir {
|
||||
for k, f := range fs.filesCache {
|
||||
delete(fs.filesCache, k)
|
||||
k, _ = strings.CutPrefix(k, rootDir)
|
||||
k = AbsPath(k)
|
||||
fs.filesCache[k] = f
|
||||
}
|
||||
}
|
||||
|
||||
fs.mu.Unlock()
|
||||
}
|
||||
|
||||
|
|
|
@ -3,9 +3,12 @@ package log
|
|||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var _ badger.Logger = (*Badger)(nil)
|
||||
|
||||
type Badger struct {
|
||||
L zerolog.Logger
|
||||
}
|
||||
|
|
173
src/log/nfs.go
Normal file
173
src/log/nfs.go
Normal file
|
@ -0,0 +1,173 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
nfs "github.com/willscott/go-nfs"
|
||||
)
|
||||
|
||||
var _ nfs.Logger = (*NFSLog)(nil)
|
||||
|
||||
type NFSLog struct {
|
||||
r zerolog.Logger
|
||||
l zerolog.Logger
|
||||
}
|
||||
|
||||
func NewNFSLog(r zerolog.Logger) nfs.Logger {
|
||||
return &NFSLog{
|
||||
r: r,
|
||||
l: r.Level(zerolog.DebugLevel),
|
||||
}
|
||||
}
|
||||
|
||||
// Debug implements nfs.Logger.
|
||||
func (l *NFSLog) Debug(args ...interface{}) {
|
||||
l.l.Debug().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Debugf implements nfs.Logger.
|
||||
func (l *NFSLog) Debugf(format string, args ...interface{}) {
|
||||
l.l.Debug().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Error implements nfs.Logger.
|
||||
func (l *NFSLog) Error(args ...interface{}) {
|
||||
l.l.Error().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Errorf implements nfs.Logger.
|
||||
func (l *NFSLog) Errorf(format string, args ...interface{}) {
|
||||
l.l.Error().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Fatal implements nfs.Logger.
|
||||
func (l *NFSLog) Fatal(args ...interface{}) {
|
||||
l.l.Fatal().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Fatalf implements nfs.Logger.
|
||||
func (l *NFSLog) Fatalf(format string, args ...interface{}) {
|
||||
l.l.Fatal().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Info implements nfs.Logger.
|
||||
func (l *NFSLog) Info(args ...interface{}) {
|
||||
l.l.Info().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Infof implements nfs.Logger.
|
||||
func (l *NFSLog) Infof(format string, args ...interface{}) {
|
||||
l.l.Info().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Panic implements nfs.Logger.
|
||||
func (l *NFSLog) Panic(args ...interface{}) {
|
||||
l.l.Panic().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Panicf implements nfs.Logger.
|
||||
func (l *NFSLog) Panicf(format string, args ...interface{}) {
|
||||
l.l.Panic().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Print implements nfs.Logger.
|
||||
func (l *NFSLog) Print(args ...interface{}) {
|
||||
l.l.Print(args...)
|
||||
}
|
||||
|
||||
// Printf implements nfs.Logger.
|
||||
func (l *NFSLog) Printf(format string, args ...interface{}) {
|
||||
l.l.Printf(format, args...)
|
||||
}
|
||||
|
||||
// Trace implements nfs.Logger.
|
||||
func (l *NFSLog) Trace(args ...interface{}) {
|
||||
l.l.Trace().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Tracef implements nfs.Logger.
|
||||
func (l *NFSLog) Tracef(format string, args ...interface{}) {
|
||||
l.l.Trace().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// Warn implements nfs.Logger.
|
||||
func (l *NFSLog) Warn(args ...interface{}) {
|
||||
l.l.Warn().Msg(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Warnf implements nfs.Logger.
|
||||
func (l *NFSLog) Warnf(format string, args ...interface{}) {
|
||||
l.l.Warn().Msgf(format, args...)
|
||||
}
|
||||
|
||||
// GetLevel implements nfs.Logger.
|
||||
func (l *NFSLog) GetLevel() nfs.LogLevel {
|
||||
zl := l.l.GetLevel()
|
||||
switch zl {
|
||||
case zerolog.PanicLevel, zerolog.Disabled:
|
||||
return nfs.PanicLevel
|
||||
case zerolog.FatalLevel:
|
||||
return nfs.FatalLevel
|
||||
case zerolog.ErrorLevel:
|
||||
return nfs.ErrorLevel
|
||||
case zerolog.WarnLevel:
|
||||
return nfs.WarnLevel
|
||||
case zerolog.InfoLevel:
|
||||
return nfs.InfoLevel
|
||||
case zerolog.DebugLevel:
|
||||
return nfs.DebugLevel
|
||||
case zerolog.TraceLevel:
|
||||
return nfs.TraceLevel
|
||||
}
|
||||
return nfs.DebugLevel
|
||||
}
|
||||
|
||||
// ParseLevel implements nfs.Logger.
|
||||
func (l *NFSLog) ParseLevel(level string) (nfs.LogLevel, error) {
|
||||
switch level {
|
||||
case "panic":
|
||||
return nfs.PanicLevel, nil
|
||||
case "fatal":
|
||||
return nfs.FatalLevel, nil
|
||||
case "error":
|
||||
return nfs.ErrorLevel, nil
|
||||
case "warn":
|
||||
return nfs.WarnLevel, nil
|
||||
case "info":
|
||||
return nfs.InfoLevel, nil
|
||||
case "debug":
|
||||
return nfs.DebugLevel, nil
|
||||
case "trace":
|
||||
return nfs.TraceLevel, nil
|
||||
}
|
||||
var ll nfs.LogLevel
|
||||
return ll, fmt.Errorf("invalid log level %q", level)
|
||||
}
|
||||
|
||||
// SetLevel implements nfs.Logger.
|
||||
func (l *NFSLog) SetLevel(level nfs.LogLevel) {
|
||||
switch level {
|
||||
case nfs.PanicLevel:
|
||||
l.l = l.r.Level(zerolog.PanicLevel)
|
||||
return
|
||||
case nfs.FatalLevel:
|
||||
l.l = l.r.Level(zerolog.FatalLevel)
|
||||
return
|
||||
case nfs.ErrorLevel:
|
||||
l.l = l.r.Level(zerolog.ErrorLevel)
|
||||
return
|
||||
case nfs.WarnLevel:
|
||||
l.l = l.r.Level(zerolog.WarnLevel)
|
||||
return
|
||||
case nfs.InfoLevel:
|
||||
l.l = l.r.Level(zerolog.InfoLevel)
|
||||
return
|
||||
case nfs.DebugLevel:
|
||||
l.l = l.r.Level(zerolog.DebugLevel)
|
||||
return
|
||||
case nfs.TraceLevel:
|
||||
l.l = l.r.Level(zerolog.TraceLevel)
|
||||
return
|
||||
}
|
||||
}
|
3
src/proto/gen.go
Normal file
3
src/proto/gen.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package proto
|
||||
|
||||
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go_opt=Mtstor.proto=git.kmsign.ru/royalcat/tstor/src/proto --go-grpc_out=. --go-grpc_opt=paths=source_relative --go-grpc_opt=Mtstor.proto=git.kmsign.ru/royalcat/tstor/src/proto --proto_path=../../proto tstor.proto
|
Loading…
Add table
Add a link
Reference in a new issue