New Expandable file formats ()

This commit is contained in:
Antonio Navarro Perez 2021-11-29 11:07:54 +01:00 committed by GitHub
parent 15c72452de
commit 8d9a9281c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 808 additions and 346 deletions

246
fs/archive.go Normal file
View file

@ -0,0 +1,246 @@
package fs
import (
"archive/zip"
"io"
"os"
"path/filepath"
"sync"
"github.com/bodgit/sevenzip"
"github.com/distribyted/distribyted/iio"
"github.com/nwaples/rardecode/v2"
)
var _ loader = &Zip{}
type Zip struct {
}
func (fs *Zip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for _, f := range zr.File {
f := f
if f.FileInfo().IsDir() {
continue
}
rf := func() (iio.Reader, error) {
zr, err := f.Open()
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
}
n := filepath.Join(string(os.PathSeparator), f.Name)
af := NewArchiveFile(rf, f.FileInfo().Size())
out[n] = af
}
return out, nil
}
var _ loader = &SevenZip{}
type SevenZip struct {
}
func (fs *SevenZip) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
r, err := sevenzip.NewReader(reader, size)
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for _, f := range r.File {
f := f
if f.FileInfo().IsDir() {
continue
}
rf := func() (iio.Reader, error) {
zr, err := f.Open()
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
}
af := NewArchiveFile(rf, f.FileInfo().Size())
n := filepath.Join(string(os.PathSeparator), f.Name)
out[n] = af
}
return out, nil
}
var _ loader = &Rar{}
type Rar struct {
}
func (fs *Rar) getFiles(reader iio.Reader, size int64) (map[string]*ArchiveFile, error) {
r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size))
if err != nil {
return nil, err
}
out := make(map[string]*ArchiveFile)
for {
header, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
rf := func() (iio.Reader, error) {
return iio.NewDiskTeeReader(r)
}
n := filepath.Join(string(os.PathSeparator), header.Name)
af := NewArchiveFile(rf, header.UnPackedSize)
out[n] = af
}
return out, nil
}
type loader interface {
getFiles(r iio.Reader, size int64) (map[string]*ArchiveFile, error)
}
var _ Filesystem = &archive{}
type archive struct {
r iio.Reader
s *storage
size int64
once sync.Once
l loader
}
func NewArchive(r iio.Reader, size int64, l loader) *archive {
return &archive{
r: r,
s: newStorage(nil),
size: size,
l: l,
}
}
func (fs *archive) loadOnce() error {
var errOut error
fs.once.Do(func() {
files, err := fs.l.getFiles(fs.r, fs.size)
if err != nil {
errOut = err
return
}
for name, file := range files {
if err := fs.s.Add(file, name); err != nil {
errOut = err
return
}
}
})
return errOut
}
func (fs *archive) Open(filename string) (File, error) {
if filename == string(os.PathSeparator) {
return &Dir{}, nil
}
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Get(filename)
}
func (fs *archive) ReadDir(path string) (map[string]File, error) {
if err := fs.loadOnce(); err != nil {
return nil, err
}
return fs.s.Children(path)
}
var _ File = &ArchiveFile{}
func NewArchiveFile(readerFunc func() (iio.Reader, error), len int64) *ArchiveFile {
return &ArchiveFile{
readerFunc: readerFunc,
len: len,
}
}
type ArchiveFile struct {
readerFunc func() (iio.Reader, error)
reader iio.Reader
len int64
}
func (d *ArchiveFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *ArchiveFile) Size() int64 {
return d.len
}
func (d *ArchiveFile) IsDir() bool {
return false
}
func (d *ArchiveFile) Close() (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
}
return
}
func (d *ArchiveFile) Read(p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.Read(p)
}
func (d *ArchiveFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.ReadAt(p, off)
}

View file

@ -18,7 +18,7 @@ func TestZipFilesystem(t *testing.T) {
zReader, len := createTestZip(require)
zfs := NewZip(zReader, len)
zfs := NewArchive(zReader, len, &Zip{})
files, err := zfs.ReadDir("/path/to/test/file")
require.NoError(err)

View file

@ -20,5 +20,5 @@ func (fs *ContainerFs) Open(filename string) (File, error) {
}
func (fs *ContainerFs) ReadDir(path string) (map[string]File, error) {
return fs.s.Children(path), nil
return fs.s.Children(path)
}

View file

@ -21,7 +21,7 @@ func (fs *Memory) Open(filename string) (File, error) {
}
func (fs *Memory) ReadDir(path string) (map[string]File, error) {
return fs.Storage.Children(path), nil
return fs.Storage.Children(path)
}
var _ File = &MemoryFile{}

View file

@ -12,7 +12,13 @@ type FsFactory func(f File) (Filesystem, error)
var SupportedFactories = map[string]FsFactory{
".zip": func(f File) (Filesystem, error) {
return NewZip(f, f.Size()), nil
return NewArchive(f, f.Size(), &Zip{}), nil
},
".rar": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &Rar{}), nil
},
".7z": func(f File) (Filesystem, error) {
return NewArchive(f, f.Size(), &SevenZip{}), nil
},
}
@ -37,6 +43,8 @@ func (s *storage) Clear() {
s.files = make(map[string]File)
s.children = make(map[string]map[string]File)
s.filesystems = make(map[string]Filesystem)
s.Add(&Dir{}, "/")
}
func (s *storage) Has(path string) bool {
@ -116,20 +124,20 @@ func (s *storage) createParent(p string, f File) error {
return nil
}
func (s *storage) Children(path string) map[string]File {
func (s *storage) Children(path string) (map[string]File, error) {
path = clean(path)
out, err := s.getDirFromFs(path)
if err == nil {
return out
}
l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
}
return l
if _, ok := s.children[path]; ok {
return l, nil
}
return s.getDirFromFs(path)
}
func (s *storage) Get(path string) (File, error) {

View file

@ -40,7 +40,8 @@ func TestStorage(t *testing.T) {
require.Error(err)
require.Nil(file)
files := s.Children("/path/to/dummy/")
files, err := s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 2)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
@ -48,7 +49,8 @@ func TestStorage(t *testing.T) {
err = s.Add(&Dummy{}, "/path/to/dummy/folder/file.txt")
require.NoError(err)
files = s.Children("/path/to/dummy/")
files, err = s.Children("/path/to/dummy/")
require.NoError(err)
require.Len(files, 3)
require.Contains(files, "file.txt")
require.Contains(files, "file2.txt")
@ -59,7 +61,8 @@ func TestStorage(t *testing.T) {
require.True(s.Has("/path/file4.txt"))
files = s.Children("/")
files, err = s.Children("/")
require.NoError(err)
require.Len(files, 1)
err = s.Add(&Dummy{}, "/path/special_file.test")
@ -69,10 +72,12 @@ func TestStorage(t *testing.T) {
require.NoError(err)
require.Equal(&Dummy{}, file)
files = s.Children("/path/special_file.test")
require.Len(files, 0)
files, err = s.Children("/path/special_file.test")
require.Error(err)
require.Nil(files)
files = s.Children("/path/special_file.test/dir/here")
files, err = s.Children("/path/special_file.test/dir/here")
require.NoError(err)
require.Len(files, 2)
err = s.Add(&Dummy{}, "/path/to/__special__path/file3.txt")
@ -81,6 +86,8 @@ func TestStorage(t *testing.T) {
file, err = s.Get("/path/to/__special__path/file3.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
s.Clear()
}
func TestStorageWindowsPath(t *testing.T) {
@ -120,6 +127,28 @@ func TestStorageAddFs(t *testing.T) {
require.Error(err)
}
func TestSupportedFactories(t *testing.T) {
t.Parallel()
require := require.New(t)
require.Contains(SupportedFactories, ".zip")
require.Contains(SupportedFactories, ".rar")
require.Contains(SupportedFactories, ".7z")
fs, err := SupportedFactories[".zip"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".rar"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
fs, err = SupportedFactories[".7z"](&Dummy{})
require.NoError(err)
require.NotNil(fs)
}
var _ Filesystem = &DummyFs{}
type DummyFs struct {

View file

@ -1,25 +1,29 @@
package fs
import (
"context"
"io"
"sync"
"time"
"github.com/anacrolix/torrent"
"github.com/distribyted/distribyted/iio"
)
var _ Filesystem = &Torrent{}
type Torrent struct {
mu sync.Mutex
ts map[string]*torrent.Torrent
s *storage
loaded bool
mu sync.RWMutex
ts map[string]*torrent.Torrent
s *storage
loaded bool
readTimeout int
}
func NewTorrent() *Torrent {
func NewTorrent(readTimeout int) *Torrent {
return &Torrent{
s: newStorage(SupportedFactories),
ts: make(map[string]*torrent.Torrent),
s: newStorage(SupportedFactories),
ts: make(map[string]*torrent.Torrent),
readTimeout: readTimeout,
}
}
@ -45,14 +49,17 @@ func (fs *Torrent) load() {
if fs.loaded {
return
}
fs.mu.Lock()
defer fs.mu.Unlock()
fs.mu.RLock()
defer fs.mu.RUnlock()
for _, t := range fs.ts {
<-t.GotInfo()
for _, file := range t.Files() {
fs.s.Add(&torrentFile{readerFunc: file.NewReader, len: file.Length()}, file.Path())
fs.s.Add(&torrentFile{
reader: file.NewReader(),
len: file.Length(),
timeout: fs.readTimeout,
}, file.Path())
}
}
@ -66,23 +73,18 @@ func (fs *Torrent) Open(filename string) (File, error) {
func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
fs.load()
return fs.s.Children(path), nil
return fs.s.Children(path)
}
var _ File = &torrentFile{}
type torrentFile struct {
readerFunc func() torrent.Reader
reader iio.Reader
len int64
}
mu sync.Mutex
func (d *torrentFile) load() {
if d.reader != nil {
return
}
reader torrent.Reader
len int64
d.reader = iio.NewReadAtWrapper(d.readerFunc())
timeout int
}
func (d *torrentFile) Size() int64 {
@ -94,22 +96,58 @@ func (d *torrentFile) IsDir() bool {
}
func (d *torrentFile) Close() error {
var err error
if d.reader != nil {
err = d.reader.Close()
}
d.reader = nil
return err
return d.reader.Close()
}
func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
return d.reader.Read(p)
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)
defer timer.Stop()
return d.reader.ReadContext(ctx, p)
}
func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
func (d *torrentFile) ReadAt(p []byte, off int64) (int, error) {
d.mu.Lock()
defer d.mu.Unlock()
_, err := d.reader.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
i, err := d.readAtLeast(p, len(p))
return i, err
}
func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)
nn, err = d.reader.ReadContext(ctx, buf[n:])
n += nn
timer.Stop()
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}

View file

@ -11,21 +11,33 @@ import (
const testMagnet = "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent"
func TestTorrentFilesystem(t *testing.T) {
t.Parallel()
require := require.New(t)
var Cli *torrent.Client
func TestMain(m *testing.M) {
cfg := torrent.NewDefaultClientConfig()
cfg.DataDir = os.TempDir()
client, err := torrent.NewClient(cfg)
if err != nil {
panic(err)
}
Cli = client
exitVal := m.Run()
client.Close()
os.Exit(exitVal)
}
func TestTorrentFilesystem(t *testing.T) {
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
to, err := client.AddMagnet(testMagnet)
require.NoError(err)
tfs := NewTorrent()
tfs := NewTorrent(600)
tfs.AddTorrent(to)
files, err := tfs.ReadDir("/")
@ -61,5 +73,39 @@ func TestTorrentFilesystem(t *testing.T) {
require.NoError(err)
require.Equal(10, n)
tfs.RemoveTorrent(to.InfoHash().String())
files, err = tfs.ReadDir("/")
require.NoError(err)
require.Len(files, 0)
require.NoError(f.Close())
}
func TestReadAtWrapper(t *testing.T) {
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
tf := torrentFile{
reader: torrFile.NewReader(),
len: torrFile.Length(),
timeout: 500,
}
defer tf.Close()
toRead := make([]byte, 5)
n, err := tf.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = tf.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}

141
fs/zip.go
View file

@ -1,141 +0,0 @@
package fs
import (
"archive/zip"
"os"
"github.com/distribyted/distribyted/iio"
)
var _ Filesystem = &Zip{}
type Zip struct {
r iio.Reader
s *storage
size int64
loaded bool
}
func NewZip(r iio.Reader, size int64) *Zip {
return &Zip{
r: r,
size: size,
s: newStorage(nil),
}
}
func (fs *Zip) load() error {
if fs.loaded {
return nil
}
zr, err := zip.NewReader(fs.r, fs.size)
if err != nil {
return err
}
for _, f := range zr.File {
f := f
if f.FileInfo().IsDir() {
continue
}
err := fs.s.Add(newZipFile(
func() (iio.Reader, error) {
zr, err := f.Open()
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
},
f.FileInfo().Size(),
), string(os.PathSeparator)+f.Name)
if err != nil {
return err
}
}
fs.loaded = true
return nil
}
func (fs *Zip) Open(filename string) (File, error) {
if err := fs.load(); err != nil {
return nil, err
}
return fs.s.Get(filename)
}
func (fs *Zip) ReadDir(path string) (map[string]File, error) {
if err := fs.load(); err != nil {
return nil, err
}
return fs.s.Children(path), nil
}
var _ File = &zipFile{}
func newZipFile(readerFunc func() (iio.Reader, error), len int64) *zipFile {
return &zipFile{
readerFunc: readerFunc,
len: len,
}
}
type zipFile struct {
readerFunc func() (iio.Reader, error)
reader iio.Reader
len int64
}
func (d *zipFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *zipFile) Size() int64 {
return d.len
}
func (d *zipFile) IsDir() bool {
return false
}
func (d *zipFile) Close() (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
}
return
}
func (d *zipFile) Read(p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.Read(p)
}
func (d *zipFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
}
return d.reader.ReadAt(p, off)
}