From bcda69daadfc194d2da17ecf15a0290d79e5075e Mon Sep 17 00:00:00 2001 From: royalcat Date: Sat, 6 Apr 2024 16:51:36 +0300 Subject: [PATCH] sub timeout for fs init --- pkg/rlog/rlog.go | 4 ---- src/host/vfs/log.go | 37 +++++++++++++++++++++---------------- src/host/vfs/resolver.go | 23 ++++++++++++++++------- src/host/vfs/torrent.go | 10 +++++++++- src/host/vfs/utils.go | 11 +++++++++++ 5 files changed, 57 insertions(+), 28 deletions(-) diff --git a/pkg/rlog/rlog.go b/pkg/rlog/rlog.go index c8e1136..cbe6b75 100644 --- a/pkg/rlog/rlog.go +++ b/pkg/rlog/rlog.go @@ -34,10 +34,6 @@ func ComponentLog(name string) *slog.Logger { return defaultLogger.With(slog.String("component", name)) } -func ServiceLog(name string) *slog.Logger { - return ComponentLog("service/" + name) -} - func FunctionLog(log *slog.Logger, name string) *slog.Logger { return log.With(slog.String("function", name)) } diff --git a/src/host/vfs/log.go b/src/host/vfs/log.go index 1c19e1f..c5519ed 100644 --- a/src/host/vfs/log.go +++ b/src/host/vfs/log.go @@ -2,6 +2,7 @@ package vfs import ( "context" + "errors" "io/fs" "log/slog" "reflect" @@ -20,11 +21,15 @@ type LogFS struct { readTimeout time.Duration } +func isLoggableError(err error) bool { + return err != nil && !errors.Is(err, fs.ErrNotExist) +} + var _ Filesystem = (*LogFS)(nil) -func WrapLogFS(fs Filesystem) *LogFS { +func WrapLogFS(vfs Filesystem) *LogFS { return &LogFS{ - fs: fs, + fs: vfs, log: rlog.ComponentLog("fs"), timeout: time.Minute * 3, readTimeout: time.Minute, @@ -96,7 +101,7 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro }() file, err = fs.fs.Open(ctx, filename) - if err != nil { + if isLoggableError(err) { fs.log.With("filename", filename).Error("Failed to open file") } file = WrapLogFile(file, filename, fs.log, fs.readTimeout) @@ -118,18 +123,18 @@ func (fs *LogFS) ReadDir(ctx context.Context, path string) (entries []fs.DirEntr }() entries, err = fs.fs.ReadDir(ctx, path) - if err != nil { + if isLoggableError(err) { fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name()) } return entries, err } // Stat implements Filesystem. -func (fs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, err error) { - ctx, cancel := context.WithTimeout(ctx, fs.timeout) +func (lfs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, err error) { + ctx, cancel := context.WithTimeout(ctx, lfs.timeout) defer cancel() ctx, span := tracer.Start(ctx, "Stat", - fs.traceAttrs(attribute.String("filename", filename)), + lfs.traceAttrs(attribute.String("filename", filename)), ) defer func() { if err != nil { @@ -138,9 +143,9 @@ func (fs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, e span.End() }() - info, err = fs.fs.Stat(ctx, filename) - if err != nil { - fs.log.Error("Failed to stat", "filename", filename, "error", err) + info, err = lfs.fs.Stat(ctx, filename) + if isLoggableError(err) { + lfs.log.Error("Failed to stat", "filename", filename, "error", err) } return info, err } @@ -160,7 +165,7 @@ func (fs *LogFS) Unlink(ctx context.Context, filename string) (err error) { }() err = fs.fs.Unlink(ctx, filename) - if err != nil { + if isLoggableError(err) { fs.log.Error("Failed to stat", "filename", filename, "error", err) } return err @@ -210,7 +215,7 @@ func (f *LogFile) Close(ctx context.Context) (err error) { }() err = f.f.Close(ctx) - if err != nil { + if isLoggableError(err) { f.log.ErrorContext(ctx, "Failed to close", "error", err) } return err @@ -240,7 +245,7 @@ func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) { }() n, err = f.f.Read(ctx, p) - if err != nil { + if isLoggableError(err) { f.log.Error("Failed to read", "error", err) } return n, err @@ -265,7 +270,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e }() n, err = f.f.ReadAt(ctx, p, off) - if err != nil { + if isLoggableError(err) { f.log.Error("Failed to read", "offset", off, "error", err) } return n, err @@ -279,8 +284,8 @@ func (f *LogFile) Size() int64 { // Stat implements File. func (f *LogFile) Info() (fs.FileInfo, error) { info, err := f.f.Info() - if err != nil { - f.log.Error("Failed to read", "error", err) + if isLoggableError(err) { + f.log.Error("Failed to info", "error", err) } return info, err } diff --git a/src/host/vfs/resolver.go b/src/host/vfs/resolver.go index 6573ab5..de028e5 100644 --- a/src/host/vfs/resolver.go +++ b/src/host/vfs/resolver.go @@ -119,17 +119,26 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er return nil, err } defer file.Close(ctx) - nestedfs, err := r.resolver.nestedFs(ctx, filepath, file) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name()) - continue - } + err = func() error { + factoryCtx, cancel := subTimeout(ctx) + defer cancel() + nestedfs, err := r.resolver.nestedFs(factoryCtx, filepath, file) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name()) + return nil + } + + return err + } + out = append(out, nestedfs) + return nil + }() + if err != nil { return nil, err } - out = append(out, nestedfs) } else { out = append(out, e) } diff --git a/src/host/vfs/torrent.go b/src/host/vfs/torrent.go index 88bd057..dd75760 100644 --- a/src/host/vfs/torrent.go +++ b/src/host/vfs/torrent.go @@ -2,6 +2,7 @@ package vfs import ( "context" + "fmt" "io" "io/fs" "path" @@ -413,7 +414,14 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*tor r := file.NewReader() r.SetReadahead(1024 * 1024 * 16) // TODO configurable - // r.SetResponsive() + _, err := r.ReadContext(ctx, make([]byte, 128)) + if err != nil && err != io.EOF { + return nil, fmt.Errorf("failed initial file read: %w", err) + } + _, err = r.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("failed seeking to start, after initial read: %w", err) + } return &torrentFile{ name: name, diff --git a/src/host/vfs/utils.go b/src/host/vfs/utils.go index 0cfca90..bbe8cbb 100644 --- a/src/host/vfs/utils.go +++ b/src/host/vfs/utils.go @@ -1,9 +1,11 @@ package vfs import ( + "context" "path" "strings" "sync" + "time" ) const Separator = "/" @@ -58,3 +60,12 @@ func OnceValueWOErr[T any](f func() (T, error)) func() (T, error) { return r1, err } } + +func subTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if deadline, ok := ctx.Deadline(); ok { + timeout := time.Until(deadline) / 2 + return context.WithTimeout(ctx, timeout) + } + + return ctx, func() {} +}