parent
a89b9e7303
commit
832f5b9710
33 changed files with 900 additions and 1086 deletions
daemons
123
daemons/archive/archive.go
Normal file
123
daemons/archive/archive.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/fs"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
)
|
||||
|
||||
var ArchiveFactories = map[string]vfs.FsFactory{
|
||||
".zip": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) {
|
||||
stat, err := f.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), ZipLoader)
|
||||
},
|
||||
".rar": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) {
|
||||
stat, err := f.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), RarLoader)
|
||||
},
|
||||
".7z": func(ctx context.Context, sourcePath string, f vfs.File) (vfs.Filesystem, error) {
|
||||
stat, err := f.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(ctx, sourcePath, stat.Name(), f, stat.Size(), SevenZipLoader)
|
||||
},
|
||||
}
|
||||
|
||||
type archiveLoader func(ctx context.Context, archivePath string, r vfs.File, size int64) (map[string]fileEntry, error)
|
||||
|
||||
var _ vfs.Filesystem = &ArchiveFS{}
|
||||
|
||||
type fileEntry struct {
|
||||
fs.FileInfo
|
||||
open func(ctx context.Context) (vfs.File, error)
|
||||
}
|
||||
|
||||
type ArchiveFS struct {
|
||||
name string
|
||||
size int64
|
||||
files map[string]fileEntry
|
||||
}
|
||||
|
||||
// Rename implements Filesystem.
|
||||
func (a *ArchiveFS) Rename(ctx context.Context, oldpath string, newpath string) error {
|
||||
return vfs.ErrNotImplemented
|
||||
}
|
||||
|
||||
// ModTime implements Filesystem.
|
||||
func (a *ArchiveFS) ModTime() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
// Mode implements Filesystem.
|
||||
func (a *ArchiveFS) Mode() fs.FileMode {
|
||||
return fs.ModeDir
|
||||
}
|
||||
|
||||
// Size implements Filesystem.
|
||||
func (a *ArchiveFS) Size() int64 {
|
||||
return int64(a.size)
|
||||
}
|
||||
|
||||
// Sys implements Filesystem.
|
||||
func (a *ArchiveFS) Sys() any {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FsName implements Filesystem.
|
||||
func (a *ArchiveFS) FsName() string {
|
||||
return "archivefs"
|
||||
}
|
||||
|
||||
func NewArchive(ctx context.Context, archivePath, name string, f vfs.File, size int64, loader archiveLoader) (*ArchiveFS, error) {
|
||||
archiveFiles, err := loader(ctx, archivePath, f, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO make optional
|
||||
singleDir := true
|
||||
for k := range archiveFiles {
|
||||
if !strings.HasPrefix(k, "/"+name+"/") {
|
||||
singleDir = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
files := make(map[string]fileEntry, len(archiveFiles))
|
||||
for k, v := range archiveFiles {
|
||||
// TODO make optional
|
||||
if strings.Contains(k, "/__MACOSX/") {
|
||||
continue
|
||||
}
|
||||
|
||||
if singleDir {
|
||||
k, _ = strings.CutPrefix(k, "/"+name)
|
||||
}
|
||||
|
||||
files[k] = v
|
||||
}
|
||||
|
||||
// FIXME configurable
|
||||
files["/.forcegallery"] = fileEntry{
|
||||
FileInfo: vfs.NewFileInfo("/.forcegallery", 0, time.Time{}),
|
||||
open: func(ctx context.Context) (vfs.File, error) {
|
||||
return vfs.NewMemoryFile(".forcegallery", []byte{}), nil
|
||||
},
|
||||
}
|
||||
|
||||
return &ArchiveFS{
|
||||
name: name,
|
||||
size: size,
|
||||
files: files,
|
||||
}, nil
|
||||
}
|
183
daemons/archive/archive_cache.go
Normal file
183
daemons/archive/archive_cache.go
Normal file
|
@ -0,0 +1,183 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/hashicorp/golang-lru/arc/v2"
|
||||
"github.com/royalcat/ctxio"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// TODO переделать кеш в демон
|
||||
|
||||
const blockSize = 1024 * 16 // 16KB
|
||||
const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage
|
||||
const defaultBlockCount = cacheSize / blockSize
|
||||
|
||||
type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error)
|
||||
|
||||
type archiveFileIndex struct {
|
||||
archiveHash vfs.Hash
|
||||
filename string
|
||||
}
|
||||
|
||||
type blockIndex struct {
|
||||
index archiveFileIndex
|
||||
off int64
|
||||
}
|
||||
|
||||
type block struct {
|
||||
data [blockSize]byte
|
||||
len int
|
||||
}
|
||||
|
||||
var blockCache *arc.ARCCache[blockIndex, block]
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
blockCache, err = arc.NewARC[blockIndex, block](defaultBlockCount)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func newRandomReaderFromLinear(index archiveFileIndex, size int64, readerFactory archiveFileReaderFactory) *randomReaderFromLinear {
|
||||
return &randomReaderFromLinear{
|
||||
index: index,
|
||||
size: size,
|
||||
readerFactory: readerFactory,
|
||||
}
|
||||
}
|
||||
|
||||
type randomReaderFromLinear struct {
|
||||
index archiveFileIndex
|
||||
readerFactory archiveFileReaderFactory
|
||||
reader ctxio.ReadCloser
|
||||
readen int64
|
||||
readerMutex sync.Mutex
|
||||
size int64
|
||||
closed bool
|
||||
}
|
||||
|
||||
var _ ctxio.ReaderAt = (*randomReaderFromLinear)(nil)
|
||||
var _ ctxio.Closer = (*randomReaderFromLinear)(nil)
|
||||
|
||||
// ReadAt implements ctxio.ReaderAt.
|
||||
func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
ctx, span := tracer.Start(ctx, "archive.RandomReader.ReadAt")
|
||||
defer span.End()
|
||||
|
||||
if a.closed {
|
||||
return 0, errors.New("reader is closed")
|
||||
}
|
||||
|
||||
if off >= a.size {
|
||||
return 0, ctxio.EOF
|
||||
}
|
||||
|
||||
aligntOff := (off / blockSize) * blockSize
|
||||
bI := blockIndex{index: a.index, off: aligntOff}
|
||||
|
||||
block, err := a.readBlock(ctx, bI)
|
||||
if err != nil && err != ctxio.EOF {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if off-aligntOff >= int64(block.len) {
|
||||
return 0, ctxio.EOF
|
||||
}
|
||||
|
||||
return copy(p, block.data[off-aligntOff:block.len]), err
|
||||
}
|
||||
|
||||
func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) {
|
||||
ctx, span := tracer.Start(ctx, "archive.RandomReader.readBlock")
|
||||
defer span.End()
|
||||
|
||||
// check block in cache before locking
|
||||
if b, ok := blockCache.Get(bI); ok && b.len != 0 {
|
||||
return b, nil
|
||||
}
|
||||
|
||||
a.readerMutex.Lock()
|
||||
defer a.readerMutex.Unlock()
|
||||
|
||||
if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block
|
||||
return b, nil
|
||||
}
|
||||
|
||||
if a.reader == nil || a.readen > bI.off {
|
||||
span.AddEvent("reader not valid, creating new reader", trace.WithAttributes(
|
||||
attribute.Bool("reader_initialized", a.reader != nil),
|
||||
attribute.Int64("readen", a.readen),
|
||||
attribute.Int64("target_offset", bI.off),
|
||||
))
|
||||
|
||||
if a.reader != nil {
|
||||
if err := a.reader.Close(ctx); err != nil {
|
||||
return block{}, fmt.Errorf("failed to close previous reader: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
a.reader, err = a.readerFactory(context.TODO())
|
||||
if err != nil {
|
||||
return block{}, err
|
||||
}
|
||||
a.readen = 0
|
||||
}
|
||||
|
||||
for off := a.readen; off <= bI.off; off += blockSize {
|
||||
// TODO sync.Pool ?
|
||||
buf := [blockSize]byte{}
|
||||
n, err := a.reader.Read(ctx, buf[:])
|
||||
if err != nil && err != ctxio.EOF {
|
||||
return block{}, err
|
||||
}
|
||||
a.readen += int64(n)
|
||||
|
||||
if n == 0 {
|
||||
return block{}, io.EOF
|
||||
}
|
||||
|
||||
blockCache.Add(blockIndex{bI.index, off}, block{len: n, data: buf})
|
||||
|
||||
if off == bI.off {
|
||||
return block{len: n, data: buf}, err
|
||||
}
|
||||
|
||||
if n < int(blockSize) && errors.Is(err, ctxio.EOF) {
|
||||
return block{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return block{}, io.EOF
|
||||
}
|
||||
|
||||
// Close implements ctxio.Closer.
|
||||
func (a *randomReaderFromLinear) Close(ctx context.Context) error {
|
||||
if a.closed {
|
||||
return nil
|
||||
}
|
||||
a.closed = true
|
||||
|
||||
var errs []error
|
||||
|
||||
if a.reader != nil {
|
||||
errs = append(errs, a.reader.Close(ctx))
|
||||
}
|
||||
|
||||
for _, block := range blockCache.Keys() {
|
||||
if block.index == a.index {
|
||||
blockCache.Remove(block)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
155
daemons/archive/archive_test.go
Normal file
155
daemons/archive/archive_test.go
Normal file
|
@ -0,0 +1,155 @@
|
|||
package archive_test
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/daemons/archive"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TODO
|
||||
// func TestArchiveFactories(t *testing.T) {
|
||||
// t.Parallel()
|
||||
|
||||
// ctx := context.Background()
|
||||
|
||||
// require := require.New(t)
|
||||
|
||||
// require.Contains(vfs.ArchiveFactories, ".zip")
|
||||
// require.Contains(vfs.ArchiveFactories, ".rar")
|
||||
// require.Contains(vfs.ArchiveFactories, ".7z")
|
||||
|
||||
// fs, err := vfs.ArchiveFactories[".zip"](ctx, &vfs.DummyFile{})
|
||||
// require.NoError(err)
|
||||
// require.NotNil(fs)
|
||||
|
||||
// fs, err = vfs.ArchiveFactories[".rar"](ctx, &vfs.DummyFile{})
|
||||
// require.NoError(err)
|
||||
// require.NotNil(fs)
|
||||
|
||||
// fs, err = vfs.ArchiveFactories[".7z"](ctx, &vfs.DummyFile{})
|
||||
// require.NoError(err)
|
||||
// require.NotNil(fs)
|
||||
// }
|
||||
|
||||
var fileContent []byte = []byte("Hello World")
|
||||
|
||||
func TestZipFilesystem(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
zReader, size := createTestZip(require)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// TODO add single dir collapse test
|
||||
zfs, err := archive.NewArchive(ctx, "test", "test", zReader, size, archive.ZipLoader)
|
||||
require.NoError(err)
|
||||
|
||||
files, err := zfs.ReadDir(ctx, "/path/to/test/file")
|
||||
require.NoError(err)
|
||||
|
||||
require.Len(files, 1)
|
||||
e := files[0]
|
||||
require.Equal("1.txt", e.Name())
|
||||
require.NotNil(e)
|
||||
|
||||
out := make([]byte, 5)
|
||||
f, err := zfs.Open(ctx, "/path/to/test/file/1.txt")
|
||||
require.NoError(err)
|
||||
n, err := f.Read(ctx, out)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(5, n)
|
||||
require.Equal([]byte("Hello"), out)
|
||||
|
||||
outSpace := make([]byte, 1)
|
||||
n, err = f.Read(ctx, outSpace)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(1, n)
|
||||
require.Equal([]byte(" "), outSpace)
|
||||
|
||||
n, err = f.Read(ctx, out)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(5, n)
|
||||
require.Equal([]byte("World"), out)
|
||||
|
||||
}
|
||||
|
||||
func createTestZip(require *require.Assertions) (vfs.File, int64) {
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
|
||||
zWriter := zip.NewWriter(buf)
|
||||
|
||||
f1, err := zWriter.Create("path/to/test/file/1.txt")
|
||||
require.NoError(err)
|
||||
_, err = f1.Write(fileContent)
|
||||
require.NoError(err)
|
||||
|
||||
err = zWriter.Close()
|
||||
require.NoError(err)
|
||||
|
||||
return newCBR(buf.Bytes()), int64(buf.Len())
|
||||
}
|
||||
|
||||
func newCBR(b []byte) *closeableByteReader {
|
||||
return &closeableByteReader{
|
||||
data: bytes.NewReader(b),
|
||||
}
|
||||
}
|
||||
|
||||
var _ vfs.File = &closeableByteReader{}
|
||||
|
||||
type closeableByteReader struct {
|
||||
data *bytes.Reader
|
||||
}
|
||||
|
||||
// ReadAt implements ctxio.ReaderAt.
|
||||
func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
return c.data.ReadAt(p, off)
|
||||
}
|
||||
|
||||
// Close implements vfs.File.
|
||||
func (c *closeableByteReader) Close(ctx context.Context) error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Info implements vfs.File.
|
||||
func (c *closeableByteReader) Info() (fs.FileInfo, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// IsDir implements vfs.File.
|
||||
func (c *closeableByteReader) IsDir() bool {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Name implements vfs.File.
|
||||
func (c *closeableByteReader) Name() string {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Read implements vfs.File.
|
||||
func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
return c.data.Read(p)
|
||||
}
|
||||
|
||||
// Seek implements vfs.File.
|
||||
func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) {
|
||||
return c.data.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// Size implements vfs.File.
|
||||
func (c *closeableByteReader) Size() int64 {
|
||||
return c.data.Size()
|
||||
}
|
||||
|
||||
// Type implements vfs.File.
|
||||
func (c *closeableByteReader) Type() fs.FileMode {
|
||||
panic("unimplemented")
|
||||
}
|
44
daemons/archive/daemon.go
Normal file
44
daemons/archive/daemon.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/daemon"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/knadh/koanf/v2"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
const DaemonName string = "archive"
|
||||
|
||||
var _ daemon.DaemonConstructor = NewDaemon
|
||||
|
||||
func NewDaemon(ctx context.Context, koanf *koanf.Koanf) (daemon.Daemon, error) {
|
||||
return &Daemon{}, nil
|
||||
}
|
||||
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/daemons/archive")
|
||||
|
||||
var _ daemon.Daemon = (*Daemon)(nil)
|
||||
|
||||
type Daemon struct{}
|
||||
|
||||
// Name implements daemon.Daemon.
|
||||
func (d *Daemon) Name() string {
|
||||
return DaemonName
|
||||
}
|
||||
|
||||
// Extensions implements daemon.Daemon.
|
||||
func (d *Daemon) Extensions() []string {
|
||||
return []string{".zip", ".rar", ".7z"}
|
||||
}
|
||||
|
||||
// GetFS implements daemon.Daemon.
|
||||
func (d *Daemon) GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Close implements daemon.Daemon.
|
||||
func (d *Daemon) Close(ctx context.Context) error {
|
||||
panic("unimplemented")
|
||||
}
|
158
daemons/archive/fs.go
Normal file
158
daemons/archive/fs.go
Normal file
|
@ -0,0 +1,158 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
)
|
||||
|
||||
// Unlink implements Filesystem.
|
||||
func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error {
|
||||
return vfs.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (a *ArchiveFS) Open(ctx context.Context, filename string) (vfs.File, error) {
|
||||
if filename == vfs.Separator {
|
||||
return vfs.NewDirFile(filename), nil
|
||||
}
|
||||
|
||||
f, ok := a.files[filename]
|
||||
if ok {
|
||||
return f.open(ctx)
|
||||
}
|
||||
|
||||
for p := range a.files {
|
||||
if strings.HasPrefix(p, filename) {
|
||||
return vfs.NewDirFile(filename), nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfs.ErrNotExist
|
||||
}
|
||||
|
||||
func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
|
||||
infos := make(map[string]fs.FileInfo, len(a.files))
|
||||
for k, v := range a.files {
|
||||
infos[k] = v
|
||||
}
|
||||
|
||||
return vfs.ListDirFromInfo(infos, path)
|
||||
}
|
||||
|
||||
// Stat implements Filesystem.
|
||||
func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
||||
if entry, ok := afs.files[filename]; ok {
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
for p, _ := range afs.files {
|
||||
if strings.HasPrefix(p, filename) {
|
||||
return vfs.NewDirInfo(path.Base(filename), time.Time{}), nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfs.ErrNotExist
|
||||
}
|
||||
|
||||
// Info implements Filesystem.
|
||||
func (a *ArchiveFS) Info() (fs.FileInfo, error) {
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// IsDir implements Filesystem.
|
||||
func (a *ArchiveFS) IsDir() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Name implements Filesystem.
|
||||
func (a *ArchiveFS) Name() string {
|
||||
return a.name
|
||||
}
|
||||
|
||||
// Type implements Filesystem.
|
||||
func (a *ArchiveFS) Type() fs.FileMode {
|
||||
return fs.ModeDir
|
||||
}
|
||||
|
||||
var _ vfs.File = (*archiveFile)(nil)
|
||||
|
||||
func newArchiveFile(name string, size int64, rr *randomReaderFromLinear) *archiveFile {
|
||||
return &archiveFile{
|
||||
name: name,
|
||||
size: size,
|
||||
rr: rr,
|
||||
}
|
||||
}
|
||||
|
||||
type archiveFile struct {
|
||||
name string
|
||||
size int64
|
||||
|
||||
m sync.Mutex
|
||||
offset int64
|
||||
|
||||
rr *randomReaderFromLinear
|
||||
}
|
||||
|
||||
// Seek implements File.
|
||||
func (d *archiveFile) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
d.offset = offset
|
||||
|
||||
case io.SeekCurrent:
|
||||
d.offset += offset
|
||||
case io.SeekEnd:
|
||||
d.offset = d.size + offset
|
||||
}
|
||||
return d.offset, nil
|
||||
}
|
||||
|
||||
// Name implements File.
|
||||
func (d *archiveFile) Name() string {
|
||||
return d.name
|
||||
}
|
||||
|
||||
// Type implements File.
|
||||
func (d *archiveFile) Type() fs.FileMode {
|
||||
return vfs.ModeFileRO
|
||||
}
|
||||
|
||||
func (d *archiveFile) Info() (fs.FileInfo, error) {
|
||||
return vfs.NewFileInfo(d.name, d.size, time.Time{}), nil
|
||||
}
|
||||
|
||||
func (d *archiveFile) Size() int64 {
|
||||
return d.size
|
||||
}
|
||||
|
||||
func (d *archiveFile) IsDir() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
ctx, span := tracer.Start(ctx, "archive.File.Read")
|
||||
defer span.End()
|
||||
|
||||
n, err = d.rr.ReadAt(ctx, p, d.offset)
|
||||
d.offset += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
return d.rr.ReadAt(ctx, p, off)
|
||||
}
|
||||
|
||||
func (d *archiveFile) Close(ctx context.Context) error {
|
||||
// FIXME close should do nothing as archive fs currently reuse the same file instances
|
||||
return nil
|
||||
}
|
69
daemons/archive/rar.go
Normal file
69
daemons/archive/rar.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/ioutils"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/nwaples/rardecode/v2"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
var _ archiveLoader = RarLoader
|
||||
|
||||
func RarLoader(ctx context.Context, archivePath string, f vfs.File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := vfs.FileHash(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
|
||||
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(map[string]fileEntry)
|
||||
for {
|
||||
header, err := r.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
name := header.Name
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for header, err := r.Next(); err != io.EOF; header, err = r.Next() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if header.Name == name {
|
||||
return ctxio.NopCloser(ctxio.WrapIoReader(r)), nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("file with name '%s' not found", name)
|
||||
}
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af)
|
||||
|
||||
out[vfs.AbsPath(header.Name)] = fileEntry{
|
||||
FileInfo: vfs.NewFileInfo(header.Name, header.UnPackedSize, header.ModificationTime),
|
||||
open: func(ctx context.Context) (vfs.File, error) {
|
||||
return newArchiveFile(header.Name, header.UnPackedSize, rr), nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
59
daemons/archive/sevenzip.go
Normal file
59
daemons/archive/sevenzip.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/bodgit/sevenzip"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
var _ archiveLoader = SevenZipLoader
|
||||
|
||||
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader vfs.File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := vfs.FileHash(ctx, ctxreader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
r, err := sevenzip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(map[string]fileEntry)
|
||||
for i, f := range r.File {
|
||||
if f.FileInfo().IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
zr, err := sevenzip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rc, err := zr.File[i].Open()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ctxio.WrapIoReadCloser(rc), nil
|
||||
}
|
||||
|
||||
info := f.FileInfo()
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af)
|
||||
|
||||
out[vfs.AbsPath(f.Name)] = fileEntry{
|
||||
FileInfo: f.FileInfo(),
|
||||
open: func(ctx context.Context) (vfs.File, error) {
|
||||
return newArchiveFile(info.Name(), info.Size(), rr), nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
63
daemons/archive/zip.go
Normal file
63
daemons/archive/zip.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package archive
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/royalcat/ctxio"
|
||||
)
|
||||
|
||||
var _ archiveLoader = ZipLoader
|
||||
|
||||
func ZipLoader(ctx context.Context, archivePath string, f vfs.File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := vfs.FileHash(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ctxio.IoReaderAt(ctx, f)
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(map[string]fileEntry)
|
||||
for i := range zr.File {
|
||||
zipFile := zr.File[i]
|
||||
if zipFile.FileInfo().IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
i := i
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ctxio.IoReaderAt(ctx, f)
|
||||
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create zip reader: %w", err)
|
||||
}
|
||||
|
||||
rc, err := zr.File[i].Open()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open file in zip archive: %w", err)
|
||||
}
|
||||
|
||||
return ctxio.WrapIoReadCloser(rc), nil
|
||||
}
|
||||
|
||||
info := zipFile.FileInfo()
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af)
|
||||
|
||||
out[vfs.AbsPath(zipFile.Name)] = fileEntry{
|
||||
FileInfo: info,
|
||||
open: func(ctx context.Context) (vfs.File, error) {
|
||||
return newArchiveFile(info.Name(), info.Size(), rr), nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
|
@ -7,9 +7,9 @@ import (
|
|||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"github.com/creativecreature/sturdyc"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/royalcat/btrgo/btrsync"
|
||||
"github.com/viccon/sturdyc"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
|
66
daemons/qbittorrent/config.go
Normal file
66
daemons/qbittorrent/config.go
Normal file
|
@ -0,0 +1,66 @@
|
|||
package qbittorrent
|
||||
|
||||
import (
|
||||
"github.com/knadh/koanf/providers/structs"
|
||||
"github.com/knadh/koanf/v2"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DataFolder string `koanf:"data_folder,omitempty"`
|
||||
MetadataFolder string `koanf:"metadata_folder,omitempty"`
|
||||
}
|
||||
|
||||
var defaultConfig = Config{
|
||||
DataFolder: "./qbittorrent/data",
|
||||
MetadataFolder: "./qbittorrent/metadata",
|
||||
}
|
||||
|
||||
func loadConfig(koanf *koanf.Koanf) (Config, error) {
|
||||
if err := koanf.Load(structs.Provider(defaultConf, "koanf"), nil); err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
var config Config
|
||||
if err := koanf.Unmarshal("", &config); err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// var defaultRoutes = []Route{
|
||||
// {
|
||||
// Name: "multimedia",
|
||||
// Torrents: []Torrent{
|
||||
// {
|
||||
// MagnetURI: "magnet:?xt=urn:btih:c9e15763f722f23e98a29decdfae341b98d53056&dn=Cosmos+Laundromat&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fcosmos-laundromat.torrent",
|
||||
// },
|
||||
// {
|
||||
// MagnetURI: "magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent",
|
||||
// },
|
||||
// {
|
||||
// MagnetURI: "magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=Sintel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fsintel.torrent",
|
||||
// },
|
||||
// {
|
||||
// MagnetURI: "magnet:?xt=urn:btih:209c8226b299b308beaf2b9cd3fb49212dbd13ec&dn=Tears+of+Steel&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Ftears-of-steel.torrent",
|
||||
// },
|
||||
// {
|
||||
// MagnetURI: "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent",
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
// var defaultServers = []Server{
|
||||
// {
|
||||
// Name: "server",
|
||||
// Path: "server",
|
||||
// Trackers: []string{
|
||||
// "wss://tracker.btorrent.xyz",
|
||||
// "wss://tracker.openwebtorrent.com",
|
||||
// "http://p4p.arenabg.com:1337/announce",
|
||||
// "udp://tracker.opentrackr.org:1337/announce",
|
||||
// "udp://open.tracker.cl:1337/announce",
|
||||
// "http://openbittorrent.com:80/announce",
|
||||
// },
|
||||
// },
|
||||
// }
|
|
@ -14,7 +14,7 @@ import (
|
|||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/daemon"
|
||||
"git.kmsign.ru/royalcat/tstor/src/logwrap"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
|
@ -22,6 +22,7 @@ import (
|
|||
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
|
||||
mapset "github.com/deckarep/golang-set/v2"
|
||||
"github.com/iceber/iouring-go"
|
||||
"github.com/knadh/koanf/v2"
|
||||
"github.com/royalcat/ctxio"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
@ -52,12 +53,20 @@ WebUI\LocalHostAuth=false
|
|||
WebUI\Password_PBKDF2="@ByteArray(qef5I4wZBkDG+PP6/5mQwA==:LoTmorQM/QM5RHI4+dOiu6xfAz9xak6fhR4ZGpRtJF3JNCGG081Yrtva4G71kXz//ODUuWQKTLlrZPuIDvzqUQ==)"
|
||||
`
|
||||
|
||||
func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
|
||||
ctx := context.Background()
|
||||
log := rlog.Component("qbittorrent")
|
||||
const DaemonName = "qbittorrent"
|
||||
|
||||
binPath := conf.MetadataFolder + "/qbittorrent-nox"
|
||||
err := downloadLatestQbitRelease(ctx, binPath)
|
||||
var _ daemon.DaemonConstructor = NewDaemon
|
||||
|
||||
func NewDaemon(ctx context.Context, koanf *koanf.Koanf) (daemon.Daemon, error) {
|
||||
log := rlog.Component(DaemonName)
|
||||
|
||||
config, err := loadConfig(koanf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
binPath := config.MetadataFolder + "/qbittorrent-nox"
|
||||
err = downloadLatestQbitRelease(ctx, binPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -66,26 +75,26 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
|
|||
outLog := logwrap.NewSlogWriter(ctx, slog.LevelInfo, daemonLog.Slog())
|
||||
errLog := logwrap.NewSlogWriter(ctx, slog.LevelError, daemonLog.Slog())
|
||||
|
||||
_, err = os.Stat(conf.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf")
|
||||
_, err = os.Stat(config.MetadataFolder + "/profile/qBittorrent/config/qBittorrent.conf")
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
err = os.MkdirAll(conf.MetadataFolder+"/profile/qBittorrent/config", 0744)
|
||||
err = os.MkdirAll(config.MetadataFolder+"/profile/qBittorrent/config", 0744)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = os.WriteFile(conf.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644)
|
||||
err = os.WriteFile(config.MetadataFolder+"/profile/qBittorrent/config/qBittorrent.conf", []byte(defaultConf), 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = os.MkdirAll(conf.DataFolder, 0744)
|
||||
err = os.MkdirAll(config.DataFolder, 0744)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
const port = 25436
|
||||
|
||||
proc, err := runQBittorrent(binPath, conf.MetadataFolder+"/profile", port, outLog, errLog)
|
||||
proc, err := runQBittorrent(binPath, config.MetadataFolder+"/profile", port, outLog, errLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -109,7 +118,7 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
dataDir, err := filepath.Abs(conf.DataFolder)
|
||||
dataDir, err := filepath.Abs(config.DataFolder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -129,15 +138,23 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
|
|||
return &Daemon{
|
||||
qb: qb,
|
||||
proc: proc,
|
||||
dataDir: conf.DataFolder,
|
||||
dataDir: config.DataFolder,
|
||||
ur: ur,
|
||||
sourceFiles: make(map[string]string),
|
||||
registeredTorrents: mapset.NewSet[string](),
|
||||
client: wrapClient(qb),
|
||||
log: rlog.Component("qbittorrent"),
|
||||
log: rlog.Component(DaemonName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Daemon) Name() string {
|
||||
return DaemonName
|
||||
}
|
||||
|
||||
func (d *Daemon) Extensions() []string {
|
||||
return []string{".torrent"}
|
||||
}
|
||||
|
||||
func (d *Daemon) Close(ctx context.Context) error {
|
||||
err := d.proc.Signal(os.Interrupt)
|
||||
if err != nil {
|
||||
|
@ -156,7 +173,7 @@ func torrentDataPath(dataDir string, ih string) (string, error) {
|
|||
return filepath.Abs(path.Join(dataDir, ih))
|
||||
}
|
||||
|
||||
func (fs *Daemon) GetTorrentFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) {
|
||||
func (fs *Daemon) GetFS(ctx context.Context, sourcePath string, file vfs.File) (vfs.Filesystem, error) {
|
||||
ctx, span := trace.Start(ctx, "GetTorrentFS")
|
||||
defer span.End()
|
||||
|
||||
|
|
3
daemons/qbittorrent/go.mod
Normal file
3
daemons/qbittorrent/go.mod
Normal file
|
@ -0,0 +1,3 @@
|
|||
module github.com/royalcat/tstor/daemons/qbittorrent
|
||||
|
||||
go 1.23.5
|
10
daemons/qbittorrent/plugin/main.go
Normal file
10
daemons/qbittorrent/plugin/main.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package main
|
||||
|
||||
import "git.kmsign.ru/royalcat/tstor/daemons/qbittorrent"
|
||||
|
||||
func main() {
|
||||
}
|
||||
|
||||
const DaemonName = qbittorrent.DaemonName
|
||||
|
||||
var NewDaemon = qbittorrent.NewDaemon
|
Loading…
Add table
Add a link
Reference in a new issue