royalcat refactoring

This commit is contained in:
royalcat 2023-10-08 19:46:03 +03:00
parent 1da835cea6
commit b245c9f451
81 changed files with 1476 additions and 1580 deletions

82
src/config/default.go Normal file

@ -0,0 +1,82 @@
package config
const (
metadataFolder = "./tstor-data/metadata"
mountFolder = "./tstor-data/mount"
logsFolder = "./tstor-data/logs"
serverFolder = "./tstor-data/served-folders/server"
)
var defaultConfig = Config{
WebUi: WebUi{
Port: 4444,
IP: "0.0.0.0",
},
Mounts: Mounts{
HttpFs: HttpFs{
Enabled: true,
Port: 4445,
},
WebDAV: WebDAV{
Enabled: true,
Port: 36911,
User: "admin",
Pass: "admin",
},
Fuse: Fuse{
Enabled: false,
AllowOther: false,
Path: mountFolder,
},
},
TorrentClient: TorrentClient{
GlobalCacheSize: 2048,
MetadataFolder: metadataFolder,
AddTimeout: 60,
ReadTimeout: 120,
},
Log: Log{
Path: logsFolder,
MaxBackups: 2,
MaxSize: 50,
},
}
var defaultRoutes = []Route{
{
Name: "multimedia",
Torrents: []Torrent{
{
MagnetURI: "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056&dn=Cosmos+Laundromat&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fcosmos-laundromat.torrent",
},
{
MagnetURI: "magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent",
},
{
MagnetURI: "magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=Sintel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fsintel.torrent",
},
{
MagnetURI: "magnet:?xt=urn:btih:209c8226b299b308beaf2b9cd3fb49212dbd13ec&dn=Tears+of+Steel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Ftears-of-steel.torrent",
},
{
MagnetURI: "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent",
},
},
},
}
var defaultServers = []Server{
{
Name: "server",
Path: serverFolder,
Trackers: []string{
"wss://tracker.btorrent.xyz",
"wss://tracker.openwebtorrent.com",
"http://p4p.arenabg.com:1337/announce",
"udp://tracker.opentrackr.org:1337/announce",
"udp://open.tracker.cl:1337/announce",
"http://openbittorrent.com:80/announce",
},
},
}

48
src/config/load.go Normal file

@ -0,0 +1,48 @@
package config
import (
"os"
"strings"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/providers/structs"
"github.com/knadh/koanf/v2"
)
var k = koanf.New(".")
func Load(path string) (*Config, error) {
err := k.Load(structs.Provider(defaultConfig, "koanf"), nil)
if err != nil {
return nil, err
}
if path != "" {
_ = k.Load(file.Provider(path), yaml.Parser()) // it's ok if the file doesn't exist
}
err = k.Load(env.Provider("tstor_", ".", func(s string) string {
return strings.Replace(strings.ToLower(
strings.TrimPrefix(s, "tstor_")), "_", ".", -1)
}), nil)
if err != nil {
return nil, err
}
if path != "" {
data, err := k.Marshal(yaml.Parser())
if err != nil {
return nil, err
}
err = os.WriteFile(path, data, os.ModePerm)
if err != nil {
return nil, err
}
}
conf := Config{}
if err := k.Unmarshal("", &conf); err != nil {
return nil, err
}
return &conf, nil
}
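
A minimal usage sketch of Load: defaults are applied first, then the optional YAML file, then environment variables prefixed with tstor_, and the merged result is written back to the given path. The path and printed field below are illustrative, not taken from this commit:

package main

import (
    "fmt"
    "log"

    "git.kmsign.ru/royalcat/tstor/src/config"
)

func main() {
    conf, err := config.Load("./tstor-data/config.yaml") // illustrative path
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(conf.WebUi.Port) // 4444 unless overridden by the file or environment
}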

75
src/config/model.go Normal file

@ -0,0 +1,75 @@
package config
// Config is the main config object
type Config struct {
WebUi WebUi `koanf:"webUi"`
TorrentClient TorrentClient `koanf:"torrent"`
Mounts Mounts `koanf:"mounts"`
Log Log `koanf:"log"`
}
type WebUi struct {
Port int `koanf:"port"`
IP string `koanf:"ip"`
}
type Log struct {
Debug bool `koanf:"debug"`
MaxBackups int `koanf:"max_backups"`
MaxSize int `koanf:"max_size"`
MaxAge int `koanf:"max_age"`
Path string `koanf:"path"`
}
type TorrentClient struct {
ReadTimeout int `koanf:"read_timeout,omitempty"`
AddTimeout int `koanf:"add_timeout,omitempty"`
GlobalCacheSize int64 `koanf:"global_cache_size,omitempty"`
MetadataFolder string `koanf:"metadata_folder,omitempty"`
DisableIPv6 bool `koanf:"disable_ipv6,omitempty"`
Routes []Route `koanf:"routes"`
Servers []Server `koanf:"servers"`
}
type Route struct {
Name string `koanf:"name"`
Torrents []Torrent `koanf:"torrents"`
TorrentFolder string `koanf:"torrent_folder"`
}
type Server struct {
Name string `koanf:"name"`
Path string `koanf:"path"`
Trackers []string `koanf:"trackers"`
TrackerURL string `koanf:"tracker_url"`
}
type Torrent struct {
MagnetURI string `koanf:"magnet_uri,omitempty"`
TorrentPath string `koanf:"torrent_path,omitempty"`
}
type Mounts struct {
WebDAV WebDAV `koanf:"webdav"`
HttpFs HttpFs `koanf:"httpfs"`
Fuse Fuse `koanf:"fuse"`
}
type HttpFs struct {
Enabled bool `koanf:"enabled"`
Port int `koanf:"port"`
}
type WebDAV struct {
Enabled bool `koanf:"enabled"`
Port int `koanf:"port"`
User string `koanf:"user"`
Pass string `koanf:"pass"`
}
type Fuse struct {
Enabled bool `koanf:"enabled"`
AllowOther bool `koanf:"allow_other,omitempty"`
Path string `koanf:"path"`
}

246
src/fs/archive.go Normal file

@ -0,0 +1,246 @@
package fs
import (
"archive/zip"
"io"
"os"
"path/filepath"
"sync"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/bodgit/sevenzip"
"github.com/nwaples/rardecode/v2"
)
var _ loader = &Zip{}
type Zip struct {
}
func (fs *Zip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for _, f := range zr.File {
f := f
if f.FileInfo().IsDir() {
continue
}
rf := func() (iio.Reader, error) {
zr, err := f.Open()
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
}
n := filepath.Join(string(os.PathSeparator), f.Name)
af := NewArchiveFile(rf, f.FileInfo().Size())
out[n] = af
}
return out, nil
}
var _ loader = &SevenZip{}
type SevenZip struct {
}
func (fs *SevenZip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
r, err := sevenzip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for _, f := range r.File {
f := f
if f.FileInfo().IsDir() {
continue
}
rf := func() (iio.Reader, error) {
zr, err := f.Open()
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
}
af := NewArchiveFile(rf, f.FileInfo().Size())
n := filepath.Join(string(os.PathSeparator), f.Name)
out[n] = af
}
return out, nil
}
var _ loader = &Rar{}
type Rar struct {
}
func (fs *Rar) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size))
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for {
header, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
rf := func() (iio.Reader, error) {
return iio.NewDiskTeeReader(r)
}
n := filepath.Join(string(os.PathSeparator), header.Name)
af := NewArchiveFile(rf, header.UnPackedSize)
out[n] = af
}
return out, nil
}
type loader interface {
getFiles(r iio.Reader, size int64) (map[string]*ArchiveFile, error)
}
var _ Filesystem = &archive{}
type archive struct {
r iio.Reader
s *storage
size int64
once sync.Once
l loader
}
func NewArchive(r iio.Reader, size int64, l loader) *archive {
return &archive{
r: r,
s: newStorage(nil),
size: size,
l: l,
}
}
func (fs *archive) loadOnce() error {
var errOut error
fs.once.Do(func() {
files, err := fs.l.getFiles(fs.r, fs.size)
if err != nil {
errOut = err
return
}
for name, file := range files {
if err := fs.s.Add(file, name); err != nil {
errOut = err
return
}
}
})
return errOut
}
func (fs *archive) Open(filename string) (File, error) {
if filename == string(os.PathSeparator) {
return &Dir{}, nil
}
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Get(filename)
}
func (fs *archive) ReadDir(path string) (map[string]File, error) {
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Children(path)
}
var _ File = &ArchiveFile{}
func NewArchiveFile(readerFunc func() (iio.Reader, error), len int64) *ArchiveFile {
return &ArchiveFile{
readerFunc: readerFunc,
len: len,
}
}
type ArchiveFile struct {
readerFunc func() (iio.Reader, error)
reader iio.Reader
len int64
}
func (d *ArchiveFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *ArchiveFile) Size() int64 {
return d.len
}
func (d *ArchiveFile) IsDir() bool {
return false
}
func (d *ArchiveFile) Close() (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
}
return
}
func (d *ArchiveFile) Read(p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.Read(p)
}
func (d *ArchiveFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.ReadAt(p, off)
}
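
A rough usage sketch of the archive filesystem from outside the package, assuming the zip bytes live in a local file (paths are illustrative; *os.File already satisfies iio.Reader):

package main

import (
    "fmt"
    "os"

    "git.kmsign.ru/royalcat/tstor/src/fs"
)

func main() {
    f, err := os.Open("./example.zip") // illustrative path
    if err != nil {
        panic(err)
    }
    defer f.Close()
    st, err := f.Stat()
    if err != nil {
        panic(err)
    }
    // The archive is parsed lazily, on the first Open or ReadDir call.
    afs := fs.NewArchive(f, st.Size(), &fs.Zip{})
    entries, err := afs.ReadDir("/")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(entries), "entries at the archive root")
}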

66
src/fs/archive_test.go Normal file

@ -0,0 +1,66 @@
package fs
import (
"archive/zip"
"bytes"
"io"
"testing"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/stretchr/testify/require"
)
var fileContent []byte = []byte("Hello World")
func TestZipFilesystem(t *testing.T) {
t.Parallel()
require := require.New(t)
zReader, len := createTestZip(require)
zfs := NewArchive(zReader, len, &Zip{})
files, err := zfs.ReadDir("/path/to/test/file")
require.NoError(err)
require.Len(files, 1)
f := files["1.txt"]
require.NotNil(f)
out := make([]byte, 11)
n, err := f.Read(out)
require.Equal(io.EOF, err)
require.Equal(11, n)
require.Equal(fileContent, out)
}
func createTestZip(require *require.Assertions) (iio.Reader, int64) {
buf := bytes.NewBuffer([]byte{})
zWriter := zip.NewWriter(buf)
f1, err := zWriter.Create("path/to/test/file/1.txt")
require.NoError(err)
_, err = f1.Write(fileContent)
require.NoError(err)
err = zWriter.Close()
require.NoError(err)
return newCBR(buf.Bytes()), int64(buf.Len())
}
type closeableByteReader struct {
*bytes.Reader
}
func newCBR(b []byte) *closeableByteReader {
return &closeableByteReader{
Reader: bytes.NewReader(b),
}
}
func (*closeableByteReader) Close() error {
return nil
}

24
src/fs/container.go Normal file

@ -0,0 +1,24 @@
package fs
type ContainerFs struct {
s *storage
}
func NewContainerFs(fss map[string]Filesystem) (*ContainerFs, error) {
s := newStorage(SupportedFactories)
for p, fs := range fss {
if err := s.AddFS(fs, p); err != nil {
return nil, err
}
}
return &ContainerFs{s: s}, nil
}
func (fs *ContainerFs) Open(filename string) (File, error) {
return fs.s.Get(filename)
}
func (fs *ContainerFs) ReadDir(path string) (map[string]File, error) {
return fs.s.Children(path)
}

28
src/fs/container_test.go Normal file

@ -0,0 +1,28 @@
package fs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestContainer(t *testing.T) {
t.Parallel()
require := require.New(t)
fss := map[string]Filesystem{
"/test": &DummyFs{},
}
c, err := NewContainerFs(fss)
require.NoError(err)
f, err := c.Open("/test/dir/here")
require.NoError(err)
require.NotNil(f)
files, err := c.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
}

26
src/fs/dir.go Normal file

@ -0,0 +1,26 @@
package fs
var _ File = &Dir{}
type Dir struct {
}
func (d *Dir) Size() int64 {
return 0
}
func (d *Dir) IsDir() bool {
return true
}
func (d *Dir) Close() error {
return nil
}
func (d *Dir) Read(p []byte) (n int, err error) {
return 0, nil
}
func (d *Dir) ReadAt(p []byte, off int64) (n int, err error) {
return 0, nil
}

69
src/fs/fs.go Normal file

@ -0,0 +1,69 @@
package fs
import (
"os"
"time"
"git.kmsign.ru/royalcat/tstor/src/iio"
)
type File interface {
IsDir() bool
Size() int64
iio.Reader
}
type Filesystem interface {
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor has
// mode O_RDONLY.
Open(filename string) (File, error)
// ReadDir reads the directory named by dirname and returns a list of
// directory entries.
ReadDir(path string) (map[string]File, error)
}
type fileInfo struct {
name string
size int64
isDir bool
}
func NewFileInfo(name string, size int64, isDir bool) *fileInfo {
return &fileInfo{
name: name,
size: size,
isDir: isDir,
}
}
func (fi *fileInfo) Name() string {
return fi.name
}
func (fi *fileInfo) Size() int64 {
return fi.size
}
func (fi *fileInfo) Mode() os.FileMode {
if fi.isDir {
return 0555 | os.ModeDir
}
return 0555
}
func (fi *fileInfo) ModTime() time.Time {
// TODO fix it
return time.Now()
}
func (fi *fileInfo) IsDir() bool {
return fi.isDir
}
func (fi *fileInfo) Sys() interface{} {
return nil
}

24
src/fs/fs_test.go Normal file

@ -0,0 +1,24 @@
package fs
import (
"io/fs"
"testing"
"github.com/stretchr/testify/require"
)
func TestFileinfo(t *testing.T) {
t.Parallel()
require := require.New(t)
fi := NewFileInfo("name", 42, false)
require.Equal(fi.IsDir(), false)
require.Equal(fi.Name(), "name")
require.Equal(fi.Size(), int64(42))
require.NotNil(fi.ModTime())
require.Equal(fi.Mode(), fs.FileMode(0555))
require.Equal(fi.Sys(), nil)
}

49
src/fs/memory.go Normal file

@ -0,0 +1,49 @@
package fs
import (
"bytes"
)
var _ Filesystem = &Memory{}
type Memory struct {
Storage *storage
}
func NewMemory() *Memory {
return &Memory{
Storage: newStorage(nil),
}
}
func (fs *Memory) Open(filename string) (File, error) {
return fs.Storage.Get(filename)
}
func (fs *Memory) ReadDir(path string) (map[string]File, error) {
return fs.Storage.Children(path)
}
var _ File = &MemoryFile{}
type MemoryFile struct {
*bytes.Reader
}
func NewMemoryFile(data []byte) *MemoryFile {
return &MemoryFile{
Reader: bytes.NewReader(data),
}
}
func (d *MemoryFile) Size() int64 {
return int64(d.Reader.Len())
}
func (d *MemoryFile) IsDir() bool {
return false
}
func (d *MemoryFile) Close() (err error) {
return
}

39
src/fs/memory_test.go Normal file

@ -0,0 +1,39 @@
package fs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMemory(t *testing.T) {
t.Parallel()
require := require.New(t)
mem := NewMemory()
mem.Storage.Add(NewMemoryFile([]byte("Hello")), "/dir/here")
fss := map[string]Filesystem{
"/test": mem,
}
c, err := NewContainerFs(fss)
require.NoError(err)
f, err := c.Open("/test/dir/here")
require.NoError(err)
require.NotNil(f)
require.Equal(int64(5), f.Size())
require.NoError(f.Close())
files, err := c.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
files, err = c.ReadDir("/test")
require.NoError(err)
require.Len(files, 1)
}

184
src/fs/storage.go Normal file

@ -0,0 +1,184 @@
package fs
import (
"os"
"path"
"strings"
)
const separator = "/"
type FsFactory func(f File) (Filesystem, error)
var SupportedFactories = map[string]FsFactory{
".zip": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &Zip{}), nil
},
".rar": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &Rar{}), nil
},
".7z": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &SevenZip{}), nil
},
}
type storage struct {
factories map[string]FsFactory
files map[string]File
filesystems map[string]Filesystem
children map[string]map[string]File
}
func newStorage(factories map[string]FsFactory) *storage {
return &storage{
files: make(map[string]File),
children: make(map[string]map[string]File),
filesystems: make(map[string]Filesystem),
factories: factories,
}
}
func (s *storage) Clear() {
s.files = make(map[string]File)
s.children = make(map[string]map[string]File)
s.filesystems = make(map[string]Filesystem)
s.Add(&Dir{}, "/")
}
func (s *storage) Has(path string) bool {
path = clean(path)
f := s.files[path]
if f != nil {
return true
}
if f, _ := s.getFileFromFs(path); f != nil {
return true
}
return false
}
func (s *storage) AddFS(fs Filesystem, p string) error {
p = clean(p)
if s.Has(p) {
if dir, err := s.Get(p); err == nil {
if !dir.IsDir() {
return os.ErrExist
}
}
return nil
}
s.filesystems[p] = fs
return s.createParent(p, &Dir{})
}
func (s *storage) Add(f File, p string) error {
p = clean(p)
if s.Has(p) {
if dir, err := s.Get(p); err == nil {
if !dir.IsDir() {
return os.ErrExist
}
}
return nil
}
ext := path.Ext(p)
if ffs := s.factories[ext]; ffs != nil {
fs, err := ffs(f)
if err != nil {
return err
}
s.filesystems[p] = fs
} else {
s.files[p] = f
}
return s.createParent(p, f)
}
func (s *storage) createParent(p string, f File) error {
base, filename := path.Split(p)
base = clean(base)
if err := s.Add(&Dir{}, base); err != nil {
return err
}
if _, ok := s.children[base]; !ok {
s.children[base] = make(map[string]File)
}
if filename != "" {
s.children[base][filename] = f
}
return nil
}
func (s *storage) Children(path string) (map[string]File, error) {
path = clean(path)
files, err := s.getDirFromFs(path)
if err == nil {
return files, nil
}
if !os.IsNotExist(err) {
return nil, err
}
l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
}
return l, nil
}
func (s *storage) Get(path string) (File, error) {
path = clean(path)
if !s.Has(path) {
return nil, os.ErrNotExist
}
file, ok := s.files[path]
if ok {
return file, nil
}
return s.getFileFromFs(path)
}
func (s *storage) getFileFromFs(p string) (File, error) {
for fsp, fs := range s.filesystems {
if strings.HasPrefix(p, fsp) {
return fs.Open(separator + strings.TrimPrefix(p, fsp))
}
}
return nil, os.ErrNotExist
}
func (s *storage) getDirFromFs(p string) (map[string]File, error) {
for fsp, fs := range s.filesystems {
if strings.HasPrefix(p, fsp) {
path := strings.TrimPrefix(p, fsp)
return fs.ReadDir(path)
}
}
return nil, os.ErrNotExist
}
func clean(p string) string {
return path.Clean(separator + strings.ReplaceAll(p, "\\", "/"))
}
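
The factories map is what makes archive files transparent: Add checks the extension, and a matching factory mounts the file as a nested filesystem that later Get and Children calls descend into. A sketch of that behaviour, written as if inside package fs; the zip file and inner path are illustrative assumptions:

func exampleNestedLookup(zipFile File) (File, error) {
    s := newStorage(SupportedFactories)
    // ".zip" matches a factory, so the archive is mounted as a nested
    // filesystem instead of being stored as a plain file.
    if err := s.Add(zipFile, "/backups/photos.zip"); err != nil {
        return nil, err
    }
    // This path is resolved by the nested zip filesystem via getFileFromFs.
    return s.Get("/backups/photos.zip/2021/img_001.jpg")
}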

195
src/fs/storage_test.go Normal file

@ -0,0 +1,195 @@
package fs
import (
"os"
"testing"
"github.com/stretchr/testify/require"
)
var dummyFactories = map[string]FsFactory{
".test": func(f File) (Filesystem, error) {
return &DummyFs{}, nil
},
}
func TestStorage(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.Add(&Dummy{}, "/path/to/dummy/file.txt")
require.NoError(err)
err = s.Add(&Dummy{}, "/path/to/dummy/file2.txt")
require.NoError(err)
contains := s.Has("/path")
require.True(contains)
contains = s.Has("/path/to/dummy/")
require.True(contains)
file, err := s.Get("/path/to/dummy/file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
file, err = s.Get("/path/to/dummy/file3.txt")
require.Error(err)
require.Nil(file)
files, err := s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 2)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
err = s.Add(&Dummy{}, "/path/to/dummy/folder/file.txt")
require.NoError(err)
files, err = s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 3)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
require.Contains(files, "folder")
err = s.Add(&Dummy{}, "path/file4.txt")
require.NoError(err)
require.True(s.Has("/path/file4.txt"))
files, err = s.Children("/")
require.NoError(err)
require.Len(files, 1)
err = s.Add(&Dummy{}, "/path/special_file.test")
require.NoError(err)
file, err = s.Get("/path/special_file.test/dir/here/file1.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
files, err = s.Children("/path/special_file.test")
require.NoError(err)
require.NotNil(files)
files, err = s.Children("/path/special_file.test/dir/here")
require.NoError(err)
require.Len(files, 2)
err = s.Add(&Dummy{}, "/path/to/__special__path/file3.txt")
require.NoError(err)
file, err = s.Get("/path/to/__special__path/file3.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
s.Clear()
}
func TestStorageWindowsPath(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.Add(&Dummy{}, "\\path\\to\\dummy\\file.txt")
require.NoError(err)
file, err := s.Get("\\path\\to\\dummy\\file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
file, err = s.Get("/path/to/dummy/file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
}
func TestStorageAddFs(t *testing.T) {
t.Parallel()
require := require.New(t)
s := newStorage(dummyFactories)
err := s.AddFS(&DummyFs{}, "/test")
require.NoError(err)
f, err := s.Get("/test/dir/here/file1.txt")
require.NoError(err)
require.NotNil(f)
err = s.AddFS(&DummyFs{}, "/test")
require.Error(err)
}
func TestSupportedFactories(t *testing.T) {
t.Parallel()
require := require.New(t)
require.Contains(SupportedFactories, ".zip")
require.Contains(SupportedFactories, ".rar")
require.Contains(SupportedFactories, ".7z")
fs, err := SupportedFactories[".zip"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".rar"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".7z"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
}
var _ Filesystem = &DummyFs{}
type DummyFs struct {
}
func (d *DummyFs) Open(filename string) (File, error) {
return &Dummy{}, nil
}
func (d *DummyFs) ReadDir(path string) (map[string]File, error) {
if path == "/dir/here" {
return map[string]File{
"file1.txt": &Dummy{},
"file2.txt": &Dummy{},
}, nil
}
return nil, os.ErrNotExist
}
var _ File = &Dummy{}
type Dummy struct {
}
func (d *Dummy) Size() int64 {
return 0
}
func (d *Dummy) IsDir() bool {
return false
}
func (d *Dummy) Close() error {
return nil
}
func (d *Dummy) Read(p []byte) (n int, err error) {
return 0, nil
}
func (d *Dummy) ReadAt(p []byte, off int64) (n int, err error) {
return 0, nil
}

197
src/fs/torrent.go Normal file

@ -0,0 +1,197 @@
package fs
import (
"context"
"io"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
)
var _ Filesystem = &Torrent{}
type Torrent struct {
mu sync.RWMutex
ts map[string]*torrent.Torrent
s *storage
loaded bool
readTimeout int
}
func NewTorrent(readTimeout int) *Torrent {
return &Torrent{
s: newStorage(SupportedFactories),
ts: make(map[string]*torrent.Torrent),
readTimeout: readTimeout,
}
}
func (fs *Torrent) AddTorrent(t *torrent.Torrent) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.loaded = false
fs.ts[t.InfoHash().HexString()] = t
}
func (fs *Torrent) RemoveTorrent(h string) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.s.Clear()
fs.loaded = false
delete(fs.ts, h)
}
func (fs *Torrent) load() {
if fs.loaded {
return
}
fs.mu.RLock()
defer fs.mu.RUnlock()
for _, t := range fs.ts {
<-t.GotInfo()
for _, file := range t.Files() {
fs.s.Add(&torrentFile{
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
}, file.Path())
}
}
fs.loaded = true
}
func (fs *Torrent) Open(filename string) (File, error) {
fs.load()
return fs.s.Get(filename)
}
func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
fs.load()
return fs.s.Children(path)
}
type reader interface {
iio.Reader
missinggo.ReadContexter
}
type readAtWrapper struct {
timeout int
mu sync.Mutex
torrent.Reader
io.ReaderAt
io.Closer
}
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
return &readAtWrapper{Reader: r, timeout: timeout}
}
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {
rw.mu.Lock()
defer rw.mu.Unlock()
_, err := rw.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return readAtLeast(rw, rw.timeout, p, len(p))
}
func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(timeout)*time.Second,
func() {
cancel()
},
)
nn, err = r.ReadContext(ctx, buf[n:])
n += nn
timer.Stop()
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}
func (rw *readAtWrapper) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.Reader.Close()
}
var _ File = &torrentFile{}
type torrentFile struct {
readerFunc func() torrent.Reader
reader reader
len int64
timeout int
}
func (d *torrentFile) load() {
if d.reader != nil {
return
}
d.reader = newReadAtWrapper(d.readerFunc(), d.timeout)
}
func (d *torrentFile) Size() int64 {
return d.len
}
func (d *torrentFile) IsDir() bool {
return false
}
func (d *torrentFile) Close() error {
var err error
if d.reader != nil {
err = d.reader.Close()
}
d.reader = nil
return err
}
func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)
defer timer.Stop()
return d.reader.ReadContext(ctx, p)
}
func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
}

140
src/fs/torrent_test.go Normal file

@ -0,0 +1,140 @@
package fs
import (
"os"
"testing"
"github.com/anacrolix/torrent"
"github.com/stretchr/testify/require"
)
const testMagnet = "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent"
var Cli *torrent.Client
func TestMain(m *testing.M) {
cfg := torrent.NewDefaultClientConfig()
cfg.DataDir = os.TempDir()
// disable webseeds to avoid a panic when closing the client in tests
cfg.DisableWebseeds = true
client, err := torrent.NewClient(cfg)
if err != nil {
panic(err)
}
Cli = client
exitVal := m.Run()
client.Close()
os.Exit(exitVal)
}
func TestTorrentFilesystem(t *testing.T) {
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
tfs := NewTorrent(600)
tfs.AddTorrent(to)
files, err := tfs.ReadDir("/")
require.NoError(err)
require.Len(files, 1)
require.Contains(files, "The WIRED CD - Rip. Sample. Mash. Share")
files, err = tfs.ReadDir("/The WIRED CD - Rip. Sample. Mash. Share")
require.NoError(err)
require.Len(files, 18)
f, err := tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/not_existing_file.txt")
require.Equal(os.ErrNotExist, err)
require.Nil(f)
f, err = tfs.Open("/The WIRED CD - Rip. Sample. Mash. Share/01 - Beastie Boys - Now Get Busy.mp3")
require.NoError(err)
require.NotNil(f)
require.Equal(f.Size(), int64(1964275))
b := make([]byte, 10)
n, err := f.Read(b)
require.NoError(err)
require.Equal(10, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0, 0x0, 0x0, 0x0, 0x1f, 0x76}, b)
n, err = f.ReadAt(b, 10)
require.NoError(err)
require.Equal(10, n)
n, err = f.ReadAt(b, 10000)
require.NoError(err)
require.Equal(10, n)
tfs.RemoveTorrent(to.InfoHash().String())
files, err = tfs.ReadDir("/")
require.NoError(err)
require.Len(files, 0)
require.NoError(f.Close())
}
func TestReadAtTorrent(t *testing.T) {
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
tf := torrentFile{
readerFunc: torrFile.NewReader,
len: torrFile.Length(),
timeout: 500,
}
defer tf.Close()
toRead := make([]byte, 5)
n, err := tf.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = tf.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}
func TestReadAtWrapper(t *testing.T) {
t.Parallel()
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
r := newReadAtWrapper(torrFile.NewReader(), 10)
defer r.Close()
toRead := make([]byte, 5)
n, err := r.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = r.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}

127
src/http/api.go Normal file

@ -0,0 +1,127 @@
package http
import (
"bytes"
"io"
"math"
"net/http"
"os"
"sort"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
)
var apiStatusHandler = func(fc *filecache.Cache, ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) {
stat := gin.H{
"torrentStats": ss.GlobalStats(),
}
if fc != nil {
stat["cacheItems"] = fc.Info().NumItems
stat["cacheFilled"] = fc.Info().Filled / 1024 / 1024
stat["cacheCapacity"] = fc.Info().Capacity / 1024 / 1024
}
// TODO move to a struct
ctx.JSON(http.StatusOK, stat)
}
}
var apiServersHandler = func(ss []*torrent.Server) gin.HandlerFunc {
return func(ctx *gin.Context) {
var infos []*torrent.ServerInfo
for _, s := range ss {
infos = append(infos, s.Info())
}
ctx.JSON(http.StatusOK, infos)
}
}
var apiRoutesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
return func(ctx *gin.Context) {
s := ss.RoutesStats()
sort.Sort(torrent.ByName(s))
ctx.JSON(http.StatusOK, s)
}
}
var apiAddTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
return func(ctx *gin.Context) {
route := ctx.Param("route")
var json RouteAdd
if err := ctx.ShouldBindJSON(&json); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := s.AddMagnet(route, json.Magnet); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
ctx.JSON(http.StatusOK, nil)
}
}
var apiDelTorrentHandler = func(s *torrent.Service) gin.HandlerFunc {
return func(ctx *gin.Context) {
route := ctx.Param("route")
hash := ctx.Param("torrent_hash")
if err := s.RemoveFromHash(route, hash); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
ctx.JSON(http.StatusOK, nil)
}
}
var apiLogHandler = func(path string) gin.HandlerFunc {
return func(ctx *gin.Context) {
f, err := os.Open(path)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fi, err := f.Stat()
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
max := math.Max(float64(-fi.Size()), -1024*8*8)
_, err = f.Seek(int64(max), io.SeekEnd)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
var b bytes.Buffer
ctx.Stream(func(w io.Writer) bool {
_, err := b.ReadFrom(f)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
_, err = b.WriteTo(w)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return false
}
return true
})
if err := f.Close(); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
}
}

92
src/http/http.go Normal file

@ -0,0 +1,92 @@
package http
import (
"fmt"
"net/http"
"git.kmsign.ru/royalcat/tstor"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/shurcooL/httpfs/html/vfstemplate"
)
func New(fc *filecache.Cache, ss *torrent.Stats, s *torrent.Service, ch *config.Config, tss []*torrent.Server, fs http.FileSystem, logPath string, cfg *config.Config) error {
gin.SetMode(gin.ReleaseMode)
r := gin.New()
r.Use(gin.Recovery())
r.Use(gin.ErrorLogger())
r.Use(Logger())
r.GET("/assets/*filepath", func(c *gin.Context) {
c.FileFromFS(c.Request.URL.Path, http.FS(tstor.Assets))
})
if cfg.Mounts.HttpFs.Enabled {
log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d/fs", cfg.Mounts.HttpFs.Port)).Msg("starting HTTPFS")
r.GET("/fs/*filepath", func(c *gin.Context) {
path := c.Param("filepath")
c.FileFromFS(path, fs)
})
}
t, err := vfstemplate.ParseGlob(http.FS(tstor.Templates), nil, "/templates/*")
if err != nil {
return fmt.Errorf("error parsing html: %w", err)
}
r.SetHTMLTemplate(t)
r.GET("/", indexHandler)
r.GET("/routes", routesHandler(ss))
r.GET("/logs", logsHandler)
r.GET("/servers", serversFoldersHandler())
api := r.Group("/api")
{
api.GET("/log", apiLogHandler(logPath))
api.GET("/status", apiStatusHandler(fc, ss))
api.GET("/servers", apiServersHandler(tss))
api.GET("/routes", apiRoutesHandler(ss))
api.POST("/routes/:route/torrent", apiAddTorrentHandler(s))
api.DELETE("/routes/:route/torrent/:torrent_hash", apiDelTorrentHandler(s))
}
log.Info().Str("host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)).Msg("starting webserver")
if err := r.Run(fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)); err != nil {
return fmt.Errorf("error initializing server: %w", err)
}
return nil
}
func Logger() gin.HandlerFunc {
l := log.Logger.With().Str("component", "http").Logger()
return func(c *gin.Context) {
path := c.Request.URL.Path
raw := c.Request.URL.RawQuery
c.Next()
if raw != "" {
path = path + "?" + raw
}
msg := c.Errors.String()
if msg == "" {
msg = "Request"
}
s := c.Writer.Status()
switch {
case s >= 400 && s < 500:
l.Warn().Str("path", path).Int("status", s).Msg(msg)
case s >= 500:
l.Error().Str("path", path).Int("status", s).Msg(msg)
default:
l.Debug().Str("path", path).Int("status", s).Msg(msg)
}
}
}

9
src/http/model.go Normal file

@ -0,0 +1,9 @@
package http
type RouteAdd struct {
Magnet string `json:"magnet" binding:"required"`
}
type Error struct {
Error string `json:"error"`
}

28
src/http/web.go Normal file

@ -0,0 +1,28 @@
package http
import (
"net/http"
"git.kmsign.ru/royalcat/tstor/src/torrent"
"github.com/gin-gonic/gin"
)
var indexHandler = func(c *gin.Context) {
c.HTML(http.StatusOK, "index.html", nil)
}
var routesHandler = func(ss *torrent.Stats) gin.HandlerFunc {
return func(c *gin.Context) {
c.HTML(http.StatusOK, "routes.html", ss.RoutesStats())
}
}
var logsHandler = func(c *gin.Context) {
c.HTML(http.StatusOK, "logs.html", nil)
}
var serversFoldersHandler = func() gin.HandlerFunc {
return func(c *gin.Context) {
c.HTML(http.StatusOK, "servers.html", nil)
}
}

64
src/iio/disk.go Normal file

@ -0,0 +1,64 @@
package iio
import (
"io"
"os"
"sync"
)
type DiskTeeReader struct {
io.ReaderAt
io.Closer
io.Reader
m sync.Mutex
fo int64
fr *os.File
to int64
tr io.Reader
}
func NewDiskTeeReader(r io.Reader) (Reader, error) {
fr, err := os.CreateTemp("", "dtb_tmp")
if err != nil {
return nil, err
}
tr := io.TeeReader(r, fr)
return &DiskTeeReader{fr: fr, tr: tr}, nil
}
func (dtr *DiskTeeReader) ReadAt(p []byte, off int64) (int, error) {
dtr.m.Lock()
defer dtr.m.Unlock()
tb := off + int64(len(p))
if tb > dtr.fo {
w, err := io.CopyN(io.Discard, dtr.tr, tb-dtr.fo)
dtr.to += w
if err != nil && err != io.EOF {
return 0, err
}
}
n, err := dtr.fr.ReadAt(p, off)
dtr.fo += int64(n)
return n, err
}
func (dtr *DiskTeeReader) Read(p []byte) (n int, err error) {
dtr.m.Lock()
defer dtr.m.Unlock()
// read directly from the tee reader here
n, err = dtr.tr.Read(p)
dtr.to += int64(n)
return
}
func (dtr *DiskTeeReader) Close() error {
if err := dtr.fr.Close(); err != nil {
return err
}
return os.Remove(dtr.fr.Name())
}

50
src/iio/disk_test.go Normal file

@ -0,0 +1,50 @@
package iio
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/require"
)
var testData []byte = []byte("Hello World")
func TestReadData(t *testing.T) {
t.Parallel()
require := require.New(t)
br := bytes.NewReader(testData)
r, err := NewDiskTeeReader(br)
require.NoError(err)
toRead := make([]byte, 5)
n, err := r.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal("World", string(toRead))
n, err = r.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal("Hello", string(toRead))
}
func TestReadDataEOF(t *testing.T) {
t.Parallel()
require := require.New(t)
br := bytes.NewReader(testData)
r, err := NewDiskTeeReader(br)
require.NoError(err)
toRead := make([]byte, 6)
n, err := r.ReadAt(toRead, 6)
require.Equal(io.EOF, err)
require.Equal(5, n)
require.Equal("World\x00", string(toRead))
}

14
src/iio/reader.go Normal file

@ -0,0 +1,14 @@
package iio
import "io"
type Reader interface {
io.ReaderAt
io.Closer
io.Reader
}
type ReaderSeeker interface {
Reader
io.Seeker
}

45
src/iio/wrapper.go Normal file

@ -0,0 +1,45 @@
package iio
import (
"io"
"sync"
)
type seekerWrapper struct {
mu sync.Mutex
pos int64
size int64
io.Seeker
Reader
}
func NewSeekerWrapper(r Reader, size int64) *seekerWrapper {
return &seekerWrapper{Reader: r, size: size}
}
func (r *seekerWrapper) Seek(offset int64, whence int) (int64, error) {
r.mu.Lock()
defer r.mu.Unlock()
switch whence {
case io.SeekStart:
r.pos = offset
case io.SeekCurrent:
r.pos = r.pos + offset
case io.SeekEnd:
r.pos = r.size + offset
}
return r.pos, nil
}
func (r *seekerWrapper) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
n, err := r.ReadAt(p, r.pos)
r.pos += int64(n)
return n, err
}

33
src/iio/wrapper_test.go Normal file

@ -0,0 +1,33 @@
package iio_test
import (
"io"
"testing"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/stretchr/testify/require"
)
var testData []byte = []byte("Hello World")
func TestSeekerWrapper(t *testing.T) {
t.Parallel()
require := require.New(t)
mf := fs.NewMemoryFile(testData)
r := iio.NewSeekerWrapper(mf, mf.Size())
defer r.Close()
n, err := r.Seek(6, io.SeekStart)
require.NoError(err)
require.Equal(int64(6), n)
toRead := make([]byte, 5)
nn, err := r.Read(toRead)
require.NoError(err)
require.Equal(5, nn)
require.Equal("World", string(toRead))
}

27
src/log/badger.go Normal file

@ -0,0 +1,27 @@
package log
import (
"strings"
"github.com/rs/zerolog"
)
type Badger struct {
L zerolog.Logger
}
func (l *Badger) Errorf(m string, f ...interface{}) {
l.L.Error().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
}
func (l *Badger) Warningf(m string, f ...interface{}) {
l.L.Warn().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
}
func (l *Badger) Infof(m string, f ...interface{}) {
l.L.Info().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
}
func (l *Badger) Debugf(m string, f ...interface{}) {
l.L.Debug().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
}

50
src/log/log.go Normal file

@ -0,0 +1,50 @@
package log
import (
"io"
"os"
"path/filepath"
"git.kmsign.ru/royalcat/tstor/src/config"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/natefinch/lumberjack.v2"
)
const FileName = "tstor.log"
func Load(config *config.Log) {
var writers []io.Writer
// fix console colors on windows
cso := colorable.NewColorableStdout()
writers = append(writers, zerolog.ConsoleWriter{Out: cso})
writers = append(writers, newRollingFile(config))
mw := io.MultiWriter(writers...)
log.Logger = log.Output(mw)
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
l := zerolog.InfoLevel
if config.Debug {
l = zerolog.DebugLevel
}
zerolog.SetGlobalLevel(l)
}
func newRollingFile(config *config.Log) io.Writer {
if err := os.MkdirAll(config.Path, 0744); err != nil {
log.Error().Err(err).Str("path", config.Path).Msg("can't create log directory")
return nil
}
return &lumberjack.Logger{
Filename: filepath.Join(config.Path, FileName),
MaxBackups: config.MaxBackups, // files
MaxSize: config.MaxSize, // megabytes
MaxAge: config.MaxAge, // days
}
}

32
src/log/torrent.go Normal file

@ -0,0 +1,32 @@
package log
import (
"github.com/anacrolix/log"
"github.com/rs/zerolog"
)
var _ log.Handler = &Torrent{}
type Torrent struct {
L zerolog.Logger
}
func (l *Torrent) Handle(r log.Record) {
e := l.L.Info()
switch r.Level {
case log.Debug:
e = l.L.Debug()
case log.Info:
e = l.L.Debug().Str("error-type", "info")
case log.Warning:
e = l.L.Warn()
case log.Error:
e = l.L.Warn().Str("error-type", "error")
case log.Critical:
e = l.L.Warn().Str("error-type", "critical")
}
// TODO set log values somehow
e.Msg(r.Text())
}


@ -0,0 +1,79 @@
package fuse
import (
"os"
"path/filepath"
"runtime"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/billziss-gh/cgofuse/fuse"
"github.com/rs/zerolog/log"
)
type Handler struct {
fuseAllowOther bool
path string
host *fuse.FileSystemHost
}
func NewHandler(fuseAllowOther bool, path string) *Handler {
return &Handler{
fuseAllowOther: fuseAllowOther,
path: path,
}
}
func (s *Handler) Mount(fss map[string]fs.Filesystem) error {
folder := s.path
// On Windows the mount point itself must not exist, so create only its parent directory
if runtime.GOOS == "windows" {
folder = filepath.Dir(s.path)
}
if filepath.VolumeName(folder) == "" {
if err := os.MkdirAll(folder, 0744); err != nil && !os.IsExist(err) {
return err
}
}
cfs, err := fs.NewContainerFs(fss)
if err != nil {
return err
}
host := fuse.NewFileSystemHost(NewFS(cfs))
// TODO improve error handling here
go func() {
var config []string
if s.fuseAllowOther {
config = append(config, "-o", "allow_other")
}
ok := host.Mount(s.path, config)
if !ok {
log.Error().Str("path", s.path).Msg("error trying to mount filesystem")
}
}()
s.host = host
log.Info().Str("path", s.path).Msg("starting FUSE mount")
return nil
}
func (s *Handler) Unmount() {
if s.host == nil {
return
}
ok := s.host.Unmount()
if !ok {
//TODO try to force unmount if possible
log.Error().Str("path", s.path).Msg("unmount failed")
}
}

254
src/mounts/fuse/mount.go Normal file

@ -0,0 +1,254 @@
package fuse
import (
"errors"
"io"
"math"
"os"
"sync"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/billziss-gh/cgofuse/fuse"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type FS struct {
fuse.FileSystemBase
fh *fileHandler
log zerolog.Logger
}
func NewFS(fs fs.Filesystem) fuse.FileSystemInterface {
l := log.Logger.With().Str("component", "fuse").Logger()
return &FS{
fh: &fileHandler{fs: fs},
log: l,
}
}
func (fs *FS) Open(path string, flags int) (errc int, fh uint64) {
fh, err := fs.fh.OpenHolder(path)
if os.IsNotExist(err) {
fs.log.Debug().Str("path", path).Msg("file does not exist")
return -fuse.ENOENT, fhNone
}
if err != nil {
fs.log.Error().Err(err).Str("path", path).Msg("error opening file")
return -fuse.EIO, fhNone
}
return 0, fh
}
// Unlink removes a file.
// The FileSystemBase implementation returns -ENOSYS.
func (fs *FS) Unlink(path string) int {
return -fuse.ENOSYS
}
func (fs *FS) Opendir(path string) (errc int, fh uint64) {
return fs.Open(path, 0)
}
func (fs *FS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) {
if path == "/" {
stat.Mode = fuse.S_IFDIR | 0555
return 0
}
file, err := fs.fh.GetFile(path, fh)
if os.IsNotExist(err) {
fs.log.Debug().Str("path", path).Msg("file does not exist")
return -fuse.ENOENT
}
if err != nil {
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder when reading file attributes")
return -fuse.EIO
}
if file.IsDir() {
stat.Mode = fuse.S_IFDIR | 0555
} else {
stat.Mode = fuse.S_IFREG | 0444
stat.Size = file.Size()
}
return 0
}
func (fs *FS) Read(path string, dest []byte, off int64, fh uint64) int {
file, err := fs.fh.GetFile(path, fh)
if os.IsNotExist(err) {
fs.log.Error().Err(err).Str("path", path).Msg("file not found on READ operation")
return -fuse.ENOENT
}
if err != nil {
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder when reading data from file")
return -fuse.EIO
}
end := int(math.Min(float64(len(dest)), float64(int64(file.Size())-off)))
if end < 0 {
end = 0
}
buf := dest[:end]
n, err := file.ReadAt(buf, off)
if err != nil && err != io.EOF {
log.Error().Err(err).Str("path", path).Msg("error reading data")
return -fuse.EIO
}
dest = buf[:n]
return n
}
func (fs *FS) Release(path string, fh uint64) int {
if err := fs.fh.Remove(fh); err != nil {
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder when releasing file")
return -fuse.EIO
}
return 0
}
func (fs *FS) Releasedir(path string, fh uint64) int {
return fs.Release(path, fh)
}
func (fs *FS) Readdir(path string,
fill func(name string, stat *fuse.Stat_t, ofst int64) bool,
ofst int64,
fh uint64) (errc int) {
fill(".", nil, 0)
fill("..", nil, 0)
//TODO improve this function to make use of fh index if possible
paths, err := fs.fh.ListDir(path)
if err != nil {
fs.log.Error().Err(err).Str("path", path).Msg("error reading directory")
return -fuse.ENOSYS
}
for _, p := range paths {
if !fill(p, nil, 0) {
fs.log.Error().Str("path", path).Msg("error adding directory")
break
}
}
return 0
}
const fhNone = ^uint64(0)
var ErrHolderEmpty = errors.New("file holder is empty")
var ErrBadHolderIndex = errors.New("holder index too big")
type fileHandler struct {
mu sync.RWMutex
opened []fs.File
fs fs.Filesystem
}
func (fh *fileHandler) GetFile(path string, fhi uint64) (fs.File, error) {
fh.mu.RLock()
defer fh.mu.RUnlock()
if fhi == fhNone {
return fh.lookupFile(path)
}
return fh.get(fhi)
}
func (fh *fileHandler) ListDir(path string) ([]string, error) {
fh.mu.RLock()
defer fh.mu.RUnlock()
var out []string
files, err := fh.fs.ReadDir(path)
if err != nil {
return nil, err
}
for p := range files {
out = append(out, p)
}
return out, nil
}
func (fh *fileHandler) OpenHolder(path string) (uint64, error) {
file, err := fh.lookupFile(path)
if err != nil {
return fhNone, err
}
fh.mu.Lock()
defer fh.mu.Unlock()
for i, old := range fh.opened {
if old == nil {
fh.opened[i] = file
return uint64(i), nil
}
}
fh.opened = append(fh.opened, file)
return uint64(len(fh.opened) - 1), nil
}
func (fh *fileHandler) get(fhi uint64) (fs.File, error) {
if int(fhi) >= len(fh.opened) {
return nil, ErrBadHolderIndex
}
// TODO check opened slice to avoid panics
h := fh.opened[int(fhi)]
if h == nil {
return nil, ErrHolderEmpty
}
return h, nil
}
func (fh *fileHandler) Remove(fhi uint64) error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fhi == fhNone {
return nil
}
// TODO check opened slice to avoid panics
f := fh.opened[int(fhi)]
if f == nil {
return ErrHolderEmpty
}
if err := f.Close(); err != nil {
return err
}
fh.opened[int(fhi)] = nil
return nil
}
func (fh *fileHandler) lookupFile(path string) (fs.File, error) {
file, err := fh.fs.Open(path)
if err != nil {
return nil, err
}
if file != nil {
return file, nil
}
return nil, os.ErrNotExist
}


@ -0,0 +1,68 @@
package fuse
import (
"os"
"path/filepath"
"runtime"
"testing"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/stretchr/testify/require"
)
func TestHandler(t *testing.T) {
if runtime.GOOS != "windows" {
t.Skip("test for windows only")
}
require := require.New(t)
p := "./testmnt"
h := NewHandler(false, p)
mem := fs.NewMemory()
err := mem.Storage.Add(fs.NewMemoryFile([]byte("test")), "/test.txt")
require.NoError(err)
err = h.Mount(map[string]fs.Filesystem{"/mem": mem})
require.NoError(err)
time.Sleep(5 * time.Second)
fi, err := os.Stat(filepath.Join(p, "mem", "test.txt"))
require.NoError(err)
require.False(fi.IsDir())
require.Equal(int64(4), fi.Size())
}
func TestHandlerDriveLetter(t *testing.T) {
if runtime.GOOS != "windows" {
t.Skip("test for windows only")
}
require := require.New(t)
p := "Z:"
h := NewHandler(false, p)
mem := fs.NewMemory()
err := mem.Storage.Add(fs.NewMemoryFile([]byte("test")), "/test.txt")
require.NoError(err)
err = h.Mount(map[string]fs.Filesystem{"/mem": mem})
require.NoError(err)
time.Sleep(5 * time.Second)
fi, err := os.Stat(filepath.Join(p, "mem", "test.txt"))
require.NoError(err)
require.False(fi.IsDir())
require.Equal(int64(4), fi.Size())
}

109
src/mounts/httpfs/httpfs.go Normal file

@ -0,0 +1,109 @@
package httpfs
import (
"io"
"io/fs"
"net/http"
"os"
"sync"
dfs "git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/iio"
)
var _ http.FileSystem = &HTTPFS{}
type HTTPFS struct {
fs dfs.Filesystem
}
func NewHTTPFS(fs dfs.Filesystem) *HTTPFS {
return &HTTPFS{fs: fs}
}
func (fs *HTTPFS) Open(name string) (http.File, error) {
f, err := fs.fs.Open(name)
if err != nil {
return nil, err
}
fi := dfs.NewFileInfo(name, f.Size(), f.IsDir())
// TODO make this lazy
fis, err := fs.filesToFileInfo(name)
if err != nil {
return nil, err
}
return newHTTPFile(f, fis, fi), nil
}
func (fs *HTTPFS) filesToFileInfo(path string) ([]fs.FileInfo, error) {
files, err := fs.fs.ReadDir(path)
if err != nil {
return nil, err
}
var out []os.FileInfo
for n, f := range files {
out = append(out, dfs.NewFileInfo(n, f.Size(), f.IsDir()))
}
return out, nil
}
var _ http.File = &httpFile{}
type httpFile struct {
iio.ReaderSeeker
mu sync.Mutex
// dirPos is protected by mu.
dirPos int
dirContent []os.FileInfo
fi fs.FileInfo
}
func newHTTPFile(f dfs.File, fis []fs.FileInfo, fi fs.FileInfo) *httpFile {
return &httpFile{
dirContent: fis,
fi: fi,
ReaderSeeker: iio.NewSeekerWrapper(f, f.Size()),
}
}
func (f *httpFile) Readdir(count int) ([]fs.FileInfo, error) {
f.mu.Lock()
defer f.mu.Unlock()
if !f.fi.IsDir() {
return nil, os.ErrInvalid
}
old := f.dirPos
if old >= len(f.dirContent) {
// The os.File Readdir docs say that at the end of a directory,
// the error is io.EOF if count > 0 and nil if count <= 0.
if count > 0 {
return nil, io.EOF
}
return nil, nil
}
if count > 0 {
f.dirPos += count
if f.dirPos > len(f.dirContent) {
f.dirPos = len(f.dirContent)
}
} else {
f.dirPos = len(f.dirContent)
old = 0
}
return f.dirContent[old:f.dirPos], nil
}
func (f *httpFile) Stat() (fs.FileInfo, error) {
return f.fi, nil
}
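
Since HTTPFS implements http.FileSystem, it can be plugged straight into the standard library's file server. A sketch; the import path is inferred from the file layout and the served filesystem is an in-memory example:

package main

import (
    "net/http"

    "git.kmsign.ru/royalcat/tstor/src/fs"
    "git.kmsign.ru/royalcat/tstor/src/mounts/httpfs"
)

func main() {
    mem := fs.NewMemory()
    _ = mem.Storage.Add(fs.NewMemoryFile([]byte("hello")), "/hello.txt")

    // Serve the tstor filesystem under /fs/ using net/http's file server.
    h := http.FileServer(httpfs.NewHTTPFS(mem))
    http.Handle("/fs/", http.StripPrefix("/fs", h))
    _ = http.ListenAndServe(":4445", nil) // port taken from the default HttpFs config
}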

217
src/mounts/webdav/fs.go Normal file

@ -0,0 +1,217 @@
package webdav
import (
"context"
"io"
"os"
"path/filepath"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/iio"
"golang.org/x/net/webdav"
)
var _ webdav.FileSystem = &WebDAV{}
type WebDAV struct {
fs fs.Filesystem
}
func newFS(fs fs.Filesystem) *WebDAV {
return &WebDAV{fs: fs}
}
func (wd *WebDAV) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
p := "/" + name
// TODO handle flag and permissions
f, err := wd.lookupFile(p)
if err != nil {
return nil, err
}
wdf := newFile(filepath.Base(p), f, func() ([]os.FileInfo, error) {
return wd.listDir(p)
})
return wdf, nil
}
func (wd *WebDAV) Stat(ctx context.Context, name string) (os.FileInfo, error) {
p := "/" + name
f, err := wd.lookupFile(p)
if err != nil {
return nil, err
}
fi := newFileInfo(name, f.Size(), f.IsDir())
return fi, nil
}
func (wd *WebDAV) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
return webdav.ErrNotImplemented
}
func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error {
return webdav.ErrNotImplemented
}
func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error {
return webdav.ErrNotImplemented
}
func (wd *WebDAV) lookupFile(path string) (fs.File, error) {
return wd.fs.Open(path)
}
func (wd *WebDAV) listDir(path string) ([]os.FileInfo, error) {
files, err := wd.fs.ReadDir(path)
if err != nil {
return nil, err
}
var out []os.FileInfo
for n, f := range files {
out = append(out, newFileInfo(n, f.Size(), f.IsDir()))
}
return out, nil
}
var _ webdav.File = &webDAVFile{}
type webDAVFile struct {
iio.Reader
fi os.FileInfo
mudp sync.Mutex
dirPos int
mup sync.Mutex
pos int64
dirFunc func() ([]os.FileInfo, error)
dirContent []os.FileInfo
}
func newFile(name string, f fs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
return &webDAVFile{
fi: newFileInfo(name, f.Size(), f.IsDir()),
dirFunc: df,
Reader: f,
}
}
func (wdf *webDAVFile) Readdir(count int) ([]os.FileInfo, error) {
wdf.mudp.Lock()
defer wdf.mudp.Unlock()
if !wdf.fi.IsDir() {
return nil, os.ErrInvalid
}
if wdf.dirContent == nil {
dc, err := wdf.dirFunc()
if err != nil {
return nil, err
}
wdf.dirContent = dc
}
old := wdf.dirPos
if old >= len(wdf.dirContent) {
// The os.File Readdir docs say that at the end of a directory,
// the error is io.EOF if count > 0 and nil if count <= 0.
if count > 0 {
return nil, io.EOF
}
return nil, nil
}
if count > 0 {
wdf.dirPos += count
if wdf.dirPos > len(wdf.dirContent) {
wdf.dirPos = len(wdf.dirContent)
}
} else {
wdf.dirPos = len(wdf.dirContent)
old = 0
}
return wdf.dirContent[old:wdf.dirPos], nil
}
func (wdf *webDAVFile) Stat() (os.FileInfo, error) {
return wdf.fi, nil
}
func (wdf *webDAVFile) Read(p []byte) (int, error) {
wdf.mup.Lock()
defer wdf.mup.Unlock()
n, err := wdf.Reader.ReadAt(p, wdf.pos)
wdf.pos += int64(n)
return n, err
}
func (wdf *webDAVFile) Seek(offset int64, whence int) (int64, error) {
wdf.mup.Lock()
defer wdf.mup.Unlock()
switch whence {
case io.SeekStart:
wdf.pos = offset
case io.SeekCurrent:
wdf.pos = wdf.pos + offset
case io.SeekEnd:
wdf.pos = wdf.fi.Size() + offset
}
return wdf.pos, nil
}
func (wdf *webDAVFile) Write(p []byte) (n int, err error) {
return 0, webdav.ErrNotImplemented
}
type webDAVFileInfo struct {
name string
size int64
isDir bool
}
func newFileInfo(name string, size int64, isDir bool) *webDAVFileInfo {
return &webDAVFileInfo{
name: name,
size: size,
isDir: isDir,
}
}
func (wdfi *webDAVFileInfo) Name() string {
return wdfi.name
}
func (wdfi *webDAVFileInfo) Size() int64 {
return wdfi.size
}
func (wdfi *webDAVFileInfo) Mode() os.FileMode {
if wdfi.isDir {
return 0555 | os.ModeDir
}
return 0555
}
func (wdfi *webDAVFileInfo) ModTime() time.Time {
// TODO fix it
return time.Now()
}
func (wdfi *webDAVFileInfo) IsDir() bool {
return wdfi.isDir
}
func (wdfi *webDAVFileInfo) Sys() interface{} {
return nil
}


@ -0,0 +1,80 @@
package webdav
import (
"context"
"io"
"os"
"testing"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/stretchr/testify/require"
"golang.org/x/net/webdav"
)
func TestWebDAVFilesystem(t *testing.T) {
t.Parallel()
require := require.New(t)
mfs := fs.NewMemory()
mf := fs.NewMemoryFile([]byte("test file content."))
err := mfs.Storage.Add(mf, "/folder/file.txt")
require.NoError(err)
wfs := newFS(mfs)
dir, err := wfs.OpenFile(context.Background(), "/", 0, 0)
require.NoError(err)
fi, err := dir.Readdir(0)
require.NoError(err)
require.Len(fi, 1)
require.Equal("folder", fi[0].Name())
file, err := wfs.OpenFile(context.Background(), "/folder/file.txt", 0, 0)
require.NoError(err)
_, err = file.Readdir(0)
require.ErrorIs(err, os.ErrInvalid)
n, err := file.Seek(5, io.SeekStart)
require.NoError(err)
require.Equal(int64(5), n)
br := make([]byte, 4)
nn, err := file.Read(br)
require.NoError(err)
require.Equal(4, nn)
require.Equal([]byte("file"), br)
n, err = file.Seek(0, io.SeekStart)
require.NoError(err)
require.Equal(int64(0), n)
nn, err = file.Read(br)
require.NoError(err)
require.Equal(4, nn)
require.Equal([]byte("test"), br)
fInfo, err := wfs.Stat(context.Background(), "/folder/file.txt")
require.NoError(err)
require.Equal("/folder/file.txt", fInfo.Name())
require.Equal(false, fInfo.IsDir())
require.Equal(int64(18), fInfo.Size())
}
func TestErrNotImplemented(t *testing.T) {
t.Parallel()
require := require.New(t)
mfs := fs.NewMemory()
mf := fs.NewMemoryFile([]byte("test file content."))
err := mfs.Storage.Add(mf, "/folder/file.txt")
require.NoError(err)
wfs := newFS(mfs)
require.ErrorIs(wfs.Mkdir(context.Background(), "test", 0), webdav.ErrNotImplemented)
require.ErrorIs(wfs.RemoveAll(context.Background(), "test"), webdav.ErrNotImplemented)
require.ErrorIs(wfs.Rename(context.Background(), "test", "newTest"), webdav.ErrNotImplemented)
}


@ -0,0 +1,23 @@
package webdav
import (
"net/http"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/rs/zerolog/log"
"golang.org/x/net/webdav"
)
func newHandler(fs fs.Filesystem) *webdav.Handler {
l := log.Logger.With().Str("component", "webDAV").Logger()
return &webdav.Handler{
Prefix: "/",
FileSystem: newFS(fs),
LockSystem: webdav.NewMemLS(),
Logger: func(req *http.Request, err error) {
if err != nil {
l.Error().Err(err).Str("path", req.RequestURI).Msg("webDAV error")
}
},
}
}

src/mounts/webdav/http.go Normal file

@ -0,0 +1,29 @@
package webdav
import (
"fmt"
"net/http"
"git.kmsign.ru/royalcat/tstor/src/fs"
"github.com/rs/zerolog/log"
)
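// NewWebDAVServer serves the filesystem over WebDAV on the given port, guarding every request with HTTP basic auth; it blocks until the HTTP server stops.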
func NewWebDAVServer(fs fs.Filesystem, port int, user, pass string) error {
log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d", port)).Msg("starting webDAV server")
srv := newHandler(fs)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
username, password, _ := r.BasicAuth()
if username == user && password == pass {
srv.ServeHTTP(w, r)
return
}
w.Header().Set("WWW-Authenticate", `Basic realm="BASIC WebDAV REALM"`)
w.WriteHeader(401)
w.Write([]byte("401 Unauthorized\n"))
})
return http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), nil)
}

src/torrent/client.go Normal file

@ -0,0 +1,38 @@
package torrent
import (
"time"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44"
tlog "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/storage"
"github.com/rs/zerolog/log"
"git.kmsign.ru/royalcat/tstor/src/config"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
)
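// NewClient builds an anacrolix/torrent client configured for seeding, wired to the given storage backend, BEP 44 item store and peer ID.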
func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
// TODO download and upload limits
torrentCfg := torrent.NewDefaultClientConfig()
torrentCfg.Seed = true
torrentCfg.PeerID = string(id[:])
torrentCfg.DefaultStorage = st
torrentCfg.DisableIPv6 = cfg.DisableIPv6
l := log.Logger.With().Str("component", "torrent-client").Logger()
tl := tlog.NewLogger()
tl.SetHandlers(&dlog.Torrent{L: l})
torrentCfg.Logger = tl
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis
cfg.Exp = 2 * time.Hour
cfg.NoSecurity = false
}
return torrent.NewClient(torrentCfg)
}

src/torrent/id.go Normal file

@ -0,0 +1,30 @@
package torrent
import (
"crypto/rand"
"os"
)
var emptyBytes [20]byte
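// GetOrCreatePeerID returns the peer ID persisted at p, generating and saving a random one on first run.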
func GetOrCreatePeerID(p string) ([20]byte, error) {
idb, err := os.ReadFile(p)
if err == nil {
var out [20]byte
copy(out[:], idb)
return out, nil
}
if !os.IsNotExist(err) {
return emptyBytes, err
}
var out [20]byte
_, err = rand.Read(out[:])
if err != nil {
return emptyBytes, err
}
return out, os.WriteFile(p, out[:], 0755)
}


@ -0,0 +1,45 @@
package loader
import "git.kmsign.ru/royalcat/tstor/src/config"
var _ Loader = &Config{}
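// Config is a read-only Loader backed by the routes declared in the configuration file.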
type Config struct {
c []config.Route
}
func NewConfig(r []config.Route) *Config {
return &Config{
c: r,
}
}
func (l *Config) ListMagnets() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range l.c {
for _, t := range r.Torrents {
if t.MagnetURI == "" {
continue
}
out[r.Name] = append(out[r.Name], t.MagnetURI)
}
}
return out, nil
}
func (l *Config) ListTorrentPaths() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range l.c {
for _, t := range r.Torrents {
if t.TorrentPath == "" {
continue
}
out[r.Name] = append(out[r.Name], t.TorrentPath)
}
}
return out, nil
}

src/torrent/loader/db.go Normal file

@ -0,0 +1,112 @@
package loader
import (
"path"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/metainfo"
"github.com/dgraph-io/badger/v3"
"github.com/rs/zerolog/log"
)
var _ LoaderAdder = &DB{}
const routeRootKey = "/route/"
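// DB is a LoaderAdder that persists magnet links per route in a badger database, keyed by info hash.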
type DB struct {
db *badger.DB
}
func NewDB(path string) (*DB, error) {
l := log.Logger.With().Str("component", "torrent-store").Logger()
opts := badger.DefaultOptions(path).
WithLogger(&dlog.Badger{L: l}).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
err = db.RunValueLogGC(0.5)
if err != nil && err != badger.ErrNoRewrite {
return nil, err
}
return &DB{
db: db,
}, nil
}
func (l *DB) AddMagnet(r, m string) error {
err := l.db.Update(func(txn *badger.Txn) error {
spec, err := metainfo.ParseMagnetUri(m)
if err != nil {
return err
}
ih := spec.InfoHash.HexString()
rp := path.Join(routeRootKey, ih, r)
return txn.Set([]byte(rp), []byte(m))
})
if err != nil {
return err
}
return l.db.Sync()
}
func (l *DB) RemoveFromHash(r, h string) (bool, error) {
tx := l.db.NewTransaction(true)
defer tx.Discard()
var mh metainfo.Hash
if err := mh.FromHexString(h); err != nil {
return false, err
}
rp := path.Join(routeRootKey, h, r)
if _, err := tx.Get([]byte(rp)); err != nil {
// distinguish "not stored" from a real database error
if err == badger.ErrKeyNotFound {
return false, nil
}
return false, err
}
if err := tx.Delete([]byte(rp)); err != nil {
return false, err
}
return true, tx.Commit()
}
func (l *DB) ListMagnets() (map[string][]string, error) {
tx := l.db.NewTransaction(false)
defer tx.Discard()
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := []byte(routeRootKey)
out := make(map[string][]string)
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
_, r := path.Split(string(it.Item().Key()))
i := it.Item()
if err := i.Value(func(v []byte) error {
out[r] = append(out[r], string(v))
return nil
}); err != nil {
return nil, err
}
}
return out, nil
}
func (l *DB) ListTorrentPaths() (map[string][]string, error) {
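// only magnet links are persisted in the database; torrent file paths are never stored here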
return nil, nil
}
func (l *DB) Close() error {
return l.db.Close()
}


@ -0,0 +1,62 @@
package loader
import (
"os"
"testing"
"github.com/anacrolix/torrent/storage"
"github.com/stretchr/testify/require"
)
const m1 = "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056"
func TestDB(t *testing.T) {
require := require.New(t)
tmpService, err := os.MkdirTemp("", "service")
require.NoError(err)
tmpStorage, err := os.MkdirTemp("", "storage")
require.NoError(err)
cs := storage.NewFile(tmpStorage)
defer cs.Close()
s, err := NewDB(tmpService)
require.NoError(err)
defer s.Close()
err = s.AddMagnet("route1", "WRONG MAGNET")
require.Error(err)
err = s.AddMagnet("route1", m1)
require.NoError(err)
err = s.AddMagnet("route2", m1)
require.NoError(err)
l, err := s.ListMagnets()
require.NoError(err)
require.Len(l, 2)
require.Len(l["route1"], 1)
require.Equal(l["route1"][0], m1)
require.Len(l["route2"], 1)
require.Equal(l["route2"][0], m1)
removed, err := s.RemoveFromHash("other", "c9e15763f722f23e98a29decdfae341b98d53056")
require.NoError(err)
require.False(removed)
removed, err = s.RemoveFromHash("route1", "c9e15763f722f23e98a29decdfae341b98d53056")
require.NoError(err)
require.True(removed)
l, err = s.ListMagnets()
require.NoError(err)
require.Len(l, 1)
require.Len(l["route2"], 1)
require.Equal(l["route2"][0], m1)
require.NoError(s.Close())
require.NoError(cs.Close())
}


@ -0,0 +1,55 @@
package loader
import (
"io/fs"
"path"
"path/filepath"
"git.kmsign.ru/royalcat/tstor/src/config"
)
var _ Loader = &Folder{}
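// Folder is a Loader that discovers .torrent files by walking the folder configured for each route.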
type Folder struct {
c []config.Route
}
func NewFolder(r []config.Route) *Folder {
return &Folder{
c: r,
}
}
func (f *Folder) ListMagnets() (map[string][]string, error) {
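// magnet links are not loaded from folders; only .torrent files are supported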
return nil, nil
}
func (f *Folder) ListTorrentPaths() (map[string][]string, error) {
out := make(map[string][]string)
for _, r := range f.c {
if r.TorrentFolder == "" {
continue
}
err := filepath.WalkDir(r.TorrentFolder, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if path.Ext(p) == ".torrent" {
out[r.Name] = append(out[r.Name], p)
}
return nil
})
if err != nil {
return nil, err
}
}
return out, nil
}


@ -0,0 +1,13 @@
package loader
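// Loader lists the torrents to serve, grouped by route name.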
type Loader interface {
ListMagnets() (map[string][]string, error)
ListTorrentPaths() (map[string][]string, error)
}
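// LoaderAdder is a Loader that can also add and remove magnet links at runtime.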
type LoaderAdder interface {
Loader
RemoveFromHash(r, h string) (bool, error)
AddMagnet(r, m string) error
}

src/torrent/server.go Normal file

@ -0,0 +1,255 @@
package torrent
import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"git.kmsign.ru/royalcat/tstor/src/config"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
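// ServerState describes the current phase of a served folder.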
type ServerState int
const (
UNKNOWN ServerState = iota
SEEDING
READING
UPDATING
STOPPED
ERROR
)
func (ss ServerState) String() string {
return [...]string{"Unknown", "Seeding", "Reading", "Updating", "Stopped", "Error"}[ss]
}
type ServerInfo struct {
Magnet string `json:"magnetUri"`
UpdatedAt int64 `json:"updatedAt"`
Name string `json:"name"`
Folder string `json:"folder"`
State string `json:"state"`
Peers int `json:"peers"`
Seeds int `json:"seeds"`
}
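// Server watches a local folder, builds a torrent from its contents and seeds it, regenerating the magnet link whenever files change.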
type Server struct {
cfg *config.Server
log zerolog.Logger
fw *fsnotify.Watcher
eventsCount uint64
c *torrent.Client
pc storage.PieceCompletion
mu sync.RWMutex
t *torrent.Torrent
si ServerInfo
}
func NewServer(c *torrent.Client, pc storage.PieceCompletion, cfg *config.Server) *Server {
l := log.Logger.With().Str("component", "server").Str("name", cfg.Name).Logger()
return &Server{
cfg: cfg,
log: l,
c: c,
pc: pc,
}
}
func (s *Server) Start() error {
s.log.Info().Msg("starting new server folder")
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
if err := os.MkdirAll(s.cfg.Path, 0744); err != nil {
return fmt.Errorf("error creating server folder %s: %w", s.cfg.Path, err)
}
if err := filepath.Walk(s.cfg.Path,
func(path string, info os.FileInfo, err error) error {
// propagate walk errors before touching info, which may be nil
if err != nil {
return err
}
if info.Mode().IsDir() {
s.log.Debug().Str("folder", path).Msg("adding new folder")
return w.Add(path)
}
return nil
}); err != nil {
return err
}
s.fw = w
go func() {
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet on start")
}
s.watch()
}()
go func() {
for {
select {
case event, ok := <-w.Events:
if !ok {
return
}
s.log.Info().Str("file", event.Name).Str("op", event.Op.String()).Msg("file changed inside server folder")
atomic.AddUint64(&s.eventsCount, 1)
case err, ok := <-w.Errors:
if !ok {
return
}
s.updateState(STOPPED)
s.log.Error().Err(err).Msg("error watching server folder")
}
}
}()
s.log.Info().Msg("server folder started")
return nil
}
func (s *Server) watch() {
s.log.Info().Msg("starting watcher")
for range time.Tick(time.Second * 5) {
// read the pending event counter atomically; the fsnotify goroutine increments it concurrently
ec := atomic.LoadUint64(&s.eventsCount)
if ec == 0 {
continue
}
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet")
}
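// subtract the events handled in this pass; unsigned negation wraps around, so AddUint64 effectively performs a subtraction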
atomic.AddUint64(&s.eventsCount, -ec)
}
}
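// makeMagnet rebuilds the torrent metadata from the folder contents, registers it with the client and publishes a fresh magnet link.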
func (s *Server) makeMagnet() error {
s.log.Info().Msg("starting serving new torrent")
info := metainfo.Info{
PieceLength: 2 << 18,
}
s.updateState(READING)
if err := info.BuildFromFilePath(s.cfg.Path); err != nil {
return err
}
s.updateState(UPDATING)
if len(info.Files) == 0 {
s.mu.Lock()
s.si.Magnet = ""
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.log.Info().Msg("not creating magnet from empty folder")
s.updateState(STOPPED)
return nil
}
mi := metainfo.MetaInfo{
InfoBytes: bencode.MustMarshal(info),
}
ih := mi.HashInfoBytes()
to, _ := s.c.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: ih,
Storage: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: s.cfg.Path,
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...)
},
TorrentDirMaker: nil,
PieceCompletion: s.pc,
}),
})
tks := s.trackers()
err := to.MergeSpec(&torrent.TorrentSpec{
InfoBytes: mi.InfoBytes,
Trackers: [][]string{tks},
})
if err != nil {
return err
}
m := metainfo.Magnet{
InfoHash: ih,
DisplayName: s.cfg.Name,
Trackers: tks,
}
s.mu.Lock()
s.t = to
s.si.Magnet = m.String()
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.updateState(SEEDING)
s.log.Info().Str("hash", ih.HexString()).Msg("new torrent is ready")
return nil
}
func (s *Server) updateState(ss ServerState) {
s.mu.Lock()
s.si.State = ss.String()
s.mu.Unlock()
}
func (s *Server) trackers() []string {
// TODO load trackers from URL too
return s.cfg.Trackers
}
func (s *Server) Close() error {
if s.fw == nil {
return nil
}
return s.fw.Close()
}
func (s *Server) Info() *ServerInfo {
// take the write lock: Info updates the peer counters stored in si
s.mu.Lock()
defer s.mu.Unlock()
if s.t != nil {
st := s.t.Stats()
s.si.Peers = st.TotalPeers
s.si.Seeds = st.ConnectedSeeders
}
return &s.si
}

src/torrent/service.go Normal file

@ -0,0 +1,203 @@
package torrent
import (
"errors"
"fmt"
"path"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/src/fs"
"git.kmsign.ru/royalcat/tstor/src/torrent/loader"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
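// Service owns the torrent client, loads torrents from the configured loaders and the database, and exposes them as per-route filesystems.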
type Service struct {
c *torrent.Client
s *Stats
mu sync.Mutex
fss map[string]fs.Filesystem
loaders []loader.Loader
db loader.LoaderAdder
log zerolog.Logger
addTimeout, readTimeout int
}
func NewService(loaders []loader.Loader, db loader.LoaderAdder, stats *Stats, c *torrent.Client, addTimeout, readTimeout int) *Service {
l := log.Logger.With().Str("component", "torrent-service").Logger()
return &Service{
log: l,
s: stats,
c: c,
fss: make(map[string]fs.Filesystem),
loaders: loaders,
db: db,
addTimeout: addTimeout,
readTimeout: readTimeout,
}
}
func (s *Service) Load() (map[string]fs.Filesystem, error) {
// Load from config
s.log.Info().Msg("adding torrents from configuration")
for _, loader := range s.loaders {
if err := s.load(loader); err != nil {
return nil, err
}
}
// Load from DB
s.log.Info().Msg("adding torrents from database")
return s.fss, s.load(s.db)
}
func (s *Service) load(l loader.Loader) error {
list, err := l.ListMagnets()
if err != nil {
return err
}
for r, ms := range list {
s.addRoute(r)
for _, m := range ms {
if err := s.addMagnet(r, m); err != nil {
return err
}
}
}
list, err = l.ListTorrentPaths()
if err != nil {
return err
}
for r, ms := range list {
s.addRoute(r)
for _, p := range ms {
if err := s.addTorrentPath(r, p); err != nil {
return err
}
}
}
return nil
}
func (s *Service) AddMagnet(r, m string) error {
if err := s.addMagnet(r, m); err != nil {
return err
}
// Add to db
return s.db.AddMagnet(r, m)
}
func (s *Service) addTorrentPath(r, p string) error {
// Add to client
t, err := s.c.AddTorrentFromFile(p)
if err != nil {
return err
}
return s.addTorrent(r, t)
}
func (s *Service) addMagnet(r, m string) error {
// Add to client
t, err := s.c.AddMagnet(m)
if err != nil {
return err
}
return s.addTorrent(r, t)
}
func (s *Service) addRoute(r string) {
s.s.AddRoute(r)
// Add to filesystems
folder := path.Join("/", r)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.fss[folder]
if !ok {
s.fss[folder] = fs.NewTorrent(s.readTimeout)
}
}
func (s *Service) addTorrent(r string, t *torrent.Torrent) error {
// wait for torrent info only if it is not already available
if t.Info() == nil {
s.log.Info().Str("hash", t.InfoHash().String()).Msg("getting torrent info")
select {
case <-time.After(time.Duration(s.addTimeout) * time.Second):
s.log.Error().Str("hash", t.InfoHash().String()).Msg("timeout getting torrent info")
return errors.New("timeout getting torrent info")
case <-t.GotInfo():
s.log.Info().Str("hash", t.InfoHash().String()).Msg("obtained torrent info")
}
}
// Add to stats
s.s.Add(r, t)
// Add to filesystems
folder := path.Join("/", r)
s.mu.Lock()
defer s.mu.Unlock()
tfs, ok := s.fss[folder].(*fs.Torrent)
if !ok {
return errors.New("error adding torrent to filesystem")
}
tfs.AddTorrent(t)
s.log.Info().Str("name", t.Info().Name).Str("route", r).Msg("torrent added")
return nil
}
func (s *Service) RemoveFromHash(r, h string) error {
// Remove from db
deleted, err := s.db.RemoveFromHash(r, h)
if err != nil {
return err
}
if !deleted {
return fmt.Errorf("element with hash %v on route %v cannot be removed", h, r)
}
// Remove from stats
s.s.Del(r, h)
// Remove from fs
folder := path.Join("/", r)
tfs, ok := s.fss[folder].(*fs.Torrent)
if !ok {
return errors.New("error removing torrent from filesystem")
}
tfs.RemoveTorrent(h)
// Remove from client
var mh metainfo.Hash
if err := mh.FromHexString(h); err != nil {
return err
}
t, ok := s.c.Torrent(metainfo.NewHashFromHex(h))
if ok {
t.Drop()
}
return nil
}

src/torrent/stats.go Normal file

@ -0,0 +1,268 @@
package torrent
import (
"errors"
"sort"
"sync"
"time"
"github.com/anacrolix/torrent"
)
var ErrTorrentNotFound = errors.New("torrent not found")
type PieceStatus string
const (
Checking PieceStatus = "H"
Partial PieceStatus = "P"
Complete PieceStatus = "C"
Waiting PieceStatus = "W"
Error PieceStatus = "?"
)
type PieceChunk struct {
Status PieceStatus `json:"status"`
NumPieces int `json:"numPieces"`
}
type TorrentStats struct {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
TimePassed float64 `json:"timePassed"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
}
type byName []*TorrentStats
func (a byName) Len() int { return len(a) }
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type GlobalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
TimePassed float64 `json:"timePassed"`
}
type RouteStats struct {
Name string `json:"name"`
TorrentStats []*TorrentStats `json:"torrentStats"`
}
type ByName []*RouteStats
func (a ByName) Len() int { return len(a) }
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type stat struct {
totalDownloadBytes int64
downloadBytes int64
totalUploadBytes int64
uploadBytes int64
peers int
seeders int
time time.Time
}
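// Stats tracks per-torrent transfer counters and computes deltas between measurements, aggregated globally and by route.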
type Stats struct {
mut sync.Mutex
torrents map[string]*torrent.Torrent
torrentsByRoute map[string]map[string]*torrent.Torrent
previousStats map[string]*stat
gTime time.Time
}
func NewStats() *Stats {
return &Stats{
gTime: time.Now(),
torrents: make(map[string]*torrent.Torrent),
torrentsByRoute: make(map[string]map[string]*torrent.Torrent),
previousStats: make(map[string]*stat),
}
}
func (s *Stats) AddRoute(route string) {
s.mut.Lock()
defer s.mut.Unlock()
_, ok := s.torrentsByRoute[route]
if !ok {
s.torrentsByRoute[route] = make(map[string]*torrent.Torrent)
}
}
func (s *Stats) Add(route string, t *torrent.Torrent) {
s.mut.Lock()
defer s.mut.Unlock()
h := t.InfoHash().String()
s.torrents[h] = t
s.previousStats[h] = &stat{}
_, ok := s.torrentsByRoute[route]
if !ok {
s.torrentsByRoute[route] = make(map[string]*torrent.Torrent)
}
s.torrentsByRoute[route][h] = t
}
func (s *Stats) Del(route, hash string) {
s.mut.Lock()
defer s.mut.Unlock()
delete(s.torrents, hash)
delete(s.previousStats, hash)
ts, ok := s.torrentsByRoute[route]
if !ok {
return
}
delete(ts, hash)
}
func (s *Stats) Stats(hash string) (*TorrentStats, error) {
s.mut.Lock()
defer s.mut.Unlock()
t, ok := s.torrents[hash]
if !ok {
return nil, ErrTorrentNotFound
}
now := time.Now()
return s.stats(now, t, true), nil
}
func (s *Stats) RoutesStats() []*RouteStats {
s.mut.Lock()
defer s.mut.Unlock()
now := time.Now()
var out []*RouteStats
for r, tl := range s.torrentsByRoute {
var tStats []*TorrentStats
for _, t := range tl {
ts := s.stats(now, t, true)
tStats = append(tStats, ts)
}
sort.Sort(byName(tStats))
rs := &RouteStats{
Name: r,
TorrentStats: tStats,
}
out = append(out, rs)
}
return out
}
func (s *Stats) GlobalStats() *GlobalTorrentStats {
s.mut.Lock()
defer s.mut.Unlock()
now := time.Now()
var totalDownload int64
var totalUpload int64
for _, torrent := range s.torrents {
tStats := s.stats(now, torrent, false)
totalDownload += tStats.DownloadedBytes
totalUpload += tStats.UploadedBytes
}
timePassed := now.Sub(s.gTime)
s.gTime = now
return &GlobalTorrentStats{
DownloadedBytes: totalDownload,
UploadedBytes: totalUpload,
TimePassed: timePassed.Seconds(),
}
}
func (s *Stats) stats(now time.Time, t *torrent.Torrent, chunks bool) *TorrentStats {
ts := &TorrentStats{}
prev, ok := s.previousStats[t.InfoHash().String()]
if !ok {
return &TorrentStats{}
}
if s.returnPreviousMeasurements(now) {
ts.DownloadedBytes = prev.downloadBytes
ts.UploadedBytes = prev.uploadBytes
} else {
st := t.Stats()
rd := st.BytesReadData.Int64()
wd := st.BytesWrittenData.Int64()
ist := &stat{
downloadBytes: rd - prev.totalDownloadBytes,
uploadBytes: wd - prev.totalUploadBytes,
totalDownloadBytes: rd,
totalUploadBytes: wd,
time: now,
peers: st.TotalPeers,
seeders: st.ConnectedSeeders,
}
ts.DownloadedBytes = ist.downloadBytes
ts.UploadedBytes = ist.uploadBytes
ts.Peers = ist.peers
ts.Seeders = ist.seeders
s.previousStats[t.InfoHash().String()] = ist
}
ts.TimePassed = now.Sub(prev.time).Seconds()
var totalPieces int
if chunks {
var pch []*PieceChunk
for _, psr := range t.PieceStateRuns() {
// use a distinct name to avoid shadowing the *Stats receiver
var status PieceStatus
switch {
case psr.Checking:
status = Checking
case psr.Partial:
status = Partial
case psr.Complete:
status = Complete
case !psr.Ok:
status = Error
default:
status = Waiting
}
pch = append(pch, &PieceChunk{
Status: status,
NumPieces: psr.Length,
})
totalPieces += psr.Length
}
ts.PieceChunks = pch
}
ts.Hash = t.InfoHash().String()
ts.Name = t.Name()
ts.TotalPieces = totalPieces
if t.Info() != nil {
ts.PieceSize = t.Info().PieceLength
}
return ts
}
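// gap is the minimum interval between fresh measurements; calls made within it reuse the previously captured values.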
const gap time.Duration = 2 * time.Second
func (s *Stats) returnPreviousMeasurements(now time.Time) bool {
return now.Sub(s.gTime) < gap
}

src/torrent/store.go Normal file

@ -0,0 +1,97 @@
package torrent
import (
"bytes"
"encoding/gob"
"time"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/dht/v2/bep44"
"github.com/dgraph-io/badger/v4"
"github.com/rs/zerolog/log"
)
var _ bep44.Store = &FileItemStore{}
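// FileItemStore persists BEP 44 items in a badger database, expiring them after the configured TTL.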
type FileItemStore struct {
ttl time.Duration
db *badger.DB
}
func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, error) {
l := log.Logger.With().Str("component", "item-store").Logger()
opts := badger.DefaultOptions(path).
WithLogger(&dlog.Badger{L: l}).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
err = db.RunValueLogGC(0.5)
if err != nil && err != badger.ErrNoRewrite {
return nil, err
}
return &FileItemStore{
db: db,
ttl: itemsTTL,
}, nil
}
func (fis *FileItemStore) Put(i *bep44.Item) error {
tx := fis.db.NewTransaction(true)
defer tx.Discard()
key := i.Target()
var value bytes.Buffer
enc := gob.NewEncoder(&value)
if err := enc.Encode(i); err != nil {
return err
}
e := badger.NewEntry(key[:], value.Bytes()).WithTTL(fis.ttl)
if err := tx.SetEntry(e); err != nil {
return err
}
return tx.Commit()
}
func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
tx := fis.db.NewTransaction(false)
defer tx.Discard()
dbi, err := tx.Get(t[:])
if err == badger.ErrKeyNotFound {
return nil, bep44.ErrItemNotFound
}
if err != nil {
return nil, err
}
valb, err := dbi.ValueCopy(nil)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(valb)
dec := gob.NewDecoder(buf)
var i *bep44.Item
if err := dec.Decode(&i); err != nil {
return nil, err
}
return i, nil
}
func (fis *FileItemStore) Del(t bep44.Target) error {
// Del is a no-op: stored items expire automatically via the TTL applied in Put
return nil
}
func (fis *FileItemStore) Close() error {
return fis.db.Close()
}