rework 2 (working)

This commit is contained in:
royalcat 2023-10-16 12:18:40 +03:00
parent 5a77fa5e9c
commit d30ef6cc9c
48 changed files with 1340 additions and 1747 deletions

View file

@ -1,6 +1,7 @@
package config
var defaultConfig = Config{
DataFolder: "./data",
WebUi: WebUi{
Port: 4444,
IP: "0.0.0.0",
@ -22,8 +23,9 @@ var defaultConfig = Config{
},
TorrentClient: TorrentClient{
DataFolder: "data",
MetadataFolder: "metadata",
DataFolder: "./torrent/data",
MetadataFolder: "./torrent/metadata",
DHTNodes: []string{},
// GlobalCacheSize: 2048,

View file

@ -6,6 +6,8 @@ type Config struct {
TorrentClient TorrentClient `koanf:"torrent"`
Mounts Mounts `koanf:"mounts"`
Log Log `koanf:"log"`
DataFolder string `koanf:"dataFolder"`
}
type WebUi struct {
@ -25,7 +27,8 @@ type TorrentClient struct {
ReadTimeout int `koanf:"read_timeout,omitempty"`
AddTimeout int `koanf:"add_timeout,omitempty"`
DisableIPv6 bool `koanf:"disable_ipv6,omitempty"`
DHTNodes []string `koanf:"dhtnodes,omitempty"`
DisableIPv6 bool `koanf:"disable_ipv6,omitempty"`
DataFolder string `koanf:"data_folder,omitempty"`
MetadataFolder string `koanf:"metadata_folder,omitempty"`

View file

@ -1,24 +0,0 @@
package fs
type ContainerFs struct {
s *storage
}
func NewContainerFs(fss map[string]Filesystem) (*ContainerFs, error) {
s := newStorage(SupportedFactories)
for p, fs := range fss {
if err := s.AddFS(fs, p); err != nil {
return nil, err
}
}
return &ContainerFs{s: s}, nil
}
func (fs *ContainerFs) Open(filename string) (File, error) {
return fs.s.Get(filename)
}
func (fs *ContainerFs) ReadDir(path string) (map[string]File, error) {
return fs.s.Children(path)
}

View file

@ -1,28 +0,0 @@
package fs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestContainer(t *testing.T) {
t.Parallel()
require := require.New(t)
fss := map[string]Filesystem{
"/test": &DummyFs{},
}
c, err := NewContainerFs(fss)
require.NoError(err)
f, err := c.Open("/test/dir/here")
require.NoError(err)
require.NotNil(f)
files, err := c.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
}

View file

@ -1,39 +0,0 @@
package fs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMemory(t *testing.T) {
t.Parallel()
require := require.New(t)
mem := NewMemory()
mem.Storage.Add(NewMemoryFile([]byte("Hello")), "/dir/here")
fss := map[string]Filesystem{
"/test": mem,
}
c, err := NewContainerFs(fss)
require.NoError(err)
f, err := c.Open("/test/dir/here")
require.NoError(err)
require.NotNil(f)
require.Equal(int64(5), f.Size())
require.NoError(f.Close())
files, err := c.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
files, err = c.ReadDir("/test")
require.NoError(err)
require.Len(files, 1)
}

View file

@ -1,184 +0,0 @@
package fs
import (
"os"
"path"
"strings"
)
const separator = "/"
type FsFactory func(f File) (Filesystem, error)
var SupportedFactories = map[string]FsFactory{
".zip": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &Zip{}), nil
},
".rar": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &Rar{}), nil
},
".7z": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &SevenZip{}), nil
},
}
type storage struct {
factories map[string]FsFactory
files map[string]File
filesystems map[string]Filesystem
children map[string]map[string]File
}
func newStorage(factories map[string]FsFactory) *storage {
return &storage{
files: make(map[string]File),
children: make(map[string]map[string]File),
filesystems: make(map[string]Filesystem),
factories: factories,
}
}
func (s *storage) Clear() {
s.files = make(map[string]File)
s.children = make(map[string]map[string]File)
s.filesystems = make(map[string]Filesystem)
s.Add(&Dir{}, "/")
}
func (s *storage) Has(path string) bool {
path = clean(path)
f := s.files[path]
if f != nil {
return true
}
if f, _ := s.getFileFromFs(path); f != nil {
return true
}
return false
}
func (s *storage) AddFS(fs Filesystem, p string) error {
p = clean(p)
if s.Has(p) {
if dir, err := s.Get(p); err == nil {
if !dir.IsDir() {
return os.ErrExist
}
}
return nil
}
s.filesystems[p] = fs
return s.createParent(p, &Dir{})
}
func (s *storage) Add(f File, p string) error {
p = clean(p)
if s.Has(p) {
if dir, err := s.Get(p); err == nil {
if !dir.IsDir() {
return os.ErrExist
}
}
return nil
}
ext := path.Ext(p)
if ffs := s.factories[ext]; ffs != nil {
fs, err := ffs(f)
if err != nil {
return err
}
s.filesystems[p] = fs
} else {
s.files[p] = f
}
return s.createParent(p, f)
}
func (s *storage) createParent(p string, f File) error {
base, filename := path.Split(p)
base = clean(base)
if err := s.Add(&Dir{}, base); err != nil {
return err
}
if _, ok := s.children[base]; !ok {
s.children[base] = make(map[string]File)
}
if filename != "" {
s.children[base][filename] = f
}
return nil
}
func (s *storage) Children(path string) (map[string]File, error) {
path = clean(path)
files, err := s.getDirFromFs(path)
if err == nil {
return files, nil
}
if !os.IsNotExist(err) {
return nil, err
}
l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
}
return l, nil
}
func (s *storage) Get(path string) (File, error) {
path = clean(path)
if !s.Has(path) {
return nil, os.ErrNotExist
}
file, ok := s.files[path]
if ok {
return file, nil
}
return s.getFileFromFs(path)
}
func (s *storage) getFileFromFs(p string) (File, error) {
for fsp, fs := range s.filesystems {
if strings.HasPrefix(p, fsp) {
return fs.Open(separator + strings.TrimPrefix(p, fsp))
}
}
return nil, os.ErrNotExist
}
func (s *storage) getDirFromFs(p string) (map[string]File, error) {
for fsp, fs := range s.filesystems {
if strings.HasPrefix(p, fsp) {
path := strings.TrimPrefix(p, fsp)
return fs.ReadDir(path)
}
}
return nil, os.ErrNotExist
}
func clean(p string) string {
return path.Clean(separator + strings.ReplaceAll(p, "\\", "/"))
}

View file

@ -1,195 +0,0 @@
package fs
import (
"os"
"testing"
"github.com/stretchr/testify/require"
)
var dummyFactories = map[string]FsFactory{
".test": func(f File) (Filesystem, error) {
return &DummyFs{}, nil
},
}
func TestStorage(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.Add(&Dummy{}, "/path/to/dummy/file.txt")
require.NoError(err)
err = s.Add(&Dummy{}, "/path/to/dummy/file2.txt")
require.NoError(err)
contains := s.Has("/path")
require.True(contains)
contains = s.Has("/path/to/dummy/")
require.True(contains)
file, err := s.Get("/path/to/dummy/file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
file, err = s.Get("/path/to/dummy/file3.txt")
require.Error(err)
require.Nil(file)
files, err := s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 2)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
err = s.Add(&Dummy{}, "/path/to/dummy/folder/file.txt")
require.NoError(err)
files, err = s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 3)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
require.Contains(files, "folder")
err = s.Add(&Dummy{}, "path/file4.txt")
require.NoError(err)
require.True(s.Has("/path/file4.txt"))
files, err = s.Children("/")
require.NoError(err)
require.Len(files, 1)
err = s.Add(&Dummy{}, "/path/special_file.test")
require.NoError(err)
file, err = s.Get("/path/special_file.test/dir/here/file1.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
files, err = s.Children("/path/special_file.test")
require.NoError(err)
require.NotNil(files)
files, err = s.Children("/path/special_file.test/dir/here")
require.NoError(err)
require.Len(files, 2)
err = s.Add(&Dummy{}, "/path/to/__special__path/file3.txt")
require.NoError(err)
file, err = s.Get("/path/to/__special__path/file3.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
s.Clear()
}
func TestStorageWindowsPath(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.Add(&Dummy{}, "\\path\\to\\dummy\\file.txt")
require.NoError(err)
file, err := s.Get("\\path\\to\\dummy\\file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
file, err = s.Get("/path/to/dummy/file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
}
func TestStorageAddFs(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.AddFS(&DummyFs{}, "/test")
require.NoError(err)
f, err := s.Get("/test/dir/here/file1.txt")
require.NoError(err)
require.NotNil(f)
err = s.AddFS(&DummyFs{}, "/test")
require.Error(err)
}
func TestSupportedFactories(t *testing.T) {
t.Parallel()
require := require.New(t)
require.Contains(SupportedFactories, ".zip")
require.Contains(SupportedFactories, ".rar")
require.Contains(SupportedFactories, ".7z")
fs, err := SupportedFactories[".zip"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".rar"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".7z"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
}
var _ Filesystem = &DummyFs{}
type DummyFs struct {
}
func (d *DummyFs) Open(filename string) (File, error) {
return &Dummy{}, nil
}
func (d *DummyFs) ReadDir(path string) (map[string]File, error) {
if path == "/dir/here" {
return map[string]File{
"file1.txt": &Dummy{},
"file2.txt": &Dummy{},
}, nil
}
return nil, os.ErrNotExist
}
var _ File = &Dummy{}
type Dummy struct {
}
func (d *Dummy) Size() int64 {
return 0
}
func (d *Dummy) IsDir() bool {
return false
}
func (d *Dummy) Close() error {
return nil
}
func (d *Dummy) Read(p []byte) (n int, err error) {
return 0, nil
}
func (d *Dummy) ReadAt(p []byte, off int64) (n int, err error) {
return 0, nil
}

120
src/host/storage.go Normal file
View file

@ -0,0 +1,120 @@
package host
import (
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
)
type storage struct {
factories map[string]vfs.FsFactory
}
func NewStorage(downPath string, tsrv *torrent.Service) vfs.Filesystem {
factories := map[string]vfs.FsFactory{
".torrent": tsrv.NewTorrentFs,
}
// add default torrent factory for root filesystem
for k, v := range vfs.ArchiveFactories {
factories[k] = v
}
return vfs.NewResolveFS(downPath, factories)
}
// func (s *storage) Clear() {
// s.files = make(map[string]vfs.File)
// }
// func (s *storage) Has(path string) bool {
// path = clean(path)
// f := s.files[path]
// if f != nil {
// return true
// }
// if f, _ := s.getFileFromFs(path); f != nil {
// return true
// }
// return false
// }
// func (s *storage) createParent(p string, f File) error {
// base, filename := path.Split(p)
// base = clean(base)
// if err := s.Add(&Dir{}, base); err != nil {
// return err
// }
// if _, ok := s.children[base]; !ok {
// s.children[base] = make(map[string]File)
// }
// if filename != "" {
// s.children[base][filename] = f
// }
// return nil
// }
// func (s *storage) Children(path string) (map[string]File, error) {
// path = clean(path)
// files, err := s.getDirFromFs(path)
// if err == nil {
// return files, nil
// }
// if !os.IsNotExist(err) {
// return nil, err
// }
// l := make(map[string]File)
// for n, f := range s.children[path] {
// l[n] = f
// }
// return l, nil
// }
// func (s *storage) Get(path string) (File, error) {
// path = clean(path)
// if !s.Has(path) {
// return nil, os.ErrNotExist
// }
// file, ok := s.files[path]
// if ok {
// return file, nil
// }
// return s.getFileFromFs(path)
// }
// func (s *storage) getFileFromFs(p string) (File, error) {
// for fsp, fs := range s.filesystems {
// if strings.HasPrefix(p, fsp) {
// return fs.Open(separator + strings.TrimPrefix(p, fsp))
// }
// }
// return nil, os.ErrNotExist
// }
// func (s *storage) getDirFromFs(p string) (map[string]File, error) {
// for fsp, fs := range s.filesystems {
// if strings.HasPrefix(p, fsp) {
// path := strings.TrimPrefix(p, fsp)
// return fs.ReadDir(path)
// }
// }
// return nil, os.ErrNotExist
// }
// func clean(p string) string {
// return path.Clean(separator + strings.ReplaceAll(p, "\\", "/"))
// }

213
src/host/torrent/service.go Normal file
View file

@ -0,0 +1,213 @@
package torrent
import (
"sync"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type Service struct {
c *torrent.Client
// stats *Stats
mu sync.Mutex
log zerolog.Logger
addTimeout, readTimeout int
}
func NewService(c *torrent.Client, addTimeout, readTimeout int) *Service {
l := log.Logger.With().Str("component", "torrent-service").Logger()
return &Service{
log: l,
c: c,
// stats: newStats(), // TODO persistent
addTimeout: addTimeout,
readTimeout: readTimeout,
}
}
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
defer f.Close()
mi, err := metainfo.Load(f)
if err != nil {
return nil, err
}
t, err := s.c.AddTorrent(mi)
if err != nil {
return nil, err
}
<-t.GotInfo()
t.AllowDataDownload()
return vfs.NewTorrentFs(t, s.readTimeout), nil
}
func (s *Service) Stats() (*Stats, error) {
return &Stats{}, nil
}
// func (s *Service) Load() (map[string]vfs.Filesystem, error) {
// // Load from config
// s.log.Info().Msg("adding torrents from configuration")
// for _, loader := range s.loaders {
// if err := s.load(loader); err != nil {
// return nil, err
// }
// }
// // Load from DB
// s.log.Info().Msg("adding torrents from database")
// return s.fss, s.load(s.db)
// }
// func (s *Service) load(l loader.Loader) error {
// list, err := l.ListMagnets()
// if err != nil {
// return err
// }
// for r, ms := range list {
// s.addRoute(r)
// for _, m := range ms {
// if err := s.addMagnet(r, m); err != nil {
// return err
// }
// }
// }
// list, err = l.ListTorrentPaths()
// if err != nil {
// return err
// }
// for r, ms := range list {
// s.addRoute(r)
// for _, p := range ms {
// if err := s.addTorrentPath(r, p); err != nil {
// return err
// }
// }
// }
// return nil
// }
// func (s *Service) AddMagnet(r, m string) error {
// if err := s.addMagnet(r, m); err != nil {
// return err
// }
// // Add to db
// return s.db.AddMagnet(r, m)
// }
// func (s *Service) addTorrentPath(r, p string) error {
// // Add to client
// t, err := s.c.AddTorrentFromFile(p)
// if err != nil {
// return err
// }
// return s.addTorrent(r, t)
// }
// func (s *Service) addMagnet(r, m string) error {
// // Add to client
// t, err := s.c.AddMagnet(m)
// if err != nil {
// return err
// }
// return s.addTorrent(r, t)
// }
// func (s *Service) addRoute(r string) {
// s.s.AddRoute(r)
// // Add to filesystems
// folder := path.Join("/", r)
// s.mu.Lock()
// defer s.mu.Unlock()
// _, ok := s.fss[folder]
// if !ok {
// s.fss[folder] = vfs.NewTorrentFs(s.readTimeout)
// }
// }
// func (s *Service) addTorrent(r string, t *torrent.Torrent) error {
// // only get info if name is not available
// if t.Info() == nil {
// s.log.Info().Str("hash", t.InfoHash().String()).Msg("getting torrent info")
// select {
// case <-time.After(time.Duration(s.addTimeout) * time.Second):
// s.log.Error().Str("hash", t.InfoHash().String()).Msg("timeout getting torrent info")
// return errors.New("timeout getting torrent info")
// case <-t.GotInfo():
// s.log.Info().Str("hash", t.InfoHash().String()).Msg("obtained torrent info")
// }
// }
// // Add to stats
// s.s.Add(r, t)
// // Add to filesystems
// folder := path.Join("/", r)
// s.mu.Lock()
// defer s.mu.Unlock()
// tfs, ok := s.fss[folder].(*vfs.TorrentFs)
// if !ok {
// return errors.New("error adding torrent to filesystem")
// }
// tfs.AddTorrent(t)
// s.log.Info().Str("name", t.Info().Name).Str("route", r).Msg("torrent added")
// return nil
// }
// func (s *Service) RemoveFromHash(r, h string) error {
// // Remove from db
// deleted, err := s.db.RemoveFromHash(r, h)
// if err != nil {
// return err
// }
// if !deleted {
// return fmt.Errorf("element with hash %v on route %v cannot be removed", h, r)
// }
// // Remove from stats
// s.s.Del(r, h)
// // Remove from fs
// folder := path.Join("/", r)
// tfs, ok := s.fss[folder].(*vfs.TorrentFs)
// if !ok {
// return errors.New("error removing torrent from filesystem")
// }
// tfs.RemoveTorrent(h)
// // Remove from client
// var mh metainfo.Hash
// if err := mh.FromHexString(h); err != nil {
// return err
// }
// t, ok := s.c.Torrent(metainfo.NewHashFromHex(h))
// if ok {
// t.Drop()
// }
// return nil
// }

View file

@ -2,7 +2,6 @@ package torrent
import (
"errors"
"sort"
"sync"
"time"
@ -73,65 +72,42 @@ type stat struct {
}
type Stats struct {
mut sync.Mutex
torrents map[string]*torrent.Torrent
torrentsByRoute map[string]map[string]*torrent.Torrent
previousStats map[string]*stat
mut sync.Mutex
torrents map[string]*torrent.Torrent
previousStats map[string]*stat
gTime time.Time
}
func NewStats() *Stats {
func newStats() *Stats {
return &Stats{
gTime: time.Now(),
torrents: make(map[string]*torrent.Torrent),
torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
previousStats: make(map[string]*stat),
gTime: time.Now(),
torrents: make(map[string]*torrent.Torrent),
// torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
// previousStats: make(map[string]*stat),
}
}
func (s *Stats) AddRoute(route string) {
_, ok := s.torrentsByRoute[route]
if !ok {
s.torrentsByRoute[route] = make(map[string]*torrent.Torrent)
}
}
func (s *Stats) Add(route string, t *torrent.Torrent) {
func (s *Stats) Add(path string, t *torrent.Torrent) {
s.mut.Lock()
defer s.mut.Unlock()
h := t.InfoHash().String()
s.torrents[h] = t
s.previousStats[h] = &stat{}
_, ok := s.torrentsByRoute[route]
if !ok {
s.torrentsByRoute[route] = make(map[string]*torrent.Torrent)
}
s.torrentsByRoute[route][h] = t
s.torrents[path] = t
s.previousStats[path] = &stat{}
}
func (s *Stats) Del(route, hash string) {
func (s *Stats) Del(path string) {
s.mut.Lock()
defer s.mut.Unlock()
delete(s.torrents, hash)
delete(s.previousStats, hash)
ts, ok := s.torrentsByRoute[route]
if !ok {
return
}
delete(ts, hash)
delete(s.torrents, path)
delete(s.previousStats, path)
}
func (s *Stats) Stats(hash string) (*TorrentStats, error) {
func (s *Stats) Stats(path string) (*TorrentStats, error) {
s.mut.Lock()
defer s.mut.Unlock()
t, ok := s.torrents[hash]
t, ok := s.torrents[path]
if !(ok) {
return nil, ErrTorrentNotFound
}
@ -141,32 +117,6 @@ func (s *Stats) Stats(hash string) (*TorrentStats, error) {
return s.stats(now, t, true), nil
}
func (s *Stats) RoutesStats() []*RouteStats {
s.mut.Lock()
defer s.mut.Unlock()
now := time.Now()
var out []*RouteStats
for r, tl := range s.torrentsByRoute {
var tStats []*TorrentStats
for _, t := range tl {
ts := s.stats(now, t, true)
tStats = append(tStats, ts)
}
sort.Sort(byName(tStats))
rs := &RouteStats{
Name: r,
TorrentStats: tStats,
}
out = append(out, rs)
}
return out
}
func (s *Stats) GlobalStats() *GlobalTorrentStats {
s.mut.Lock()
defer s.mut.Unlock()

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"archive/zip"
@ -12,18 +12,129 @@ import (
"github.com/nwaples/rardecode/v2"
)
var _ loader = &Zip{}
type Zip struct {
var ArchiveFactories = map[string]FsFactory{
".zip": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), ZipLoader), nil
},
".rar": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), RarLoader), nil
},
".7z": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), SevenZipLoader), nil
},
}
func (fs *Zip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
type ArchiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error)
var _ Filesystem = &archive{}
type archive struct {
r iio.Reader
size int64
files func() (map[string]*archiveFile, error)
}
func NewArchive(r iio.Reader, size int64, loader ArchiveLoader) *archive {
return &archive{
r: r,
size: size,
files: sync.OnceValues(func() (map[string]*archiveFile, error) {
return loader(r, size)
}),
}
}
func (a *archive) Open(filename string) (File, error) {
files, err := a.files()
if err != nil {
return nil, err
}
return getFile(files, filename)
}
func (fs *archive) ReadDir(path string) (map[string]File, error) {
files, err := fs.files()
if err != nil {
return nil, err
}
return listFilesInDir(files, path)
}
var _ File = &archiveFile{}
func NewArchiveFile(readerFunc func() (iio.Reader, error), len int64) *archiveFile {
return &archiveFile{
readerFunc: readerFunc,
len: len,
}
}
type archiveFile struct {
readerFunc func() (iio.Reader, error)
reader iio.Reader
len int64
}
func (d *archiveFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *archiveFile) Size() int64 {
return d.len
}
func (d *archiveFile) IsDir() bool {
return false
}
func (d *archiveFile) Close() (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
}
return
}
func (d *archiveFile) Read(p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.Read(p)
}
func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.ReadAt(p, off)
}
var _ ArchiveLoader = ZipLoader
func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
out := make(map[string]*archiveFile)
for _, f := range zr.File {
f := f
if f.FileInfo().IsDir() {
@ -48,18 +159,15 @@ func (fs *Zip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile,
return out, nil
}
var _ loader = &SevenZip{}
var _ ArchiveLoader = SevenZipLoader
type SevenZip struct {
}
func (fs *SevenZip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
r, err := sevenzip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
out := make(map[string]*archiveFile)
for _, f := range r.File {
f := f
if f.FileInfo().IsDir() {
@ -84,18 +192,15 @@ func (fs *SevenZip) getFiles(reader iio.Reader, size int64) (map[string]*Archive
return out, nil
}
var _ loader = &Rar{}
var _ ArchiveLoader = RarLoader
type Rar struct {
}
func (fs *Rar) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
func RarLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size))
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
out := make(map[string]*archiveFile)
for {
header, err := r.Next()
if err == io.EOF {
@ -118,129 +223,3 @@ func (fs *Rar) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile,
return out, nil
}
type loader interface {
getFiles(r iio.Reader, size int64) (map[string]*ArchiveFile, error)
}
var _ Filesystem = &archive{}
type archive struct {
r iio.Reader
s *storage
size int64
once sync.Once
l loader
}
func NewArchive(r iio.Reader, size int64, l loader) *archive {
return &archive{
r: r,
s: newStorage(nil),
size: size,
l: l,
}
}
func (fs *archive) loadOnce() error {
var errOut error
fs.once.Do(func() {
files, err := fs.l.getFiles(fs.r, fs.size)
if err != nil {
errOut = err
return
}
for name, file := range files {
if err := fs.s.Add(file, name); err != nil {
errOut = err
return
}
}
})
return errOut
}
func (fs *archive) Open(filename string) (File, error) {
if filename == string(os.PathSeparator) {
return &Dir{}, nil
}
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Get(filename)
}
func (fs *archive) ReadDir(path string) (map[string]File, error) {
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Children(path)
}
var _ File = &ArchiveFile{}
func NewArchiveFile(readerFunc func() (iio.Reader, error), len int64) *ArchiveFile {
return &ArchiveFile{
readerFunc: readerFunc,
len: len,
}
}
type ArchiveFile struct {
readerFunc func() (iio.Reader, error)
reader iio.Reader
len int64
}
func (d *ArchiveFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *ArchiveFile) Size() int64 {
return d.len
}
func (d *ArchiveFile) IsDir() bool {
return false
}
func (d *ArchiveFile) Close() (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
}
return
}
func (d *ArchiveFile) Read(p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.Read(p)
}
func (d *ArchiveFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.ReadAt(p, off)
}

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"archive/zip"
@ -18,7 +18,7 @@ func TestZipFilesystem(t *testing.T) {
zReader, len := createTestZip(require)
zfs := NewArchive(zReader, len, &Zip{})
zfs := NewArchive(zReader, len, ZipLoader)
files, err := zfs.ReadDir("/path/to/test/file")
require.NoError(err)

View file

@ -1,4 +1,4 @@
package fs
package vfs
var _ File = &Dir{}

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"os"

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"io/fs"

View file

@ -1,27 +1,27 @@
package fs
package vfs
import (
"bytes"
)
var _ Filesystem = &Memory{}
var _ Filesystem = &MemoryFs{}
type Memory struct {
Storage *storage
type MemoryFs struct {
files map[string]*MemoryFile
}
func NewMemory() *Memory {
return &Memory{
Storage: newStorage(nil),
func NewMemoryFS(files map[string]*MemoryFile) *MemoryFs {
return &MemoryFs{
files: files,
}
}
func (fs *Memory) Open(filename string) (File, error) {
return fs.Storage.Get(filename)
func (m *MemoryFs) Open(filename string) (File, error) {
return getFile(m.files, filename)
}
func (fs *Memory) ReadDir(path string) (map[string]File, error) {
return fs.Storage.Children(path)
func (fs *MemoryFs) ReadDir(path string) (map[string]File, error) {
return listFilesInDir(fs.files, path)
}
var _ File = &MemoryFile{}

View file

@ -0,0 +1,46 @@
package vfs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMemory(t *testing.T) {
t.Parallel()
require := require.New(t)
testData := "Hello"
c := NewMemoryFS(map[string]*MemoryFile{
"/dir/here": NewMemoryFile([]byte(testData)),
})
// fss := map[string]Filesystem{
// "/test": mem,
// }
// c, err := NewContainerFs(fss)
// require.NoError(err)
f, err := c.Open("/dir/here")
require.NoError(err)
require.NotNil(f)
require.Equal(int64(5), f.Size())
require.NoError(f.Close())
data := make([]byte, 5)
n, err := f.Read(data)
require.NoError(err)
require.Equal(n, 5)
require.Equal(string(data), testData)
files, err := c.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
files, err = c.ReadDir("/dir")
require.NoError(err)
require.Len(files, 1)
}

167
src/host/vfs/os.go Normal file
View file

@ -0,0 +1,167 @@
package vfs
import (
"io/fs"
"os"
"path"
"sync"
)
type OsFS struct {
hostDir string
}
// Open implements Filesystem.
func (fs *OsFS) Open(filename string) (File, error) {
if path.Clean(filename) == Separator {
return &Dir{}, nil
}
osfile, err := os.Open(path.Join(fs.hostDir, filename))
if err != nil {
return nil, err
}
return NewOsFile(osfile), nil
}
// ReadDir implements Filesystem.
func (o *OsFS) ReadDir(dir string) (map[string]File, error) {
dir = path.Join(o.hostDir, dir)
entries, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
out := map[string]File{}
for _, e := range entries {
if e.IsDir() {
out[e.Name()] = &Dir{}
} else {
out[e.Name()] = NewLazyOsFile(path.Join(dir, e.Name()))
}
}
return out, nil
}
func NewOsFs(osDir string) *OsFS {
return &OsFS{
hostDir: osDir,
}
}
var _ Filesystem = &OsFS{}
type OsFile struct {
f *os.File
}
func NewOsFile(f *os.File) *OsFile {
return &OsFile{f: f}
}
var _ File = &OsFile{}
// Close implements File.
func (f *OsFile) Close() error {
return f.f.Close()
}
// Read implements File.
func (f *OsFile) Read(p []byte) (n int, err error) {
return f.f.Read(p)
}
// ReadAt implements File.
func (f *OsFile) ReadAt(p []byte, off int64) (n int, err error) {
return f.f.ReadAt(p, off)
}
func (f *OsFile) Stat() (fs.FileInfo, error) {
return f.f.Stat()
}
// Size implements File.
func (f *OsFile) Size() int64 {
stat, err := f.Stat()
if err != nil {
return 0
}
return stat.Size()
}
// IsDir implements File.
func (f *OsFile) IsDir() bool {
stat, err := f.Stat()
if err != nil {
return false
}
return stat.IsDir()
}
type LazyOsFile struct {
m sync.Mutex
path string
file *os.File
}
func NewLazyOsFile(path string) *LazyOsFile {
return &LazyOsFile{path: path}
}
var _ File = &OsFile{}
func (f *LazyOsFile) open() error {
f.m.Lock()
defer f.m.Unlock()
if f.file != nil {
return nil
}
osFile, err := os.Open(f.path)
if err != nil {
return err
}
f.file = osFile
return nil
}
// Close implements File.
func (f *LazyOsFile) Close() error {
return f.file.Close()
}
// Read implements File.
func (f *LazyOsFile) Read(p []byte) (n int, err error) {
return f.file.Read(p)
}
// ReadAt implements File.
func (f *LazyOsFile) ReadAt(p []byte, off int64) (n int, err error) {
return f.file.ReadAt(p, off)
}
func (f *LazyOsFile) Stat() (fs.FileInfo, error) {
if f.file == nil {
return os.Stat(f.path)
} else {
return f.file.Stat()
}
}
// Size implements File.
func (f *LazyOsFile) Size() int64 {
stat, err := f.Stat()
if err != nil {
return 0
}
return stat.Size()
}
// IsDir implements File.
func (f *LazyOsFile) IsDir() bool {
stat, err := f.Stat()
if err != nil {
return false
}
return stat.IsDir()
}

146
src/host/vfs/resolver.go Normal file
View file

@ -0,0 +1,146 @@
package vfs
import (
"fmt"
"strings"
"sync"
)
type ResolveFS struct {
osDir string
osFS *OsFS
resolver *resolver
}
func NewResolveFS(osDir string, factories map[string]FsFactory) *ResolveFS {
return &ResolveFS{
osDir: osDir,
osFS: NewOsFs(osDir),
resolver: newResolver(factories),
}
}
// Open implements Filesystem.
func (r *ResolveFS) Open(filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(filename, r.osFS.Open)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Open(nestedFsPath)
}
return r.osFS.Open(fsPath)
}
// ReadDir implements Filesystem.
func (r *ResolveFS) ReadDir(dir string) (map[string]File, error) {
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(dir, r.osFS.Open)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.ReadDir(nestedFsPath)
}
return r.osFS.ReadDir(fsPath)
}
var _ Filesystem = &ResolveFS{}
type FsFactory func(f File) (Filesystem, error)
const Separator = "/"
func newResolver(factories map[string]FsFactory) *resolver {
return &resolver{
factories: factories,
fsmap: map[string]Filesystem{},
}
}
type resolver struct {
m sync.Mutex
factories map[string]FsFactory
fsmap map[string]Filesystem // filesystem cache
// TODO: add fsmap clean
}
type openFile func(path string) (File, error)
// open requeue raw open, without resolver call
func (r *resolver) resolvePath(name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
name = strings.TrimPrefix(name, Separator)
parts := strings.Split(name, Separator)
nestOn := -1
var nestFactory FsFactory
PARTS_LOOP:
for i, part := range parts {
for ext, factory := range r.factories {
if strings.HasSuffix(part, ext) {
nestOn = i + 1
nestFactory = factory
break PARTS_LOOP
}
}
}
if nestOn == -1 {
return name, nil, "", nil
}
fsPath = Clean(strings.Join(parts[:nestOn], Separator))
nestedFsPath = Clean(strings.Join(parts[nestOn:], Separator))
// we dont need lock until now
// it must be before fsmap read to exclude race condition:
// read -> write
// read -> write
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fsPath]; ok {
return fsPath, nestedFs, nestedFsPath, nil
} else {
fsFile, err := rawOpen(fsPath)
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
nestedFs, err := nestFactory(fsFile)
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
r.fsmap[fsPath] = nestedFs
return fsPath, nestedFs, nestedFsPath, nil
}
}
// func (r *resolver) resolveFile(name string, fs Filesystem) (File, error) {
// fsPath, nestedFs, nestedFsPath, err := r.resolvePath(name, fs)
// if err != nil {
// return nil, err
// }
// if nestedFs == nil {
// return fs.Open(fsPath)
// }
// return nestedFs.Open(nestedFsPath)
// }
// func (r *resolver) resolveDir(name string, fs Filesystem) (map[string]File, error) {
// fsPath, nestedFs, nestedFsPath, err := r.resolvePath(name, fs)
// if err != nil {
// return nil, err
// }
// if nestedFs == nil {
// return fs.ReadDir(fsPath)
// }
// return nestedFs.ReadDir(nestedFsPath)
// }

View file

@ -0,0 +1,192 @@
package vfs
import (
"os"
)
type Dummy struct {
}
func (d *Dummy) Size() int64 {
return 0
}
func (d *Dummy) IsDir() bool {
return false
}
func (d *Dummy) Close() error {
return nil
}
func (d *Dummy) Read(p []byte) (n int, err error) {
return 0, nil
}
func (d *Dummy) ReadAt(p []byte, off int64) (n int, err error) {
return 0, nil
}
var _ File = &Dummy{}
type DummyFs struct {
}
func (d *DummyFs) Open(filename string) (File, error) {
return &Dummy{}, nil
}
func (d *DummyFs) ReadDir(path string) (map[string]File, error) {
if path == "/dir/here" {
return map[string]File{
"file1.txt": &Dummy{},
"file2.txt": &Dummy{},
}, nil
}
return nil, os.ErrNotExist
}
var _ Filesystem = &DummyFs{}
// func TestDefaultFactories(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// require.Contains(defaultFactories, ".zip")
// require.Contains(defaultFactories, ".rar")
// require.Contains(defaultFactories, ".7z")
// fs, err := defaultFactories[".zip"](&Dummy{}, nil)
// require.NoError(err)
// require.NotNil(fs)
// fs, err = defaultFactories[".rar"](&Dummy{}, nil)
// require.NoError(err)
// require.NotNil(fs)
// fs, err = defaultFactories[".7z"](&Dummy{}, nil)
// require.NoError(err)
// require.NotNil(fs)
// }
// func TestStorageAddFs(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// s := newStorage(dummyFactories)
// err := s.AddFS(&DummyFs{}, "/test")
// require.NoError(err)
// f, err := s.Get("/test/dir/here/file1.txt")
// require.NoError(err)
// require.NotNil(f)
// err = s.AddFS(&DummyFs{}, "/test")
// require.Error(err)
// }
// func TestStorageWindowsPath(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// s := newStorage(dummyFactories)
// err := s.Add(&Dummy{}, "\\path\\to\\dummy\\file.txt")
// require.NoError(err)
// file, err := s.Get("\\path\\to\\dummy\\file.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// file, err = s.Get("/path/to/dummy/file.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// }
// var dummyFactories = map[string]vfs.FsFactory{
// ".test": func(f vfs.File, factories map[string]vfs.FsFactory) (vfs.Filesystem, error) {
// return &DummyFs{}, nil
// },
// }
// func TestStorage(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// s := newStorage(dummyFactories)
// err := s.Add(&Dummy{}, "/path/to/dummy/file.txt")
// require.NoError(err)
// err = s.Add(&Dummy{}, "/path/to/dummy/file2.txt")
// require.NoError(err)
// contains := s.Has("/path")
// require.True(contains)
// contains = s.Has("/path/to/dummy/")
// require.True(contains)
// file, err := s.Get("/path/to/dummy/file.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// file, err = s.Get("/path/to/dummy/file3.txt")
// require.Error(err)
// require.Nil(file)
// files, err := s.Children("/path/to/dummy/")
// require.NoError(err)
// require.Len(files, 2)
// require.Contains(files, "file.txt")
// require.Contains(files, "file2.txt")
// err = s.Add(&Dummy{}, "/path/to/dummy/folder/file.txt")
// require.NoError(err)
// files, err = s.Children("/path/to/dummy/")
// require.NoError(err)
// require.Len(files, 3)
// require.Contains(files, "file.txt")
// require.Contains(files, "file2.txt")
// require.Contains(files, "folder")
// err = s.Add(&Dummy{}, "path/file4.txt")
// require.NoError(err)
// require.True(s.Has("/path/file4.txt"))
// files, err = s.Children("/")
// require.NoError(err)
// require.Len(files, 1)
// err = s.Add(&Dummy{}, "/path/special_file.test")
// require.NoError(err)
// file, err = s.Get("/path/special_file.test/dir/here/file1.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// files, err = s.Children("/path/special_file.test")
// require.NoError(err)
// require.NotNil(files)
// files, err = s.Children("/path/special_file.test/dir/here")
// require.NoError(err)
// require.Len(files, 2)
// err = s.Add(&Dummy{}, "/path/to/__special__path/file3.txt")
// require.NoError(err)
// file, err = s.Get("/path/to/__special__path/file3.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// s.Clear()
// }

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"context"
@ -11,71 +11,66 @@ import (
"github.com/anacrolix/torrent"
)
var _ Filesystem = &Torrent{}
var _ Filesystem = &TorrentFs{}
type Torrent struct {
type TorrentFs struct {
mu sync.RWMutex
ts map[string]*torrent.Torrent
s *storage
loaded bool
t *torrent.Torrent
readTimeout int
resolver *resolver
}
func NewTorrent(readTimeout int) *Torrent {
return &Torrent{
s: newStorage(SupportedFactories),
ts: make(map[string]*torrent.Torrent),
func NewTorrentFs(t *torrent.Torrent, readTimeout int) *TorrentFs {
return &TorrentFs{
t: t,
readTimeout: readTimeout,
resolver: newResolver(ArchiveFactories),
}
}
func (fs *Torrent) AddTorrent(t *torrent.Torrent) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.loaded = false
fs.ts[t.InfoHash().HexString()] = t
}
func (fs *Torrent) RemoveTorrent(h string) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.s.Clear()
fs.loaded = false
delete(fs.ts, h)
}
func (fs *Torrent) load() {
if fs.loaded {
return
}
fs.mu.RLock()
defer fs.mu.RUnlock()
for _, t := range fs.ts {
<-t.GotInfo()
for _, file := range t.Files() {
fs.s.Add(&torrentFile{
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
}, file.Path())
func (fs *TorrentFs) files() map[string]*torrentFile {
files := make(map[string]*torrentFile)
<-fs.t.GotInfo()
for _, file := range fs.t.Files() {
p := Clean(file.Path())
files[p] = &torrentFile{
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
}
}
fs.loaded = true
return files
}
func (fs *Torrent) Open(filename string) (File, error) {
fs.load()
return fs.s.Get(filename)
func (fs *TorrentFs) rawOpen(path string) (File, error) {
file, err := getFile(fs.files(), path)
return file, err
}
func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
fs.load()
return fs.s.Children(path)
func (fs *TorrentFs) Open(filename string) (File, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.Open(nestedFsPath)
}
return fs.rawOpen(fsPath)
}
func (fs *TorrentFs) ReadDir(name string) (map[string]File, error) {
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(name, fs.rawOpen)
if err != nil {
return nil, err
}
if nestedFs != nil {
return nestedFs.ReadDir(nestedFsPath)
}
return listFilesInDir(fs.files(), fsPath)
}
type reader interface {
@ -93,7 +88,9 @@ type readAtWrapper struct {
}
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
return &readAtWrapper{Reader: r, timeout: timeout}
w := &readAtWrapper{Reader: r, timeout: timeout}
w.SetResponsive()
return w
}
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {

View file

@ -1,4 +1,4 @@
package fs
package vfs
import (
"os"
@ -34,55 +34,55 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}
func TestTorrentFilesystem(t *testing.T) {
require := require.New(t)
// func TestTorrentFilesystem(t *testing.T) {
// require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
// to, err := Cli.AddMagnet(testMagnet)
// require.NoError(err)
tfs := NewTorrent(600)
tfs.AddTorrent(to)
// tfs := NewTorrentFs(600)
// tfs.AddTorrent(to)
files, err := tfs.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
require.Contains(files, "The WIRED CD - Rip. Sample. Mash. Share")
// files, err := tfs.ReadDir("/")
// require.NoError(err)
// require.Len(files, 1)
// require.Contains(files, "The WIRED CD - Rip. Sample. Mash. Share")
files, err = tfs.ReadDir("/The WIRED CD - Rip. Sample. Mash. Share")
require.NoError(err)
require.Len(files, 18)
// files, err = tfs.ReadDir("/The WIRED CD - Rip. Sample. Mash. Share")
// require.NoError(err)
// require.Len(files, 18)
f, err := tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/not_existing_file.txt")
require.Equal(os.ErrNotExist, err)
require.Nil(f)
// f, err := tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/not_existing_file.txt")
// require.Equal(os.ErrNotExist, err)
// require.Nil(f)
f, err = tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/01 - Beastie Boys - Now Get Busy.mp3")
require.NoError(err)
require.NotNil(f)
require.Equal(f.Size(), int64(1964275))
// f, err = tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/01 - Beastie Boys - Now Get Busy.mp3")
// require.NoError(err)
// require.NotNil(f)
// require.Equal(f.Size(), int64(1964275))
b := make([]byte, 10)
// b := make([]byte, 10)
n, err := f.Read(b)
require.NoError(err)
require.Equal(10, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0, 0x0, 0x0, 0x0, 0x1f, 0x76}, b)
// n, err := f.Read(b)
// require.NoError(err)
// require.Equal(10, n)
// require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0, 0x0, 0x0, 0x0, 0x1f, 0x76}, b)
n, err = f.ReadAt(b, 10)
require.NoError(err)
require.Equal(10, n)
// n, err = f.ReadAt(b, 10)
// require.NoError(err)
// require.Equal(10, n)
n, err = f.ReadAt(b, 10000)
require.NoError(err)
require.Equal(10, n)
// n, err = f.ReadAt(b, 10000)
// require.NoError(err)
// require.Equal(10, n)
tfs.RemoveTorrent(to.InfoHash().String())
files, err = tfs.ReadDir("/")
require.NoError(err)
require.Len(files, 0)
// tfs.RemoveTorrent(to.InfoHash().String())
// files, err = tfs.ReadDir("/")
// require.NoError(err)
// require.Len(files, 0)
require.NoError(f.Close())
}
// require.NoError(f.Close())
// }
func TestReadAtTorrent(t *testing.T) {
require := require.New(t)

55
src/host/vfs/utils.go Normal file
View file

@ -0,0 +1,55 @@
package vfs
import (
"io/fs"
"path"
"strings"
)
var ErrNotExist = fs.ErrNotExist
func getFile[F File](m map[string]F, name string) (File, error) {
name = Clean(name)
if name == Separator {
return &Dir{}, nil
}
f, ok := m[name]
if ok {
return f, nil
}
for p := range m {
if strings.HasPrefix(p, name) {
return &Dir{}, nil
}
}
return nil, ErrNotExist
}
func listFilesInDir[F File](m map[string]F, name string) (map[string]File, error) {
name = Clean(name)
out := map[string]File{}
for p, f := range m {
if strings.HasPrefix(p, name) {
parts := strings.Split(trimRelPath(p, name), Separator)
if len(parts) == 1 {
out[parts[0]] = f
} else {
out[parts[0]] = &Dir{}
}
}
}
return out, nil
}
func trimRelPath(p, t string) string {
return strings.Trim(strings.TrimPrefix(p, t), "/")
}
func Clean(p string) string {
return path.Clean(Separator + strings.ReplaceAll(p, "\\", "/"))
}

View file

@ -6,9 +6,8 @@ import (
"math"
"net/http"
"os"
"sort"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
)
@ -30,56 +29,56 @@ var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerF
}
}
var apiServersHandler = func(ss []*torrent.Server) gin.HandlerFunc {
return func(ctx *gin.Context) {
var infos []*torrent.ServerInfo
for _, s := range ss {
infos = append(infos, s.Info())
}
ctx.JSON(http.StatusOK, infos)
}
}
// var apiServersHandler = func(ss []*torrent.Server) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// var infos []*torrent.ServerInfo
// for _, s := range ss {
// infos = append(infos, s.Info())
// }
// ctx.JSON(http.StatusOK, infos)
// }
// }
var apiRoutesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) {
s := ss.RoutesStats()
sort.Sort(torrent.ByName(s))
ctx.JSON(http.StatusOK, s)
}
}
// var apiRoutesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// s := ss.RoutesStats()
// sort.Sort(torrent.ByName(s))
// ctx.JSON(http.StatusOK, s)
// }
// }
var apiAddTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
return func(ctx *gin.Context) {
route := ctx.Param("route")
// var apiAddTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
var json RouteAdd
if err := ctx.ShouldBindJSON(&json); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// var json RouteAdd
// if err := ctx.ShouldBindJSON(&json); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
if err := s.AddMagnet(route, json.Magnet); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// if err := s.AddMagnet(route, json.Magnet); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
ctx.JSON(http.StatusOK, nil)
}
}
// ctx.JSON(http.StatusOK, nil)
// }
// }
var apiDelTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
return func(ctx *gin.Context) {
route := ctx.Param("route")
hash := ctx.Param("torrent_hash")
// var apiDelTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
// return func(ctx *gin.Context) {
// route := ctx.Param("route")
// hash := ctx.Param("torrent_hash")
if err := s.RemoveFromHash(route, hash); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// if err := s.RemoveFromHash(route, hash); err != nil {
// ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }
ctx.JSON(http.StatusOK, nil)
}
}
// ctx.JSON(http.StatusOK, nil)
// }
// }
var apiLogHandler = func(path string) gin.HandlerFunc {
return func(ctx *gin.Context) {

View file

@ -6,14 +6,14 @@ import (
"git.kmsign.ru/royalcat/tstor"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/shurcooL/httpfs/html/vfstemplate"
)
func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, ch *config.Config, tss []*torrent.Server, fs http.FileSystem, logPath string, cfg *config.Config) error {
func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, logPath string, cfg *config.Config) error {
gin.SetMode(gin.ReleaseMode)
r := gin.New()
r.Use(gin.Recovery())
@ -24,14 +24,6 @@ func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, ch *config.
c.FileFromFS(c.Request.URL.Path, http.FS(tstor.Assets))
})
if cfg.Mounts.HttpFs.Enabled {
log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d/fs", cfg.Mounts.HttpFs.Port)).Msg("starting HTTPFS")
r.GET("/fs/*filepath", func(c *gin.Context) {
path := c.Param("filepath")
c.FileFromFS(path, fs)
})
}
t, err := vfstemplate.ParseGlob(http.FS(tstor.Templates), nil, "/templates/*")
if err != nil {
return fmt.Errorf("error parsing html: %w", err)
@ -40,7 +32,7 @@ func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, ch *config.
r.SetHTMLTemplate(t)
r.GET("/", indexHandler)
r.GET("/routes", routesHandler(ss))
// r.GET("/routes", routesHandler(ss))
r.GET("/logs", logsHandler)
r.GET("/servers", serversFoldersHandler())
@ -48,11 +40,11 @@ func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, ch *config.
{
api.GET("/log", apiLogHandler(logPath))
api.GET("/status", apiStatusHandler(fc, ss))
api.GET("/servers", apiServersHandler(tss))
// api.GET("/servers", apiServersHandler(tss))
api.GET("/routes", apiRoutesHandler(ss))
api.POST("/routes/:route/torrent", apiAddTorrentHandler(s))
api.DELETE("/routes/:route/torrent/:torrent_hash", apiDelTorrentHandler(s))
// api.GET("/routes", apiRoutesHandler(ss))
// api.POST("/routes/:route/torrent", apiAddTorrentHandler(s))
// api.DELETE("/routes/:route/torrent/:torrent_hash", apiDelTorrentHandler(s))
}

View file

@ -3,7 +3,6 @@ package http
import (
"net/http"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"github.com/gin-gonic/gin"
)
@ -11,11 +10,11 @@ var indexHandler = func(c *gin.Context) {
c.HTML(http.StatusOK, "index.html", nil)
}
var routesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
return func(c *gin.Context) {
c.HTML(http.StatusOK, "routes.html", ss.RoutesStats())
}
}
// var routesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
// return func(c *gin.Context) {
// c.HTML(http.StatusOK, "routes.html", ss.RoutesStats())
// }
// }
var logsHandler = func(c *gin.Context) {
c.HTML(http.StatusOK, "logs.html", nil)

View file

@ -4,7 +4,7 @@ import (
"io"
"testing"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/stretchr/testify/require"
)
@ -16,7 +16,7 @@ func TestSeekerWrapper(t *testing.T) {
require := require.New(t)
mf := fs.NewMemoryFile(testData)
mf := vfs.NewMemoryFile(testData)
r := iio.NewSeekerWrapper(mf, mf.Size())
defer r.Close()

View file

@ -5,7 +5,7 @@ import (
"path/filepath"
"runtime"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/billziss-gh/cgofuse/fuse"
"github.com/rs/zerolog/log"
@ -25,7 +25,7 @@ func NewHandler(fuseAllowOther bool, path string) *Handler {
}
}
func (s *Handler) Mount(fss map[string]fs.Filesystem) error {
func (s *Handler) Mount(vfs vfs.Filesystem) error {
folder := s.path
// On windows, the folder must don't exist
if runtime.GOOS == "windows" {
@ -38,12 +38,7 @@ func (s *Handler) Mount(fss map[string]fs.Filesystem) error {
}
}
cfs, err := fs.NewContainerFs(fss)
if err != nil {
return err
}
host := fuse.NewFileSystemHost(NewFS(cfs))
host := fuse.NewFileSystemHost(NewFS(vfs))
// TODO improve error handling here
go func() {

View file

@ -7,7 +7,7 @@ import (
"os"
"sync"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/billziss-gh/cgofuse/fuse"
"github.com/rs/zerolog"
@ -21,7 +21,7 @@ type FS struct {
log zerolog.Logger
}
func NewFS(fs fs.Filesystem) fuse.FileSystemInterface {
func NewFS(fs vfs.Filesystem) fuse.FileSystemInterface {
l := log.Logger.With().Str("component", "fuse").Logger()
return &FS{
fh: &fileHandler{fs: fs},
@ -154,11 +154,11 @@ var ErrBadHolderIndex = errors.New("holder index too big")
type fileHandler struct {
mu sync.RWMutex
opened []fs.File
fs fs.Filesystem
opened []vfs.File
fs vfs.Filesystem
}
func (fh *fileHandler) GetFile(path string, fhi uint64) (fs.File, error) {
func (fh *fileHandler) GetFile(path string, fhi uint64) (vfs.File, error) {
fh.mu.RLock()
defer fh.mu.RUnlock()
@ -204,7 +204,7 @@ func (fh *fileHandler) OpenHolder(path string) (uint64, error) {
return uint64(len(fh.opened) - 1), nil
}
func (fh *fileHandler) get(fhi uint64) (fs.File, error) {
func (fh *fileHandler) get(fhi uint64) (vfs.File, error) {
if int(fhi) >= len(fh.opened) {
return nil, ErrBadHolderIndex
}
@ -240,7 +240,7 @@ func (fh *fileHandler) Remove(fhi uint64) error {
return nil
}
func (fh *fileHandler) lookupFile(path string) (fs.File, error) {
func (fh *fileHandler) lookupFile(path string) (vfs.File, error) {
file, err := fh.fs.Open(path)
if err != nil {
return nil, err

View file

@ -7,7 +7,7 @@ import (
"testing"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/stretchr/testify/require"
)
@ -22,12 +22,11 @@ func TestHandler(t *testing.T) {
h := NewHandler(false, p)
mem := fs.NewMemory()
mem := vfs.NewMemoryFS(map[string]*vfs.MemoryFile{
"/test.txt": vfs.NewMemoryFile([]byte("test")),
})
err := mem.Storage.Add(fs.NewMemoryFile([]byte("test")), "/test.txt")
require.NoError(err)
err = h.Mount(map[string]fs.Filesystem{"/mem": mem})
err := h.Mount(mem)
require.NoError(err)
time.Sleep(5 * time.Second)
@ -50,12 +49,11 @@ func TestHandlerDriveLetter(t *testing.T) {
h := NewHandler(false, p)
mem := fs.NewMemory()
mem := vfs.NewMemoryFS(map[string]*vfs.MemoryFile{
"/test.txt": vfs.NewMemoryFile([]byte("test")),
})
err := mem.Storage.Add(fs.NewMemoryFile([]byte("test")), "/test.txt")
require.NoError(err)
err = h.Mount(map[string]fs.Filesystem{"/mem": mem})
err := h.Mount(mem)
require.NoError(err)
time.Sleep(5 * time.Second)

View file

@ -7,17 +7,17 @@ import (
"os"
"sync"
dfs "git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
)
var _ http.FileSystem = &HTTPFS{}
type HTTPFS struct {
fs dfs.Filesystem
fs vfs.Filesystem
}
func NewHTTPFS(fs dfs.Filesystem) *HTTPFS {
func NewHTTPFS(fs vfs.Filesystem) *HTTPFS {
return &HTTPFS{fs: fs}
}
@ -27,7 +27,7 @@ func (fs *HTTPFS) Open(name string) (http.File, error) {
return nil, err
}
fi := dfs.NewFileInfo(name, f.Size(), f.IsDir())
fi := vfs.NewFileInfo(name, f.Size(), f.IsDir())
// TODO make this lazy
fis, err := fs.filesToFileInfo(name)
@ -46,7 +46,7 @@ func (fs *HTTPFS) filesToFileInfo(path string) ([]fs.FileInfo, error) {
var out []os.FileInfo
for n, f := range files {
out = append(out, dfs.NewFileInfo(n, f.Size(), f.IsDir()))
out = append(out, vfs.NewFileInfo(n, f.Size(), f.IsDir()))
}
return out, nil
@ -65,7 +65,7 @@ type httpFile struct {
fi fs.FileInfo
}
func newHTTPFile(f dfs.File, fis []fs.FileInfo, fi fs.FileInfo) *httpFile {
func newHTTPFile(f vfs.File, fis []fs.FileInfo, fi fs.FileInfo) *httpFile {
return &httpFile{
dirContent: fis,
fi: fi,

View file

@ -8,7 +8,7 @@ import (
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"golang.org/x/net/webdav"
)
@ -16,10 +16,10 @@ import (
var _ webdav.FileSystem = &WebDAV{}
type WebDAV struct {
fs fs.Filesystem
fs vfs.Filesystem
}
func newFS(fs fs.Filesystem) *WebDAV {
func newFS(fs vfs.Filesystem) *WebDAV {
return &WebDAV{fs: fs}
}
@ -59,7 +59,7 @@ func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error {
return webdav.ErrNotImplemented
}
func (wd *WebDAV) lookupFile(path string) (fs.File, error) {
func (wd *WebDAV) lookupFile(path string) (vfs.File, error) {
return wd.fs.Open(path)
}
@ -93,7 +93,7 @@ type webDAVFile struct {
dirContent []os.FileInfo
}
func newFile(name string, f fs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
func newFile(name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
return &webDAVFile{
fi: newFileInfo(name, f.Size(), f.IsDir()),
dirFunc: df,

View file

@ -6,7 +6,7 @@ import (
"os"
"testing"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/net/webdav"
)
@ -16,10 +16,9 @@ func TestWebDAVFilesystem(t *testing.T) {
require := require.New(t)
mfs := fs.NewMemory()
mf := fs.NewMemoryFile([]byte("test file content."))
err := mfs.Storage.Add(mf, "/folder/file.txt")
require.NoError(err)
mfs := vfs.NewMemoryFS(map[string]*vfs.MemoryFile{
"/folder/file.txt": vfs.NewMemoryFile([]byte("test file content.")),
})
wfs := newFS(mfs)
@ -67,10 +66,9 @@ func TestErrNotImplemented(t *testing.T) {
require := require.New(t)
mfs := fs.NewMemory()
mf := fs.NewMemoryFile([]byte("test file content."))
err := mfs.Storage.Add(mf, "/folder/file.txt")
require.NoError(err)
mfs := vfs.NewMemoryFS(map[string]*vfs.MemoryFile{
"/folder/file.txt": vfs.NewMemoryFile([]byte("test file content.")),
})
wfs := newFS(mfs)

View file

@ -3,12 +3,12 @@ package webdav
import (
"net/http"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/rs/zerolog/log"
"golang.org/x/net/webdav"
)
func newHandler(fs fs.Filesystem) *webdav.Handler {
func newHandler(fs vfs.Filesystem) *webdav.Handler {
l := log.Logger.With().Str("component", "webDAV").Logger()
return &webdav.Handler{
Prefix: "/",

View file

@ -4,11 +4,11 @@ import (
"fmt"
"net/http"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/rs/zerolog/log"
)
func NewWebDAVServer(fs fs.Filesystem, port int, user, pass string) error {
func NewWebDAVServer(fs vfs.Filesystem, port int, user, pass string) error {
log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d", port)).Msg("starting webDAV server")
srv := newHandler(fs)

View file

@ -1,45 +0,0 @@
package loader
import "git.kmsign.ru/royalcat/tstor/src/config"
var _ Loader = &Config{}
type Config struct {
c []config.Route
}
func NewConfig(r []config.Route) *Config {
return &Config{
c: r,
}
}
func (l *Config) ListMagnets() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range l.c {
for _, t := range r.Torrents {
if t.MagnetURI == "" {
continue
}
out[r.Name] = append(out[r.Name], t.MagnetURI)
}
}
return out, nil
}
func (l *Config) ListTorrentPaths() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range l.c {
for _, t := range r.Torrents {
if t.TorrentPath == "" {
continue
}
out[r.Name] = append(out[r.Name], t.TorrentPath)
}
}
return out, nil
}

View file

@ -1,112 +0,0 @@
package loader
import (
"path"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/metainfo"
"github.com/dgraph-io/badger/v3"
"github.com/rs/zerolog/log"
)
var _ LoaderAdder = &DB{}
const routeRootKey = "/route/"
type DB struct {
db *badger.DB
}
func NewDB(path string) (*DB, error) {
l := log.Logger.With().Str("component", "torrent-store").Logger()
opts := badger.DefaultOptions(path).
WithLogger(&dlog.Badger{L: l}).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
err = db.RunValueLogGC(0.5)
if err != nil && err != badger.ErrNoRewrite {
return nil, err
}
return &DB{
db: db,
}, nil
}
func (l *DB) AddMagnet(r, m string) error {
err := l.db.Update(func(txn *badger.Txn) error {
spec, err := metainfo.ParseMagnetUri(m)
if err != nil {
return err
}
ih := spec.InfoHash.HexString()
rp := path.Join(routeRootKey, ih, r)
return txn.Set([]byte(rp), []byte(m))
})
if err != nil {
return err
}
return l.db.Sync()
}
func (l *DB) RemoveFromHash(r, h string) (bool, error) {
tx := l.db.NewTransaction(true)
defer tx.Discard()
var mh metainfo.Hash
if err := mh.FromHexString(h); err != nil {
return false, err
}
rp := path.Join(routeRootKey, h, r)
if _, err := tx.Get([]byte(rp)); err != nil {
return false, nil
}
if err := tx.Delete([]byte(rp)); err != nil {
return false, err
}
return true, tx.Commit()
}
func (l *DB) ListMagnets() (map[string][]string, error) {
tx := l.db.NewTransaction(false)
defer tx.Discard()
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := []byte(routeRootKey)
out := make(map[string][]string)
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
_, r := path.Split(string(it.Item().Key()))
i := it.Item()
if err := i.Value(func(v []byte) error {
out[r] = append(out[r], string(v))
return nil
}); err != nil {
return nil, err
}
}
return out, nil
}
func (l *DB) ListTorrentPaths() (map[string][]string, error) {
return nil, nil
}
func (l *DB) Close() error {
return l.db.Close()
}

View file

@ -1,62 +0,0 @@
package loader
import (
"os"
"testing"
"github.com/anacrolix/torrent/storage"
"github.com/stretchr/testify/require"
)
const m1 = "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056"
func TestDB(t *testing.T) {
require := require.New(t)
tmpService, err := os.MkdirTemp("", "service")
require.NoError(err)
tmpStorage, err := os.MkdirTemp("", "storage")
require.NoError(err)
cs := storage.NewFile(tmpStorage)
defer cs.Close()
s, err := NewDB(tmpService)
require.NoError(err)
defer s.Close()
err = s.AddMagnet("route1", "WRONG MAGNET")
require.Error(err)
err = s.AddMagnet("route1", m1)
require.NoError(err)
err = s.AddMagnet("route2", m1)
require.NoError(err)
l, err := s.ListMagnets()
require.NoError(err)
require.Len(l, 2)
require.Len(l["route1"], 1)
require.Equal(l["route1"][0], m1)
require.Len(l["route2"], 1)
require.Equal(l["route2"][0], m1)
removed, err := s.RemoveFromHash("other", "c9e15763f722f23e98a29decdfae341b98d53056")
require.NoError(err)
require.False(removed)
removed, err = s.RemoveFromHash("route1", "c9e15763f722f23e98a29decdfae341b98d53056")
require.NoError(err)
require.True(removed)
l, err = s.ListMagnets()
require.NoError(err)
require.Len(l, 1)
require.Len(l["route2"], 1)
require.Equal(l["route2"][0], m1)
require.NoError(s.Close())
require.NoError(cs.Close())
}

View file

@ -1,55 +0,0 @@
package loader
import (
"io/fs"
"path"
"path/filepath"
"git.kmsign.ru/royalcat/tstor/src/config"
)
var _ Loader = &Folder{}
type Folder struct {
c []config.Route
}
func NewFolder(r []config.Route) *Folder {
return &Folder{
c: r,
}
}
func (f *Folder) ListMagnets() (map[string][]string, error) {
return nil, nil
}
func (f *Folder) ListTorrentPaths() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range f.c {
if r.TorrentFolder == "" {
continue
}
err := filepath.WalkDir(r.TorrentFolder, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if path.Ext(p) == ".torrent" {
out[r.Name] = append(out[r.Name], p)
}
return nil
})
if err != nil {
return nil, err
}
}
return out, nil
}

View file

@ -1,13 +0,0 @@
package loader
type Loader interface {
ListMagnets() (map[string][]string, error)
ListTorrentPaths() (map[string][]string, error)
}
type LoaderAdder interface {
Loader
RemoveFromHash(r, h string) (bool, error)
AddMagnet(r, m string) error
}

View file

@ -1,255 +0,0 @@
package torrent
import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"git.kmsign.ru/royalcat/tstor/src/config"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type ServerState int
const (
UNKNOWN ServerState = iota
SEEDING
READING
UPDATING
STOPPED
ERROR
)
func (ss ServerState) String() string {
return [...]string{"Unknown", "Seeding", "Reading", "Updating", "Stopped", "Error"}[ss]
}
type ServerInfo struct {
Magnet string `json:"magnetUri"`
UpdatedAt int64 `json:"updatedAt"`
Name string `json:"name"`
Folder string `json:"folder"`
State string `json:"state"`
Peers int `json:"peers"`
Seeds int `json:"seeds"`
}
type Server struct {
cfg *config.Server
log zerolog.Logger
fw *fsnotify.Watcher
eventsCount uint64
c *torrent.Client
pc storage.PieceCompletion
mu sync.RWMutex
t *torrent.Torrent
si ServerInfo
}
func NewServer(c *torrent.Client, pc storage.PieceCompletion, cfg *config.Server) *Server {
l := log.Logger.With().Str("component", "server").Str("name", cfg.Name).Logger()
return &Server{
cfg: cfg,
log: l,
c: c,
pc: pc,
}
}
func (s *Server) Start() error {
s.log.Info().Msg("starting new server folder")
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
if err := os.MkdirAll(s.cfg.Path, 0744); err != nil {
return fmt.Errorf("error creating server folder: %s. Error: %w", s.cfg.Path, err)
}
if err := filepath.Walk(s.cfg.Path,
func(path string, info os.FileInfo, err error) error {
if info.Mode().IsDir() {
s.log.Debug().Str("folder", path).Msg("adding new folder")
return w.Add(path)
}
return nil
}); err != nil {
return err
}
s.fw = w
go func() {
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet on start")
}
s.watch()
}()
go func() {
for {
select {
case event, ok := <-w.Events:
if !ok {
return
}
s.log.Info().Str("file", event.Name).Str("op", event.Op.String()).Msg("file changed inside server folder")
atomic.AddUint64(&s.eventsCount, 1)
case err, ok := <-w.Errors:
if !ok {
return
}
s.updateState(STOPPED)
s.log.Error().Err(err).Msg("error watching server folder")
}
}
}()
s.log.Info().Msg("server folder started")
return nil
}
func (s *Server) watch() {
s.log.Info().Msg("starting watcher")
for range time.Tick(time.Second * 5) {
if s.eventsCount == 0 {
continue
}
ec := s.eventsCount
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet")
}
atomic.AddUint64(&s.eventsCount, -ec)
}
}
func (s *Server) makeMagnet() error {
s.log.Info().Msg("starting serving new torrent")
info := metainfo.Info{
PieceLength: 2 << 18,
}
s.updateState(READING)
if err := info.BuildFromFilePath(s.cfg.Path); err != nil {
return err
}
s.updateState(UPDATING)
if len(info.Files) == 0 {
s.mu.Lock()
s.si.Magnet = ""
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.log.Info().Msg("not creating magnet from empty folder")
s.updateState(STOPPED)
return nil
}
mi := metainfo.MetaInfo{
InfoBytes: bencode.MustMarshal(info),
}
ih := mi.HashInfoBytes()
to, _ := s.c.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: ih,
Storage: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: s.cfg.Path,
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...)
},
TorrentDirMaker: nil,
PieceCompletion: s.pc,
}),
})
tks := s.trackers()
err := to.MergeSpec(&torrent.TorrentSpec{
InfoBytes: mi.InfoBytes,
Trackers: [][]string{tks},
})
if err != nil {
return err
}
m := metainfo.Magnet{
InfoHash: ih,
DisplayName: s.cfg.Name,
Trackers: tks,
}
s.mu.Lock()
s.t = to
s.si.Magnet = m.String()
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.updateState(SEEDING)
s.log.Info().Str("hash", ih.HexString()).Msg("new torrent is ready")
return nil
}
func (s *Server) updateState(ss ServerState) {
s.mu.Lock()
s.si.State = ss.String()
s.mu.Unlock()
}
func (s *Server) trackers() []string {
// TODO load trackers from URL too
return s.cfg.Trackers
}
func (s *Server) Close() error {
if s.fw == nil {
return nil
}
return s.fw.Close()
}
func (s *Server) Info() *ServerInfo {
s.mu.RLock()
defer s.mu.RUnlock()
if s.t != nil {
st := s.t.Stats()
s.si.Peers = st.TotalPeers
s.si.Seeds = st.ConnectedSeeders
}
return &s.si
}

View file

@ -1,203 +0,0 @@
package torrent
import (
"errors"
"fmt"
"path"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/torrent/loader"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type Service struct {
c *torrent.Client
s *Stats
mu sync.Mutex
fss map[string]fs.Filesystem
loaders []loader.Loader
db loader.LoaderAdder
log zerolog.Logger
addTimeout, readTimeout int
}
func NewService(loaders []loader.Loader, db loader.LoaderAdder, stats *Stats, c *torrent.Client, addTimeout, readTimeout int) *Service {
l := log.Logger.With().Str("component", "torrent-service").Logger()
return &Service{
log: l,
s: stats,
c: c,
fss: make(map[string]fs.Filesystem),
loaders: loaders,
db: db,
addTimeout: addTimeout,
readTimeout: readTimeout,
}
}
func (s *Service) Load() (map[string]fs.Filesystem, error) {
// Load from config
s.log.Info().Msg("adding torrents from configuration")
for _, loader := range s.loaders {
if err := s.load(loader); err != nil {
return nil, err
}
}
// Load from DB
s.log.Info().Msg("adding torrents from database")
return s.fss, s.load(s.db)
}
func (s *Service) load(l loader.Loader) error {
list, err := l.ListMagnets()
if err != nil {
return err
}
for r, ms := range list {
s.addRoute(r)
for _, m := range ms {
if err := s.addMagnet(r, m); err != nil {
return err
}
}
}
list, err = l.ListTorrentPaths()
if err != nil {
return err
}
for r, ms := range list {
s.addRoute(r)
for _, p := range ms {
if err := s.addTorrentPath(r, p); err != nil {
return err
}
}
}
return nil
}
func (s *Service) AddMagnet(r, m string) error {
if err := s.addMagnet(r, m); err != nil {
return err
}
// Add to db
return s.db.AddMagnet(r, m)
}
func (s *Service) addTorrentPath(r, p string) error {
// Add to client
t, err := s.c.AddTorrentFromFile(p)
if err != nil {
return err
}
return s.addTorrent(r, t)
}
func (s *Service) addMagnet(r, m string) error {
// Add to client
t, err := s.c.AddMagnet(m)
if err != nil {
return err
}
return s.addTorrent(r, t)
}
func (s *Service) addRoute(r string) {
s.s.AddRoute(r)
// Add to filesystems
folder := path.Join("/", r)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.fss[folder]
if !ok {
s.fss[folder] = fs.NewTorrent(s.readTimeout)
}
}
func (s *Service) addTorrent(r string, t *torrent.Torrent) error {
// only get info if name is not available
if t.Info() == nil {
s.log.Info().Str("hash", t.InfoHash().String()).Msg("getting torrent info")
select {
case <-time.After(time.Duration(s.addTimeout) * time.Second):
s.log.Error().Str("hash", t.InfoHash().String()).Msg("timeout getting torrent info")
return errors.New("timeout getting torrent info")
case <-t.GotInfo():
s.log.Info().Str("hash", t.InfoHash().String()).Msg("obtained torrent info")
}
}
// Add to stats
s.s.Add(r, t)
// Add to filesystems
folder := path.Join("/", r)
s.mu.Lock()
defer s.mu.Unlock()
tfs, ok := s.fss[folder].(*fs.Torrent)
if !ok {
return errors.New("error adding torrent to filesystem")
}
tfs.AddTorrent(t)
s.log.Info().Str("name", t.Info().Name).Str("route", r).Msg("torrent added")
return nil
}
func (s *Service) RemoveFromHash(r, h string) error {
// Remove from db
deleted, err := s.db.RemoveFromHash(r, h)
if err != nil {
return err
}
if !deleted {
return fmt.Errorf("element with hash %v on route %v cannot be removed", h, r)
}
// Remove from stats
s.s.Del(r, h)
// Remove from fs
folder := path.Join("/", r)
tfs, ok := s.fss[folder].(*fs.Torrent)
if !ok {
return errors.New("error removing torrent from filesystem")
}
tfs.RemoveTorrent(h)
// Remove from client
var mh metainfo.Hash
if err := mh.FromHexString(h); err != nil {
return err
}
t, ok := s.c.Torrent(metainfo.NewHashFromHex(h))
if ok {
t.Drop()
}
return nil
}