Implement zip explode first iteration
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
This commit is contained in:
parent
1e46df7dd7
commit
40eefbf008
8 changed files with 229 additions and 7 deletions
|
@ -25,6 +25,7 @@ require (
|
||||||
github.com/mschoch/smat v0.2.0 // indirect
|
github.com/mschoch/smat v0.2.0 // indirect
|
||||||
github.com/panjf2000/ants/v2 v2.3.1
|
github.com/panjf2000/ants/v2 v2.3.1
|
||||||
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
|
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
|
||||||
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/tinylib/msgp v1.1.2 // indirect
|
github.com/tinylib/msgp v1.1.2 // indirect
|
||||||
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect
|
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect
|
||||||
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f // indirect
|
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f // indirect
|
||||||
|
|
65
torrent/iio/disk.go
Normal file
65
torrent/iio/disk.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package iio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DiskTeeReader struct {
|
||||||
|
io.ReaderAt
|
||||||
|
io.Closer
|
||||||
|
io.Reader
|
||||||
|
|
||||||
|
m sync.Mutex
|
||||||
|
|
||||||
|
fo int64
|
||||||
|
fr *os.File
|
||||||
|
to int64
|
||||||
|
tr io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDiskTeeReader(r io.Reader) (*DiskTeeReader, error) {
|
||||||
|
fr, err := ioutil.TempFile("", "dtb_tmp")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tr := io.TeeReader(r, fr)
|
||||||
|
return &DiskTeeReader{fr: fr, tr: tr}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dtr *DiskTeeReader) ReadAt(p []byte, off int64) (int, error) {
|
||||||
|
dtr.m.Lock()
|
||||||
|
defer dtr.m.Unlock()
|
||||||
|
tb := off + int64(len(p))
|
||||||
|
|
||||||
|
if tb > dtr.fo {
|
||||||
|
w, err := io.CopyN(ioutil.Discard, dtr.tr, tb-dtr.fo)
|
||||||
|
dtr.to += w
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := dtr.fr.ReadAt(p, off)
|
||||||
|
dtr.fo += int64(n)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dtr *DiskTeeReader) Read(p []byte) (n int, err error) {
|
||||||
|
dtr.m.Lock()
|
||||||
|
defer dtr.m.Unlock()
|
||||||
|
// use directly tee reader here
|
||||||
|
n, err = dtr.tr.Read(p)
|
||||||
|
dtr.to += int64(n)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dtr *DiskTeeReader) Close() error {
|
||||||
|
if err := dtr.fr.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.Remove(dtr.fr.Name())
|
||||||
|
}
|
46
torrent/iio/disk_test.go
Normal file
46
torrent/iio/disk_test.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package iio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var testData []byte = []byte("Hello World")
|
||||||
|
|
||||||
|
func TestReadData(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
br := bytes.NewReader(testData)
|
||||||
|
r, err := NewDiskTeeReader(br)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
toRead := make([]byte, 5)
|
||||||
|
|
||||||
|
n, err := r.ReadAt(toRead, 6)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(5, n)
|
||||||
|
require.Equal("World", string(toRead))
|
||||||
|
|
||||||
|
r.ReadAt(toRead, 0)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(5, n)
|
||||||
|
require.Equal("Hello", string(toRead))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadDataEOF(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
br := bytes.NewReader(testData)
|
||||||
|
r, err := NewDiskTeeReader(br)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
toRead := make([]byte, 6)
|
||||||
|
|
||||||
|
n, err := r.ReadAt(toRead, 6)
|
||||||
|
require.Equal(io.EOF, err)
|
||||||
|
require.Equal(5, n)
|
||||||
|
require.Equal("World\x00", string(toRead))
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ import (
|
||||||
var _ fs.NodeGetattrer = &File{}
|
var _ fs.NodeGetattrer = &File{}
|
||||||
var _ fs.NodeOpener = &File{}
|
var _ fs.NodeOpener = &File{}
|
||||||
var _ fs.NodeReader = &File{}
|
var _ fs.NodeReader = &File{}
|
||||||
|
var _ fs.NodeReleaser = &File{}
|
||||||
|
|
||||||
// File is a fuse node for files inside a torrent
|
// File is a fuse node for files inside a torrent
|
||||||
type File struct {
|
type File struct {
|
||||||
|
@ -60,7 +61,7 @@ func (tr *File) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseF
|
||||||
r, err := tr.f()
|
r, err := tr.f()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("error opening reader for file", err)
|
log.Println("error opening reader for file", err)
|
||||||
return nil, fuse.FOPEN_KEEP_CACHE, syscall.EIO
|
return nil, 0, syscall.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
tr.r = r
|
tr.r = r
|
||||||
|
@ -71,6 +72,9 @@ func (tr *File) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseF
|
||||||
|
|
||||||
func (tr *File) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
|
func (tr *File) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
|
||||||
end := int(math.Min(float64(len(dest)), float64(int64(tr.len)-off)))
|
end := int(math.Min(float64(len(dest)), float64(int64(tr.len)-off)))
|
||||||
|
if end < 0 {
|
||||||
|
end = 0
|
||||||
|
}
|
||||||
|
|
||||||
buf := dest[:end]
|
buf := dest[:end]
|
||||||
|
|
||||||
|
@ -84,3 +88,20 @@ func (tr *File) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int6
|
||||||
buf = buf[:n]
|
buf = buf[:n]
|
||||||
return fuse.ReadResultData(buf), fs.OK
|
return fuse.ReadResultData(buf), fs.OK
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tr *File) Release(ctx context.Context, f fs.FileHandle) syscall.Errno {
|
||||||
|
log.Println("closing file...")
|
||||||
|
if tr.r != nil {
|
||||||
|
closer, ok := tr.r.(io.Closer)
|
||||||
|
if ok {
|
||||||
|
if err := closer.Close(); err != nil {
|
||||||
|
log.Println("error closing file", err)
|
||||||
|
return syscall.EIO
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Println("file is not implementing close method")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs.OK
|
||||||
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (root *Root) OnAdd(ctx context.Context) {
|
||||||
root.pool.Submit(func() {
|
root.pool.Submit(func() {
|
||||||
root.AddChild(
|
root.AddChild(
|
||||||
filepath.Clean(torrent.Name()),
|
filepath.Clean(torrent.Name()),
|
||||||
root.NewPersistentInode(ctx, &Folder{t: torrent, pool: root.pool}, fs.StableAttr{
|
root.NewPersistentInode(ctx, &Torrent{t: torrent, pool: root.pool}, fs.StableAttr{
|
||||||
Mode: syscall.S_IFDIR,
|
Mode: syscall.S_IFDIR,
|
||||||
}), true)
|
}), true)
|
||||||
})
|
})
|
||||||
|
|
|
@ -12,17 +12,17 @@ import (
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ fs.NodeOnAdder = &Folder{}
|
var _ fs.NodeOnAdder = &Torrent{}
|
||||||
var _ fs.NodeGetattrer = &Folder{}
|
var _ fs.NodeGetattrer = &Torrent{}
|
||||||
|
|
||||||
type Folder struct {
|
type Torrent struct {
|
||||||
fs.Inode
|
fs.Inode
|
||||||
t *torrent.Torrent
|
t *torrent.Torrent
|
||||||
|
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (folder *Folder) OnAdd(ctx context.Context) {
|
func (folder *Torrent) OnAdd(ctx context.Context) {
|
||||||
for _, file := range folder.t.Files() {
|
for _, file := range folder.t.Files() {
|
||||||
file := file
|
file := file
|
||||||
LoadNodeByPath(
|
LoadNodeByPath(
|
||||||
|
@ -38,7 +38,7 @@ func (folder *Folder) OnAdd(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (folder *Folder) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
|
func (folder *Torrent) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
|
||||||
out.Mode = syscall.S_IFDIR & 07777
|
out.Mode = syscall.S_IFDIR & 07777
|
||||||
|
|
||||||
return fs.OK
|
return fs.OK
|
||||||
|
|
|
@ -39,6 +39,13 @@ func LoadNodeByPath(ctx context.Context, pool *ants.Pool, fp string, reader Read
|
||||||
|
|
||||||
ext := path.Ext(base)
|
ext := path.Ext(base)
|
||||||
switch ext {
|
switch ext {
|
||||||
|
case ".zip":
|
||||||
|
n := NewZip(reader, fileLength)
|
||||||
|
p.AddChild(
|
||||||
|
strings.TrimRight(base, ext),
|
||||||
|
p.NewPersistentInode(ctx, n, fs.StableAttr{
|
||||||
|
Mode: fuse.S_IFDIR,
|
||||||
|
}), true)
|
||||||
default:
|
default:
|
||||||
n := NewFileWithBlocks(
|
n := NewFileWithBlocks(
|
||||||
reader,
|
reader,
|
||||||
|
|
82
torrent/node/zip.go
Normal file
82
torrent/node/zip.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/zip"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/ajnavarro/distribyted/iio"
|
||||||
|
"github.com/hanwen/go-fuse/v2/fs"
|
||||||
|
"github.com/hanwen/go-fuse/v2/fuse"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ fs.NodeGetattrer = &Zip{}
|
||||||
|
var _ fs.NodeOpendirer = &Zip{}
|
||||||
|
|
||||||
|
type Zip struct {
|
||||||
|
fs.Inode
|
||||||
|
|
||||||
|
reader ReaderFunc
|
||||||
|
size int64
|
||||||
|
files []*zip.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewZip(reader ReaderFunc, size int64) *Zip {
|
||||||
|
return &Zip{
|
||||||
|
reader: reader,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (z *Zip) Opendir(ctx context.Context) syscall.Errno {
|
||||||
|
if z.files == nil {
|
||||||
|
r, err := z.reader()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("error opening reader for zip", err)
|
||||||
|
return syscall.EIO
|
||||||
|
}
|
||||||
|
zr, err := zip.NewReader(r, z.size)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("error getting zip reader from reader", err)
|
||||||
|
return syscall.EIO
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range zr.File {
|
||||||
|
f := f
|
||||||
|
if f.FileInfo().IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
LoadNodeByPath(
|
||||||
|
ctx,
|
||||||
|
nil,
|
||||||
|
f.Name,
|
||||||
|
func() (io.ReaderAt, error) {
|
||||||
|
zfr, err := f.Open()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("ERROR OPENING ZIP", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return iio.NewDiskTeeReader(zfr)
|
||||||
|
},
|
||||||
|
&z.Inode,
|
||||||
|
int64(f.UncompressedSize64),
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
z.files = zr.File
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs.OK
|
||||||
|
}
|
||||||
|
|
||||||
|
func (z *Zip) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
|
||||||
|
out.Mode = syscall.S_IFDIR & 07777
|
||||||
|
out.Size = uint64(z.size)
|
||||||
|
|
||||||
|
return fs.OK
|
||||||
|
}
|
Loading…
Reference in a new issue