torrent fs more adaptive timeouts
All checks were successful
docker / build-docker (linux/amd64) (push) Successful in 4m1s
docker / build-docker (linux/386) (push) Successful in 4m11s
docker / build-docker (linux/arm64) (push) Successful in 17m21s
docker / build-docker (linux/arm64/v8) (push) Successful in 17m27s
docker / build-docker (linux/arm/v7) (push) Successful in 18m38s

This commit is contained in:
royalcat 2024-03-30 13:16:13 +03:00
parent a8002616eb
commit c65fd89887

View file

@ -8,6 +8,7 @@ import (
"slices" "slices"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/host/controller"
@ -25,6 +26,8 @@ type TorrentFs struct {
filesCache map[string]File filesCache map[string]File
lastAccessTimeout atomic.Pointer[time.Time]
resolver *resolver resolver *resolver
} }
@ -248,62 +251,104 @@ func (fs *TorrentFs) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption
} }
// Stat implements Filesystem. // Stat implements Filesystem.
func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) { func (tfs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, span := tracer.Start(ctx, "Stat", ctx, span := tracer.Start(ctx, "Stat",
fs.traceAttrs(attribute.String("filename", filename)), tfs.traceAttrs(attribute.String("filename", filename)),
) )
defer span.End() defer span.End()
if isRoot(filename) { if isRoot(filename) {
return fs, nil return tfs, nil
} }
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nestedFs != nil { if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.Stat(ctx, nestedFsPath) return nestedFs.Stat(ctx, nestedFsPath)
} }
return fs.rawStat(ctx, fsPath) return tfs.rawStat(ctx, fsPath)
} }
func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) { func (tfs *TorrentFs) Open(ctx context.Context, filename string) (file File, err error) {
ctx, span := tracer.Start(ctx, "Open", ctx, span := tracer.Start(ctx, "Open",
fs.traceAttrs(attribute.String("filename", filename)), tfs.traceAttrs(attribute.String("filename", filename)),
) )
defer span.End() defer span.End()
if isRoot(filename) { if isRoot(filename) {
return newDirFile(fs.name), nil return newDirFile(tfs.name), nil
} }
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nestedFs != nil { if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.Open(ctx, nestedFsPath) return nestedFs.Open(ctx, nestedFsPath)
} }
return fs.rawOpen(ctx, fsPath) return tfs.rawOpen(ctx, fsPath)
} }
func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { func (tfs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
ctx, span := tracer.Start(ctx, "ReadDir", ctx, span := tracer.Start(ctx, "ReadDir",
fs.traceAttrs(attribute.String("name", name)), tfs.traceAttrs(attribute.String("name", name)),
) )
defer span.End() defer span.End()
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen) fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, name, tfs.rawOpen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nestedFs != nil { if nestedFs != nil {
lastReadTimeout := tfs.lastAccessTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
now := time.Now()
tfs.lastAccessTimeout.Store(&now)
}
}()
return nestedFs.ReadDir(ctx, nestedFsPath) return nestedFs.ReadDir(ctx, nestedFsPath)
} }
files, err := fs.files(ctx) files, err := tfs.files(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -347,16 +392,16 @@ var _ File = (*torrentFile)(nil)
type torrentFile struct { type torrentFile struct {
name string name string
mu sync.Mutex mu sync.RWMutex
tr torrent.Reader tr torrent.Reader
lastReadTimeout time.Time lastReadTimeout atomic.Pointer[time.Time]
file *torrent.File file *torrent.File
} }
const secondaryTimeout = time.Hour const secondaryTimeout = time.Hour * 24
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) { func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
// select { // select {
@ -416,10 +461,11 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
span.End() span.End()
}() }()
tf.mu.Lock() tf.mu.RLock()
defer tf.mu.Unlock() defer tf.mu.RUnlock()
if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files lastReadTimeout := tf.lastReadTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true)) span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond) ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
@ -427,7 +473,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
} }
defer func() { defer func() {
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
tf.lastReadTimeout = time.Now() now := time.Now()
tf.lastReadTimeout.Store(&now)
} }
}() }()
@ -443,10 +490,11 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
span.End() span.End()
}() }()
tf.mu.Lock() tf.mu.RLock()
defer tf.mu.Unlock() defer tf.mu.RUnlock()
if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files lastReadTimeout := tf.lastReadTimeout.Load()
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { /// make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true)) span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond) ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
@ -454,7 +502,8 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
} }
defer func() { defer func() {
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
tf.lastReadTimeout = time.Now() now := time.Now()
tf.lastReadTimeout.Store(&now)
} }
}() }()