diff --git a/torrent/go.mod b/torrent/go.mod index b9176e1..e6a432b 100644 --- a/torrent/go.mod +++ b/torrent/go.mod @@ -25,6 +25,7 @@ require ( github.com/mschoch/smat v0.2.0 // indirect github.com/panjf2000/ants/v2 v2.3.1 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 + github.com/stretchr/testify v1.4.0 github.com/tinylib/msgp v1.1.2 // indirect golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f // indirect diff --git a/torrent/iio/disk.go b/torrent/iio/disk.go new file mode 100644 index 0000000..fb09a17 --- /dev/null +++ b/torrent/iio/disk.go @@ -0,0 +1,65 @@ +package iio + +import ( + "io" + "io/ioutil" + "os" + "sync" +) + +type DiskTeeReader struct { + io.ReaderAt + io.Closer + io.Reader + + m sync.Mutex + + fo int64 + fr *os.File + to int64 + tr io.Reader +} + +func NewDiskTeeReader(r io.Reader) (*DiskTeeReader, error) { + fr, err := ioutil.TempFile("", "dtb_tmp") + if err != nil { + return nil, err + } + tr := io.TeeReader(r, fr) + return &DiskTeeReader{fr: fr, tr: tr}, nil +} + +func (dtr *DiskTeeReader) ReadAt(p []byte, off int64) (int, error) { + dtr.m.Lock() + defer dtr.m.Unlock() + tb := off + int64(len(p)) + + if tb > dtr.fo { + w, err := io.CopyN(ioutil.Discard, dtr.tr, tb-dtr.fo) + dtr.to += w + if err != nil && err != io.EOF { + return 0, err + } + } + + n, err := dtr.fr.ReadAt(p, off) + dtr.fo += int64(n) + return n, err +} + +func (dtr *DiskTeeReader) Read(p []byte) (n int, err error) { + dtr.m.Lock() + defer dtr.m.Unlock() + // use directly tee reader here + n, err = dtr.tr.Read(p) + dtr.to += int64(n) + return +} + +func (dtr *DiskTeeReader) Close() error { + if err := dtr.fr.Close(); err != nil { + return err + } + + return os.Remove(dtr.fr.Name()) +} diff --git a/torrent/iio/disk_test.go b/torrent/iio/disk_test.go new file mode 100644 index 0000000..9abc95b --- /dev/null +++ b/torrent/iio/disk_test.go @@ -0,0 +1,46 @@ +package iio + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +var testData []byte = []byte("Hello World") + +func TestReadData(t *testing.T) { + require := require.New(t) + + br := bytes.NewReader(testData) + r, err := NewDiskTeeReader(br) + require.NoError(err) + + toRead := make([]byte, 5) + + n, err := r.ReadAt(toRead, 6) + require.NoError(err) + require.Equal(5, n) + require.Equal("World", string(toRead)) + + r.ReadAt(toRead, 0) + require.NoError(err) + require.Equal(5, n) + require.Equal("Hello", string(toRead)) +} + +func TestReadDataEOF(t *testing.T) { + require := require.New(t) + + br := bytes.NewReader(testData) + r, err := NewDiskTeeReader(br) + require.NoError(err) + + toRead := make([]byte, 6) + + n, err := r.ReadAt(toRead, 6) + require.Equal(io.EOF, err) + require.Equal(5, n) + require.Equal("World\x00", string(toRead)) +} diff --git a/torrent/node/file.go b/torrent/node/file.go index 76846d5..9adb6c9 100644 --- a/torrent/node/file.go +++ b/torrent/node/file.go @@ -14,6 +14,7 @@ import ( var _ fs.NodeGetattrer = &File{} var _ fs.NodeOpener = &File{} var _ fs.NodeReader = &File{} +var _ fs.NodeReleaser = &File{} // File is a fuse node for files inside a torrent type File struct { @@ -60,7 +61,7 @@ func (tr *File) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseF r, err := tr.f() if err != nil { log.Println("error opening reader for file", err) - return nil, fuse.FOPEN_KEEP_CACHE, syscall.EIO + return nil, 0, syscall.EIO } tr.r = r @@ -71,6 +72,9 @@ func (tr *File) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseF func (tr *File) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { end := int(math.Min(float64(len(dest)), float64(int64(tr.len)-off))) + if end < 0 { + end = 0 + } buf := dest[:end] @@ -84,3 +88,20 @@ func (tr *File) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int6 buf = buf[:n] return fuse.ReadResultData(buf), fs.OK } + +func (tr *File) Release(ctx context.Context, f fs.FileHandle) syscall.Errno { + log.Println("closing file...") + if tr.r != nil { + closer, ok := tr.r.(io.Closer) + if ok { + if err := closer.Close(); err != nil { + log.Println("error closing file", err) + return syscall.EIO + } + } else { + log.Println("file is not implementing close method") + } + } + + return fs.OK +} diff --git a/torrent/node/root.go b/torrent/node/root.go index f0dca43..dca15af 100644 --- a/torrent/node/root.go +++ b/torrent/node/root.go @@ -29,7 +29,7 @@ func (root *Root) OnAdd(ctx context.Context) { root.pool.Submit(func() { root.AddChild( filepath.Clean(torrent.Name()), - root.NewPersistentInode(ctx, &Folder{t: torrent, pool: root.pool}, fs.StableAttr{ + root.NewPersistentInode(ctx, &Torrent{t: torrent, pool: root.pool}, fs.StableAttr{ Mode: syscall.S_IFDIR, }), true) }) diff --git a/torrent/node/torrent.go b/torrent/node/torrent.go index 26e8fdf..812750e 100644 --- a/torrent/node/torrent.go +++ b/torrent/node/torrent.go @@ -12,17 +12,17 @@ import ( "github.com/panjf2000/ants/v2" ) -var _ fs.NodeOnAdder = &Folder{} -var _ fs.NodeGetattrer = &Folder{} +var _ fs.NodeOnAdder = &Torrent{} +var _ fs.NodeGetattrer = &Torrent{} -type Folder struct { +type Torrent struct { fs.Inode t *torrent.Torrent pool *ants.Pool } -func (folder *Folder) OnAdd(ctx context.Context) { +func (folder *Torrent) OnAdd(ctx context.Context) { for _, file := range folder.t.Files() { file := file LoadNodeByPath( @@ -38,7 +38,7 @@ func (folder *Folder) OnAdd(ctx context.Context) { } } -func (folder *Folder) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno { +func (folder *Torrent) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno { out.Mode = syscall.S_IFDIR & 07777 return fs.OK diff --git a/torrent/node/utils.go b/torrent/node/utils.go index 0c5c04c..be4196f 100644 --- a/torrent/node/utils.go +++ b/torrent/node/utils.go @@ -39,6 +39,13 @@ func LoadNodeByPath(ctx context.Context, pool *ants.Pool, fp string, reader Read ext := path.Ext(base) switch ext { + case ".zip": + n := NewZip(reader, fileLength) + p.AddChild( + strings.TrimRight(base, ext), + p.NewPersistentInode(ctx, n, fs.StableAttr{ + Mode: fuse.S_IFDIR, + }), true) default: n := NewFileWithBlocks( reader, diff --git a/torrent/node/zip.go b/torrent/node/zip.go new file mode 100644 index 0000000..b9535b7 --- /dev/null +++ b/torrent/node/zip.go @@ -0,0 +1,82 @@ +package node + +import ( + "archive/zip" + "context" + "io" + "log" + "syscall" + + "github.com/ajnavarro/distribyted/iio" + "github.com/hanwen/go-fuse/v2/fs" + "github.com/hanwen/go-fuse/v2/fuse" +) + +var _ fs.NodeGetattrer = &Zip{} +var _ fs.NodeOpendirer = &Zip{} + +type Zip struct { + fs.Inode + + reader ReaderFunc + size int64 + files []*zip.File +} + +func NewZip(reader ReaderFunc, size int64) *Zip { + return &Zip{ + reader: reader, + size: size, + } +} + +func (z *Zip) Opendir(ctx context.Context) syscall.Errno { + if z.files == nil { + r, err := z.reader() + if err != nil { + log.Println("error opening reader for zip", err) + return syscall.EIO + } + zr, err := zip.NewReader(r, z.size) + if err != nil { + log.Println("error getting zip reader from reader", err) + return syscall.EIO + } + + for _, f := range zr.File { + f := f + if f.FileInfo().IsDir() { + continue + } + LoadNodeByPath( + ctx, + nil, + f.Name, + func() (io.ReaderAt, error) { + zfr, err := f.Open() + if err != nil { + log.Println("ERROR OPENING ZIP", err) + return nil, err + } + + return iio.NewDiskTeeReader(zfr) + }, + &z.Inode, + int64(f.UncompressedSize64), + 0, + 0, + ) + } + + z.files = zr.File + } + + return fs.OK +} + +func (z *Zip) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno { + out.Mode = syscall.S_IFDIR & 07777 + out.Size = uint64(z.size) + + return fs.OK +}