diff --git a/src/sources/torrent/fs.go b/src/sources/torrent/fs.go index 024bd78..396ae97 100644 --- a/src/sources/torrent/fs.go +++ b/src/sources/torrent/fs.go @@ -27,13 +27,16 @@ type TorrentFS struct { filesCache map[string]vfs.File - lastAccessTimeout atomic.Pointer[time.Time] + lastTorrentReadTimeout atomic.Pointer[time.Time] resolver *vfs.Resolver } var _ vfs.Filesystem = (*TorrentFS)(nil) +const shortTimeout = time.Millisecond +const lowTimeout = time.Second * 5 + func (s *Daemon) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) { c, err := s.LoadTorrent(ctx, f) if err != nil { @@ -113,7 +116,7 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) { fs.filesCache = make(map[string]vfs.File) for _, file := range files { p := vfs.AbsPath(file.Path()) - tf, err := openTorrentFile(ctx, path.Base(p), file) + tf, err := openTorrentFile(ctx, path.Base(p), file, &fs.lastTorrentReadTimeout) if err != nil { return nil, err } @@ -233,11 +236,11 @@ func (fs *TorrentFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption } func (tfs *TorrentFS) readContext(ctx context.Context) (context.Context, context.CancelFunc) { - lastReadTimeout := tfs.lastAccessTimeout.Load() + lastReadTimeout := tfs.lastTorrentReadTimeout.Load() if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true)) - return context.WithTimeout(ctx, time.Millisecond) + return context.WithTimeout(ctx, shortTimeout) } return ctx, func() {} @@ -260,7 +263,7 @@ func (tfs *TorrentFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e cancel() if err == context.DeadlineExceeded { now := time.Now() - tfs.lastAccessTimeout.Store(&now) + tfs.lastTorrentReadTimeout.Store(&now) } }() @@ -290,7 +293,7 @@ func (tfs *TorrentFS) Open(ctx context.Context, filename string) (file vfs.File, cancel() if err == context.DeadlineExceeded { now := time.Now() - tfs.lastAccessTimeout.Store(&now) + tfs.lastTorrentReadTimeout.Store(&now) } }() @@ -318,7 +321,7 @@ func (tfs *TorrentFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, cancel() if err == context.DeadlineExceeded { now := time.Now() - tfs.lastAccessTimeout.Store(&now) + tfs.lastTorrentReadTimeout.Store(&now) } }() @@ -377,14 +380,15 @@ type torrentFile struct { tr torrent.Reader - lastReadTimeout atomic.Pointer[time.Time] + lastReadTimeout atomic.Pointer[time.Time] + lastTorrentReadTimeout *atomic.Pointer[time.Time] file *torrent.File } const secondaryTimeout = time.Hour * 24 -func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) { +func openTorrentFile(ctx context.Context, name string, file *torrent.File, lastTorrentReadTimeout *atomic.Pointer[time.Time]) (*torrentFile, error) { select { case <-file.Torrent().GotInfo(): break @@ -404,9 +408,10 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*tor } return &torrentFile{ - name: name, - tr: r, - file: file, + name: name, + tr: r, + file: file, + lastTorrentReadTimeout: lastTorrentReadTimeout, }, nil } @@ -443,7 +448,13 @@ func (tf *torrentFile) readTimeout(ctx context.Context) (context.Context, contex lastReadTimeout := tf.lastReadTimeout.Load() if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("short_timeout", true)) - return context.WithTimeout(ctx, time.Millisecond) + return context.WithTimeout(ctx, shortTimeout) + } + + lastTorrentReadTimeout := tf.lastTorrentReadTimeout.Load() + if lastTorrentReadTimeout != nil && time.Since(*lastTorrentReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("low_timeout", true)) + return context.WithTimeout(ctx, lowTimeout) } return ctx, func() {} @@ -468,6 +479,7 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { if err == context.DeadlineExceeded { now := time.Now() tf.lastReadTimeout.Store(&now) + tf.lastTorrentReadTimeout.Store(&now) } }() @@ -492,6 +504,7 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, if err == context.DeadlineExceeded { now := time.Now() tf.lastReadTimeout.Store(&now) + tf.lastTorrentReadTimeout.Store(&now) } }()