Fix storage and torrent reader. (#101)

This commit is contained in:
Antonio Navarro Perez 2021-12-01 19:59:21 +01:00 committed by GitHub
parent 0b89fda8e9
commit c51c88e511
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 133 additions and 62 deletions

View file

@ -119,8 +119,6 @@ jobs:
shell: bash
run: |
make build
if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain.
- name: Cross-compile
shell: bash
run: |
@ -133,8 +131,7 @@ jobs:
if-no-files-found: error
name: build-${{ matrix.job_name }}
path: bin/*
if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain.
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')

View file

@ -127,17 +127,21 @@ func (s *storage) createParent(p string, f File) error {
func (s *storage) Children(path string) (map[string]File, error) {
path = clean(path)
files, err := s.getDirFromFs(path)
if err == nil {
return files, nil
}
if !os.IsNotExist(err) {
return nil, err
}
l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
}
if _, ok := s.children[path]; ok {
return l, nil
}
return s.getDirFromFs(path)
return l, nil
}
func (s *storage) Get(path string) (File, error) {

View file

@ -73,8 +73,8 @@ func TestStorage(t *testing.T) {
require.Equal(&Dummy{}, file)
files, err = s.Children("/path/special_file.test")
require.Error(err)
require.Nil(files)
require.NoError(err)
require.NotNil(files)
files, err = s.Children("/path/special_file.test/dir/here")
require.NoError(err)

View file

@ -6,7 +6,9 @@ import (
"sync"
"time"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
"github.com/distribyted/distribyted/iio"
)
var _ Filesystem = &Torrent{}
@ -56,9 +58,9 @@ func (fs *Torrent) load() {
<-t.GotInfo()
for _, file := range t.Files() {
fs.s.Add(&torrentFile{
reader: file.NewReader(),
len: file.Length(),
timeout: fs.readTimeout,
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
}, file.Path())
}
}
@ -76,15 +78,83 @@ func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
return fs.s.Children(path)
}
type reader interface {
iio.Reader
missinggo.ReadContexter
}
type readAtWrapper struct {
timeout int
mu sync.Mutex
torrent.Reader
io.ReaderAt
io.Closer
}
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
return &readAtWrapper{Reader: r, timeout: timeout}
}
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {
rw.mu.Lock()
defer rw.mu.Unlock()
_, err := rw.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return readAtLeast(rw, rw.timeout, p, len(p))
}
func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(timeout)*time.Second,
func() {
cancel()
},
)
nn, err = r.ReadContext(ctx, buf[n:])
n += nn
timer.Stop()
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}
func (rw *readAtWrapper) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.Reader.Close()
}
var _ File = &torrentFile{}
type torrentFile struct {
mu sync.Mutex
readerFunc func() torrent.Reader
reader reader
len int64
timeout int
}
reader torrent.Reader
len int64
timeout int
func (d *torrentFile) load() {
if d.reader != nil {
return
}
d.reader = newReadAtWrapper(d.readerFunc(), d.timeout)
}
func (d *torrentFile) Size() int64 {
@ -96,10 +166,18 @@ func (d *torrentFile) IsDir() bool {
}
func (d *torrentFile) Close() error {
return d.reader.Close()
var err error
if d.reader != nil {
err = d.reader.Close()
}
d.reader = nil
return err
}
func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
@ -113,41 +191,7 @@ func (d *torrentFile) Read(p []byte) (n int, err error) {
return d.reader.ReadContext(ctx, p)
}
func (d *torrentFile) ReadAt(p []byte, off int64) (int, error) {
d.mu.Lock()
defer d.mu.Unlock()
_, err := d.reader.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
i, err := d.readAtLeast(p, len(p))
return i, err
}
func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)
nn, err = d.reader.ReadContext(ctx, buf[n:])
n += nn
timer.Stop()
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
}

View file

@ -81,7 +81,7 @@ func TestTorrentFilesystem(t *testing.T) {
require.NoError(f.Close())
}
func TestReadAtWrapper(t *testing.T) {
func TestReadAtTorrent(t *testing.T) {
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
@ -91,9 +91,9 @@ func TestReadAtWrapper(t *testing.T) {
torrFile := to.Files()[0]
tf := torrentFile{
reader: torrFile.NewReader(),
len: torrFile.Length(),
timeout: 500,
readerFunc: torrFile.NewReader,
len: torrFile.Length(),
timeout: 500,
}
defer tf.Close()
@ -109,3 +109,29 @@ func TestReadAtWrapper(t *testing.T) {
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}
func TestReadAtWrapper(t *testing.T) {
t.Parallel()
require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
r := newReadAtWrapper(torrFile.NewReader(), 10)
defer r.Close()
toRead := make([]byte, 5)
n, err := r.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = r.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}