Compare commits
3 commits
3a2b615f55
...
77d19e08fc
Author | SHA1 | Date | |
---|---|---|---|
77d19e08fc | |||
2d72790c1a | |||
bf2dac5cf1 |
5 changed files with 43 additions and 13 deletions
|
@ -109,7 +109,10 @@ func run(configPath string) error {
|
|||
vfs.NewCtxBillyFs("/", ctxbilly.WrapFileSystem(sourceFs)),
|
||||
tsrv, ytdlpsrv,
|
||||
)
|
||||
sfs = vfs.WrapLogFS(sfs)
|
||||
sfs, err = vfs.WrapLogFS(sfs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if conf.Mounts.Fuse.Enabled {
|
||||
mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path)
|
||||
|
|
|
@ -52,6 +52,7 @@ func onRead(ctx context.Context, w *response, userHandle Handler) error {
|
|||
}
|
||||
return &NFSStatusError{NFSStatusAccess, err}
|
||||
}
|
||||
defer fh.Close(ctx)
|
||||
|
||||
resp := nfsReadResponse{}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
|
@ -51,7 +50,6 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh
|
|||
// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
|
||||
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
|
||||
opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers"))
|
||||
opts.DefaultTTL = time.Hour
|
||||
opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler")
|
||||
|
||||
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
|
||||
|
@ -156,8 +154,7 @@ func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem
|
|||
return c.activeHandles.Delete(ctx, id)
|
||||
}
|
||||
|
||||
// const maxInt = int(^uint(0) >> 1)
|
||||
const maxHandlers = 8129
|
||||
const maxHandlers = int(^uint(0) >> 1)
|
||||
|
||||
// HandleLimit exports how many file handles can be safely stored by this cache.
|
||||
func (c *CachingHandler) HandleLimit() int {
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/royalcat/ctxio"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
type File interface {
|
||||
|
@ -24,8 +23,6 @@ type File interface {
|
|||
|
||||
var ErrNotImplemented = errors.New("not implemented")
|
||||
|
||||
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/vfs")
|
||||
|
||||
type Filesystem interface {
|
||||
// Open opens the named file for reading. If successful, methods on the
|
||||
// returned file can be used for reading; the associated file descriptor has
|
||||
|
|
|
@ -3,19 +3,32 @@ package vfs
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var (
|
||||
meter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/vfs")
|
||||
tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/vfs")
|
||||
)
|
||||
|
||||
type fsTelemetry struct {
|
||||
openedFiles metric.Int64UpDownCounter
|
||||
}
|
||||
|
||||
type LogFS struct {
|
||||
fs Filesystem
|
||||
log *rlog.Logger
|
||||
tel *fsTelemetry
|
||||
|
||||
timeout time.Duration
|
||||
readTimeout time.Duration
|
||||
|
@ -27,13 +40,19 @@ func isLoggableError(err error) bool {
|
|||
|
||||
var _ Filesystem = (*LogFS)(nil)
|
||||
|
||||
func WrapLogFS(vfs Filesystem) *LogFS {
|
||||
func WrapLogFS(vfs Filesystem) (*LogFS, error) {
|
||||
openedFiles, err := meter.Int64UpDownCounter("vfs.opened_files")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create opened_files metric: %w", err)
|
||||
}
|
||||
|
||||
return &LogFS{
|
||||
fs: vfs,
|
||||
log: rlog.Component("logfs"),
|
||||
tel: &fsTelemetry{openedFiles: openedFiles},
|
||||
timeout: time.Minute * 3,
|
||||
readTimeout: time.Minute,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ModTime implements Filesystem.
|
||||
|
@ -104,7 +123,12 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro
|
|||
if isLoggableError(err) {
|
||||
fs.log.Error(ctx, "Failed to open file")
|
||||
}
|
||||
file = WrapLogFile(file, filename, fs.log, fs.readTimeout)
|
||||
file = wrapLogFile(file, filename, fs.log, fs.readTimeout, fs.tel)
|
||||
|
||||
if file != nil {
|
||||
fs.tel.openedFiles.Add(ctx, 1)
|
||||
}
|
||||
|
||||
return file, err
|
||||
}
|
||||
|
||||
|
@ -178,7 +202,9 @@ type LogFile struct {
|
|||
filename string
|
||||
f File
|
||||
|
||||
log *rlog.Logger
|
||||
log *rlog.Logger
|
||||
tel *fsTelemetry
|
||||
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
|
@ -194,11 +220,12 @@ func (f *LogFile) Type() fs.FileMode {
|
|||
|
||||
var _ File = (*LogFile)(nil)
|
||||
|
||||
func WrapLogFile(f File, filename string, log *rlog.Logger, timeout time.Duration) *LogFile {
|
||||
func wrapLogFile(f File, filename string, log *rlog.Logger, timeout time.Duration, tel *fsTelemetry) *LogFile {
|
||||
return &LogFile{
|
||||
filename: filename,
|
||||
f: f,
|
||||
log: log.With(slog.String("filename", filename)),
|
||||
tel: tel,
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
@ -221,6 +248,11 @@ func (f *LogFile) Close(ctx context.Context) (err error) {
|
|||
if isLoggableError(err) {
|
||||
f.log.Error(ctx, "Failed to close", rlog.Error(err))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
f.tel.openedFiles.Add(ctx, -1)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue