From c51c88e51141ff1ae343b0dea8ddfc9bb250b696 Mon Sep 17 00:00:00 2001 From: Antonio Navarro Perez Date: Wed, 1 Dec 2021 19:59:21 +0100 Subject: [PATCH] Fix storage and torrent reader. (#101) --- .github/workflows/build.yaml | 5 +- fs/storage.go | 16 +++-- fs/storage_test.go | 4 +- fs/torrent.go | 136 +++++++++++++++++++++++------------ fs/torrent_test.go | 34 +++++++-- 5 files changed, 133 insertions(+), 62 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9092ce2..c570e4f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -119,8 +119,6 @@ jobs: shell: bash run: | make build - if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain. - - name: Cross-compile shell: bash run: | @@ -133,8 +131,7 @@ jobs: if-no-files-found: error name: build-${{ matrix.job_name }} path: bin/* - if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain. - + - name: Release uses: softprops/action-gh-release@v1 if: startsWith(github.ref, 'refs/tags/') diff --git a/fs/storage.go b/fs/storage.go index e0134dc..9271b23 100644 --- a/fs/storage.go +++ b/fs/storage.go @@ -127,17 +127,21 @@ func (s *storage) createParent(p string, f File) error { func (s *storage) Children(path string) (map[string]File, error) { path = clean(path) + files, err := s.getDirFromFs(path) + if err == nil { + return files, nil + } + + if !os.IsNotExist(err) { + return nil, err + } + l := make(map[string]File) for n, f := range s.children[path] { l[n] = f } - if _, ok := s.children[path]; ok { - return l, nil - } - - return s.getDirFromFs(path) - + return l, nil } func (s *storage) Get(path string) (File, error) { diff --git a/fs/storage_test.go b/fs/storage_test.go index 1038d5f..38d4068 100644 --- a/fs/storage_test.go +++ b/fs/storage_test.go @@ -73,8 +73,8 @@ func TestStorage(t *testing.T) { require.Equal(&Dummy{}, file) files, err = s.Children("/path/special_file.test") - require.Error(err) - require.Nil(files) + require.NoError(err) + require.NotNil(files) files, err = s.Children("/path/special_file.test/dir/here") require.NoError(err) diff --git a/fs/torrent.go b/fs/torrent.go index e66f796..33ebd7e 100644 --- a/fs/torrent.go +++ b/fs/torrent.go @@ -6,7 +6,9 @@ import ( "sync" "time" + "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent" + "github.com/distribyted/distribyted/iio" ) var _ Filesystem = &Torrent{} @@ -56,9 +58,9 @@ func (fs *Torrent) load() { <-t.GotInfo() for _, file := range t.Files() { fs.s.Add(&torrentFile{ - reader: file.NewReader(), - len: file.Length(), - timeout: fs.readTimeout, + readerFunc: file.NewReader, + len: file.Length(), + timeout: fs.readTimeout, }, file.Path()) } } @@ -76,15 +78,83 @@ func (fs *Torrent) ReadDir(path string) (map[string]File, error) { return fs.s.Children(path) } +type reader interface { + iio.Reader + missinggo.ReadContexter +} + +type readAtWrapper struct { + timeout int + mu sync.Mutex + + torrent.Reader + io.ReaderAt + io.Closer +} + +func newReadAtWrapper(r torrent.Reader, timeout int) reader { + return &readAtWrapper{Reader: r, timeout: timeout} +} + +func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) { + rw.mu.Lock() + defer rw.mu.Unlock() + _, err := rw.Seek(off, io.SeekStart) + if err != nil { + return 0, err + } + + return readAtLeast(rw, rw.timeout, p, len(p)) +} + +func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) { + if len(buf) < min { + return 0, io.ErrShortBuffer + } + for n < min && err == nil { + var nn int + + ctx, cancel := context.WithCancel(context.Background()) + timer := time.AfterFunc( + time.Duration(timeout)*time.Second, + func() { + cancel() + }, + ) + + nn, err = r.ReadContext(ctx, buf[n:]) + n += nn + + timer.Stop() + } + if n >= min { + err = nil + } else if n > 0 && err == io.EOF { + err = io.ErrUnexpectedEOF + } + return +} + +func (rw *readAtWrapper) Close() error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.Reader.Close() +} + var _ File = &torrentFile{} type torrentFile struct { - mu sync.Mutex + readerFunc func() torrent.Reader + reader reader + len int64 + timeout int +} - reader torrent.Reader - len int64 - - timeout int +func (d *torrentFile) load() { + if d.reader != nil { + return + } + d.reader = newReadAtWrapper(d.readerFunc(), d.timeout) } func (d *torrentFile) Size() int64 { @@ -96,10 +166,18 @@ func (d *torrentFile) IsDir() bool { } func (d *torrentFile) Close() error { - return d.reader.Close() + var err error + if d.reader != nil { + err = d.reader.Close() + } + + d.reader = nil + + return err } func (d *torrentFile) Read(p []byte) (n int, err error) { + d.load() ctx, cancel := context.WithCancel(context.Background()) timer := time.AfterFunc( time.Duration(d.timeout)*time.Second, @@ -113,41 +191,7 @@ func (d *torrentFile) Read(p []byte) (n int, err error) { return d.reader.ReadContext(ctx, p) } -func (d *torrentFile) ReadAt(p []byte, off int64) (int, error) { - d.mu.Lock() - defer d.mu.Unlock() - _, err := d.reader.Seek(off, io.SeekStart) - if err != nil { - return 0, err - } - i, err := d.readAtLeast(p, len(p)) - return i, err -} - -func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) { - if len(buf) < min { - return 0, io.ErrShortBuffer - } - for n < min && err == nil { - var nn int - - ctx, cancel := context.WithCancel(context.Background()) - timer := time.AfterFunc( - time.Duration(d.timeout)*time.Second, - func() { - cancel() - }, - ) - - nn, err = d.reader.ReadContext(ctx, buf[n:]) - n += nn - - timer.Stop() - } - if n >= min { - err = nil - } else if n > 0 && err == io.EOF { - err = io.ErrUnexpectedEOF - } - return +func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) { + d.load() + return d.reader.ReadAt(p, off) } diff --git a/fs/torrent_test.go b/fs/torrent_test.go index a1c09a3..1cb7439 100644 --- a/fs/torrent_test.go +++ b/fs/torrent_test.go @@ -81,7 +81,7 @@ func TestTorrentFilesystem(t *testing.T) { require.NoError(f.Close()) } -func TestReadAtWrapper(t *testing.T) { +func TestReadAtTorrent(t *testing.T) { require := require.New(t) to, err := Cli.AddMagnet(testMagnet) @@ -91,9 +91,9 @@ func TestReadAtWrapper(t *testing.T) { torrFile := to.Files()[0] tf := torrentFile{ - reader: torrFile.NewReader(), - len: torrFile.Length(), - timeout: 500, + readerFunc: torrFile.NewReader, + len: torrFile.Length(), + timeout: 500, } defer tf.Close() @@ -109,3 +109,29 @@ func TestReadAtWrapper(t *testing.T) { require.Equal(5, n) require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) } + +func TestReadAtWrapper(t *testing.T) { + t.Parallel() + + require := require.New(t) + + to, err := Cli.AddMagnet(testMagnet) + require.NoError(err) + + <-to.GotInfo() + torrFile := to.Files()[0] + + r := newReadAtWrapper(torrFile.NewReader(), 10) + defer r.Close() + + toRead := make([]byte, 5) + n, err := r.ReadAt(toRead, 6) + require.NoError(err) + require.Equal(5, n) + require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead) + + n, err = r.ReadAt(toRead, 0) + require.NoError(err) + require.Equal(5, n) + require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) +}