diff --git a/src/host/vfs/torrent.go b/src/host/vfs/torrent.go index 70e70f2..88bd057 100644 --- a/src/host/vfs/torrent.go +++ b/src/host/vfs/torrent.go @@ -8,6 +8,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "git.kmsign.ru/royalcat/tstor/src/host/controller" @@ -25,6 +26,8 @@ type TorrentFs struct { filesCache map[string]File + lastAccessTimeout atomic.Pointer[time.Time] + resolver *resolver } @@ -248,62 +251,104 @@ func (fs *TorrentFs) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption } // Stat implements Filesystem. -func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { +func (tfs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { ctx, span := tracer.Start(ctx, "Stat", - fs.traceAttrs(attribute.String("filename", filename)), + tfs.traceAttrs(attribute.String("filename", filename)), ) defer span.End() if isRoot(filename) { - return fs, nil + return tfs, nil } - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) + fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { + lastReadTimeout := tfs.lastAccessTimeout.Load() + if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + span.SetAttributes(attribute.Bool("short_timeout", true)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Millisecond) + defer cancel() + } + defer func() { + if err == context.DeadlineExceeded { + now := time.Now() + tfs.lastAccessTimeout.Store(&now) + } + }() + return nestedFs.Stat(ctx, nestedFsPath) } - return fs.rawStat(ctx, fsPath) + return tfs.rawStat(ctx, fsPath) } -func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) { +func (tfs *TorrentFs) Open(ctx context.Context, filename string) (file File, err error) { ctx, span := tracer.Start(ctx, "Open", - fs.traceAttrs(attribute.String("filename", filename)), + tfs.traceAttrs(attribute.String("filename", filename)), ) defer span.End() if isRoot(filename) { - return newDirFile(fs.name), nil + return newDirFile(tfs.name), nil } - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) + fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { + lastReadTimeout := tfs.lastAccessTimeout.Load() + if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + span.SetAttributes(attribute.Bool("short_timeout", true)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Millisecond) + defer cancel() + } + defer func() { + if err == context.DeadlineExceeded { + now := time.Now() + tfs.lastAccessTimeout.Store(&now) + } + }() + return nestedFs.Open(ctx, nestedFsPath) } - return fs.rawOpen(ctx, fsPath) + return tfs.rawOpen(ctx, fsPath) } -func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { +func (tfs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { ctx, span := tracer.Start(ctx, "ReadDir", - fs.traceAttrs(attribute.String("name", name)), + tfs.traceAttrs(attribute.String("name", name)), ) defer span.End() - fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen) + fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, name, tfs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { + lastReadTimeout := tfs.lastAccessTimeout.Load() + if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + span.SetAttributes(attribute.Bool("short_timeout", true)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Millisecond) + defer cancel() + } + defer func() { + if err == context.DeadlineExceeded { + now := time.Now() + tfs.lastAccessTimeout.Store(&now) + } + }() + return nestedFs.ReadDir(ctx, nestedFsPath) } - files, err := fs.files(ctx) + files, err := tfs.files(ctx) if err != nil { return nil, err } @@ -347,16 +392,16 @@ var _ File = (*torrentFile)(nil) type torrentFile struct { name string - mu sync.Mutex + mu sync.RWMutex tr torrent.Reader - lastReadTimeout time.Time + lastReadTimeout atomic.Pointer[time.Time] file *torrent.File } -const secondaryTimeout = time.Hour +const secondaryTimeout = time.Hour * 24 func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) { // select { @@ -416,10 +461,11 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { span.End() }() - tf.mu.Lock() - defer tf.mu.Unlock() + tf.mu.RLock() + defer tf.mu.RUnlock() - if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + lastReadTimeout := tf.lastReadTimeout.Load() + if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files span.SetAttributes(attribute.Bool("short_timeout", true)) var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Millisecond) @@ -427,7 +473,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) { } defer func() { if err == context.DeadlineExceeded { - tf.lastReadTimeout = time.Now() + now := time.Now() + tf.lastReadTimeout.Store(&now) } }() @@ -443,10 +490,11 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, span.End() }() - tf.mu.Lock() - defer tf.mu.Unlock() + tf.mu.RLock() + defer tf.mu.RUnlock() - if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files + lastReadTimeout := tf.lastReadTimeout.Load() + if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { /// make short timeout for already faliled files span.SetAttributes(attribute.Bool("short_timeout", true)) var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Millisecond) @@ -454,7 +502,8 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, } defer func() { if err == context.DeadlineExceeded { - tf.lastReadTimeout = time.Now() + now := time.Now() + tf.lastReadTimeout.Store(&now) } }()