tstor/fs/torrent.go

198 lines
3.4 KiB
Go
Raw Normal View History

package fs
import (
2021-11-29 10:07:54 +00:00
"context"
"io"
"sync"
2021-11-29 10:07:54 +00:00
"time"
2021-12-01 18:59:21 +00:00
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
2021-12-01 18:59:21 +00:00
"github.com/distribyted/distribyted/iio"
)
var _ Filesystem = &Torrent{}
type Torrent struct {
2021-11-29 10:07:54 +00:00
mu sync.RWMutex
ts map[string]*torrent.Torrent
s *storage
loaded bool
readTimeout int
}
2021-11-29 10:07:54 +00:00
func NewTorrent(readTimeout int) *Torrent {
return &Torrent{
2021-11-29 10:07:54 +00:00
s: newStorage(SupportedFactories),
ts: make(map[string]*torrent.Torrent),
readTimeout: readTimeout,
}
}
func (fs *Torrent) AddTorrent(t *torrent.Torrent) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.loaded = false
fs.ts[t.InfoHash().HexString()] = t
}
func (fs *Torrent) RemoveTorrent(h string) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.s.Clear()
fs.loaded = false
delete(fs.ts, h)
}
func (fs *Torrent) load() {
if fs.loaded {
return
}
2021-11-29 10:07:54 +00:00
fs.mu.RLock()
defer fs.mu.RUnlock()
for _, t := range fs.ts {
<-t.GotInfo()
for _, file := range t.Files() {
2021-11-29 10:07:54 +00:00
fs.s.Add(&torrentFile{
2021-12-01 18:59:21 +00:00
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
2021-11-29 10:07:54 +00:00
}, file.Path())
}
}
fs.loaded = true
}
func (fs *Torrent) Open(filename string) (File, error) {
fs.load()
return fs.s.Get(filename)
}
func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
fs.load()
2021-11-29 10:07:54 +00:00
return fs.s.Children(path)
}
2021-12-01 18:59:21 +00:00
type reader interface {
iio.Reader
missinggo.ReadContexter
}
2021-12-01 18:59:21 +00:00
type readAtWrapper struct {
timeout int
mu sync.Mutex
2021-12-01 18:59:21 +00:00
torrent.Reader
io.ReaderAt
io.Closer
}
2021-12-01 18:59:21 +00:00
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
return &readAtWrapper{Reader: r, timeout: timeout}
2021-11-29 10:07:54 +00:00
}
2021-12-01 18:59:21 +00:00
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {
rw.mu.Lock()
defer rw.mu.Unlock()
_, err := rw.Seek(off, io.SeekStart)
2021-11-29 10:07:54 +00:00
if err != nil {
return 0, err
}
2021-12-01 18:59:21 +00:00
return readAtLeast(rw, rw.timeout, p, len(p))
}
2021-12-01 18:59:21 +00:00
func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) {
2021-11-29 10:07:54 +00:00
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
2021-12-01 18:59:21 +00:00
time.Duration(timeout)*time.Second,
2021-11-29 10:07:54 +00:00
func() {
cancel()
},
)
2021-12-01 18:59:21 +00:00
nn, err = r.ReadContext(ctx, buf[n:])
2021-11-29 10:07:54 +00:00
n += nn
timer.Stop()
}
if n >= min {
err = nil
} else if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}
2021-12-01 18:59:21 +00:00
func (rw *readAtWrapper) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.Reader.Close()
}
var _ File = &torrentFile{}
type torrentFile struct {
readerFunc func() torrent.Reader
reader reader
len int64
timeout int
}
func (d *torrentFile) load() {
if d.reader != nil {
return
}
d.reader = newReadAtWrapper(d.readerFunc(), d.timeout)
}
func (d *torrentFile) Size() int64 {
return d.len
}
func (d *torrentFile) IsDir() bool {
return false
}
func (d *torrentFile) Close() error {
var err error
if d.reader != nil {
err = d.reader.Close()
}
d.reader = nil
return err
}
func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)
defer timer.Stop()
return d.reader.ReadContext(ctx, p)
}
func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
}