torrent local read timeout
This commit is contained in:
parent
7651c22dbc
commit
3afd12dce3
1 changed files with 26 additions and 13 deletions
|
@ -27,13 +27,16 @@ type TorrentFS struct {
|
|||
|
||||
filesCache map[string]vfs.File
|
||||
|
||||
lastAccessTimeout atomic.Pointer[time.Time]
|
||||
lastTorrentReadTimeout atomic.Pointer[time.Time]
|
||||
|
||||
resolver *vfs.Resolver
|
||||
}
|
||||
|
||||
var _ vfs.Filesystem = (*TorrentFS)(nil)
|
||||
|
||||
const shortTimeout = time.Millisecond
|
||||
const lowTimeout = time.Second * 5
|
||||
|
||||
func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
|
||||
c, err := s.LoadTorrent(ctx, f)
|
||||
if err != nil {
|
||||
|
@ -113,7 +116,7 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
|
|||
fs.filesCache = make(map[string]vfs.File)
|
||||
for _, file := range files {
|
||||
p := vfs.AbsPath(file.Path())
|
||||
tf, err := openTorrentFile(ctx, path.Base(p), file)
|
||||
tf, err := openTorrentFile(ctx, path.Base(p), file, &fs.lastTorrentReadTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -233,11 +236,11 @@ func (fs *TorrentFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption
|
|||
}
|
||||
|
||||
func (tfs *TorrentFS) readContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
lastReadTimeout := tfs.lastAccessTimeout.Load()
|
||||
lastReadTimeout := tfs.lastTorrentReadTimeout.Load()
|
||||
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
|
||||
|
||||
return context.WithTimeout(ctx, time.Millisecond)
|
||||
return context.WithTimeout(ctx, shortTimeout)
|
||||
}
|
||||
|
||||
return ctx, func() {}
|
||||
|
@ -260,7 +263,7 @@ func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
|
|||
cancel()
|
||||
if err == context.DeadlineExceeded {
|
||||
now := time.Now()
|
||||
tfs.lastAccessTimeout.Store(&now)
|
||||
tfs.lastTorrentReadTimeout.Store(&now)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -290,7 +293,7 @@ func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File,
|
|||
cancel()
|
||||
if err == context.DeadlineExceeded {
|
||||
now := time.Now()
|
||||
tfs.lastAccessTimeout.Store(&now)
|
||||
tfs.lastTorrentReadTimeout.Store(&now)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -318,7 +321,7 @@ func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry,
|
|||
cancel()
|
||||
if err == context.DeadlineExceeded {
|
||||
now := time.Now()
|
||||
tfs.lastAccessTimeout.Store(&now)
|
||||
tfs.lastTorrentReadTimeout.Store(&now)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -377,14 +380,15 @@ type torrentFile struct {
|
|||
|
||||
tr torrent.Reader
|
||||
|
||||
lastReadTimeout atomic.Pointer[time.Time]
|
||||
lastReadTimeout atomic.Pointer[time.Time]
|
||||
lastTorrentReadTimeout *atomic.Pointer[time.Time]
|
||||
|
||||
file *torrent.File
|
||||
}
|
||||
|
||||
const secondaryTimeout = time.Hour * 24
|
||||
|
||||
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
|
||||
func openTorrentFile(ctx context.Context, name string, file *torrent.File, lastTorrentReadTimeout *atomic.Pointer[time.Time]) (*torrentFile, error) {
|
||||
select {
|
||||
case <-file.Torrent().GotInfo():
|
||||
break
|
||||
|
@ -404,9 +408,10 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*tor
|
|||
}
|
||||
|
||||
return &torrentFile{
|
||||
name: name,
|
||||
tr: r,
|
||||
file: file,
|
||||
name: name,
|
||||
tr: r,
|
||||
file: file,
|
||||
lastTorrentReadTimeout: lastTorrentReadTimeout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -443,7 +448,13 @@ func (tf *torrentFile) readTimeout(ctx context.Context) (context.Context, contex
|
|||
lastReadTimeout := tf.lastReadTimeout.Load()
|
||||
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true))
|
||||
return context.WithTimeout(ctx, time.Millisecond)
|
||||
return context.WithTimeout(ctx, shortTimeout)
|
||||
}
|
||||
|
||||
lastTorrentReadTimeout := tf.lastTorrentReadTimeout.Load()
|
||||
if lastTorrentReadTimeout != nil && time.Since(*lastTorrentReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("low_timeout", true))
|
||||
return context.WithTimeout(ctx, lowTimeout)
|
||||
}
|
||||
|
||||
return ctx, func() {}
|
||||
|
@ -468,6 +479,7 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
|||
if err == context.DeadlineExceeded {
|
||||
now := time.Now()
|
||||
tf.lastReadTimeout.Store(&now)
|
||||
tf.lastTorrentReadTimeout.Store(&now)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -492,6 +504,7 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
|
|||
if err == context.DeadlineExceeded {
|
||||
now := time.Now()
|
||||
tf.lastReadTimeout.Store(&now)
|
||||
tf.lastTorrentReadTimeout.Store(&now)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
Loading…
Reference in a new issue