sub timeout for fs init
All checks were successful
docker / build-docker (linux/386) (push) Successful in 14m50s
docker / build-docker (linux/amd64) (push) Successful in 15m5s
docker / build-docker (linux/arm64/v8) (push) Successful in 23m36s
docker / build-docker (linux/arm/v7) (push) Successful in 24m3s
docker / build-docker (linux/arm64) (push) Successful in 11m55s

This commit is contained in:
royalcat 2024-04-06 16:51:36 +03:00
parent c1003c3314
commit bcda69daad
5 changed files with 57 additions and 28 deletions

View file

@ -34,10 +34,6 @@ func ComponentLog(name string) *slog.Logger {
return defaultLogger.With(slog.String("component", name)) return defaultLogger.With(slog.String("component", name))
} }
func ServiceLog(name string) *slog.Logger {
return ComponentLog("service/" + name)
}
func FunctionLog(log *slog.Logger, name string) *slog.Logger { func FunctionLog(log *slog.Logger, name string) *slog.Logger {
return log.With(slog.String("function", name)) return log.With(slog.String("function", name))
} }

View file

@ -2,6 +2,7 @@ package vfs
import ( import (
"context" "context"
"errors"
"io/fs" "io/fs"
"log/slog" "log/slog"
"reflect" "reflect"
@ -20,11 +21,15 @@ type LogFS struct {
readTimeout time.Duration readTimeout time.Duration
} }
func isLoggableError(err error) bool {
return err != nil && !errors.Is(err, fs.ErrNotExist)
}
var _ Filesystem = (*LogFS)(nil) var _ Filesystem = (*LogFS)(nil)
func WrapLogFS(fs Filesystem) *LogFS { func WrapLogFS(vfs Filesystem) *LogFS {
return &LogFS{ return &LogFS{
fs: fs, fs: vfs,
log: rlog.ComponentLog("fs"), log: rlog.ComponentLog("fs"),
timeout: time.Minute * 3, timeout: time.Minute * 3,
readTimeout: time.Minute, readTimeout: time.Minute,
@ -96,7 +101,7 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro
}() }()
file, err = fs.fs.Open(ctx, filename) file, err = fs.fs.Open(ctx, filename)
if err != nil { if isLoggableError(err) {
fs.log.With("filename", filename).Error("Failed to open file") fs.log.With("filename", filename).Error("Failed to open file")
} }
file = WrapLogFile(file, filename, fs.log, fs.readTimeout) file = WrapLogFile(file, filename, fs.log, fs.readTimeout)
@ -118,18 +123,18 @@ func (fs *LogFS) ReadDir(ctx context.Context, path string) (entries []fs.DirEntr
}() }()
entries, err = fs.fs.ReadDir(ctx, path) entries, err = fs.fs.ReadDir(ctx, path)
if err != nil { if isLoggableError(err) {
fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name()) fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name())
} }
return entries, err return entries, err
} }
// Stat implements Filesystem. // Stat implements Filesystem.
func (fs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, err error) { func (lfs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, err error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout) ctx, cancel := context.WithTimeout(ctx, lfs.timeout)
defer cancel() defer cancel()
ctx, span := tracer.Start(ctx, "Stat", ctx, span := tracer.Start(ctx, "Stat",
fs.traceAttrs(attribute.String("filename", filename)), lfs.traceAttrs(attribute.String("filename", filename)),
) )
defer func() { defer func() {
if err != nil { if err != nil {
@ -138,9 +143,9 @@ func (fs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, e
span.End() span.End()
}() }()
info, err = fs.fs.Stat(ctx, filename) info, err = lfs.fs.Stat(ctx, filename)
if err != nil { if isLoggableError(err) {
fs.log.Error("Failed to stat", "filename", filename, "error", err) lfs.log.Error("Failed to stat", "filename", filename, "error", err)
} }
return info, err return info, err
} }
@ -160,7 +165,7 @@ func (fs *LogFS) Unlink(ctx context.Context, filename string) (err error) {
}() }()
err = fs.fs.Unlink(ctx, filename) err = fs.fs.Unlink(ctx, filename)
if err != nil { if isLoggableError(err) {
fs.log.Error("Failed to stat", "filename", filename, "error", err) fs.log.Error("Failed to stat", "filename", filename, "error", err)
} }
return err return err
@ -210,7 +215,7 @@ func (f *LogFile) Close(ctx context.Context) (err error) {
}() }()
err = f.f.Close(ctx) err = f.f.Close(ctx)
if err != nil { if isLoggableError(err) {
f.log.ErrorContext(ctx, "Failed to close", "error", err) f.log.ErrorContext(ctx, "Failed to close", "error", err)
} }
return err return err
@ -240,7 +245,7 @@ func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) {
}() }()
n, err = f.f.Read(ctx, p) n, err = f.f.Read(ctx, p)
if err != nil { if isLoggableError(err) {
f.log.Error("Failed to read", "error", err) f.log.Error("Failed to read", "error", err)
} }
return n, err return n, err
@ -265,7 +270,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
}() }()
n, err = f.f.ReadAt(ctx, p, off) n, err = f.f.ReadAt(ctx, p, off)
if err != nil { if isLoggableError(err) {
f.log.Error("Failed to read", "offset", off, "error", err) f.log.Error("Failed to read", "offset", off, "error", err)
} }
return n, err return n, err
@ -279,8 +284,8 @@ func (f *LogFile) Size() int64 {
// Stat implements File. // Stat implements File.
func (f *LogFile) Info() (fs.FileInfo, error) { func (f *LogFile) Info() (fs.FileInfo, error) {
info, err := f.f.Info() info, err := f.f.Info()
if err != nil { if isLoggableError(err) {
f.log.Error("Failed to read", "error", err) f.log.Error("Failed to info", "error", err)
} }
return info, err return info, err
} }

View file

@ -119,17 +119,26 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
return nil, err return nil, err
} }
defer file.Close(ctx) defer file.Close(ctx)
nestedfs, err := r.resolver.nestedFs(ctx, filepath, file)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name())
continue
}
err = func() error {
factoryCtx, cancel := subTimeout(ctx)
defer cancel()
nestedfs, err := r.resolver.nestedFs(factoryCtx, filepath, file)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name())
return nil
}
return err
}
out = append(out, nestedfs)
return nil
}()
if err != nil {
return nil, err return nil, err
} }
out = append(out, nestedfs)
} else { } else {
out = append(out, e) out = append(out, e)
} }

View file

@ -2,6 +2,7 @@ package vfs
import ( import (
"context" "context"
"fmt"
"io" "io"
"io/fs" "io/fs"
"path" "path"
@ -413,7 +414,14 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*tor
r := file.NewReader() r := file.NewReader()
r.SetReadahead(1024 * 1024 * 16) // TODO configurable r.SetReadahead(1024 * 1024 * 16) // TODO configurable
// r.SetResponsive() _, err := r.ReadContext(ctx, make([]byte, 128))
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed initial file read: %w", err)
}
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("failed seeking to start, after initial read: %w", err)
}
return &torrentFile{ return &torrentFile{
name: name, name: name,

View file

@ -1,9 +1,11 @@
package vfs package vfs
import ( import (
"context"
"path" "path"
"strings" "strings"
"sync" "sync"
"time"
) )
const Separator = "/" const Separator = "/"
@ -58,3 +60,12 @@ func OnceValueWOErr[T any](f func() (T, error)) func() (T, error) {
return r1, err return r1, err
} }
} }
func subTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline) / 2
return context.WithTimeout(ctx, timeout)
}
return ctx, func() {}
}