diff --git a/src/vfs/archive.go b/src/vfs/archive.go index 4296ee2..56e49b5 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -51,10 +51,8 @@ type fileEntry struct { } type ArchiveFS struct { - name string - - size int64 - + name string + size int64 files map[string]fileEntry } diff --git a/src/vfs/archive_cache.go b/src/vfs/archive_cache.go index 47e6fc8..a707ef6 100644 --- a/src/vfs/archive_cache.go +++ b/src/vfs/archive_cache.go @@ -3,12 +3,15 @@ package vfs import ( "context" "errors" + "fmt" "io" "sync" "github.com/dgraph-io/ristretto" lru "github.com/hashicorp/golang-lru/v2" "github.com/royalcat/ctxio" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // TODO переделать кеш в демон @@ -84,26 +87,27 @@ func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64 aligntOff := (off / blockSize) * blockSize bI := blockIndex{index: a.index, off: aligntOff} - block, ok := blockCache.Get(bI) - if ok { - n = copy(p, block.data[off-aligntOff:block.len]) - if block.len < int(blockSize) { - err = ctxio.EOF - } - - return n, err - } - - span.AddEvent("cache miss, reading from file") - - block, err = a.readBlock(ctx, bI) + block, err := a.readBlock(ctx, bI) if err != nil && err != ctxio.EOF { return 0, err } + + if off-aligntOff >= int64(block.len) { + return 0, ctxio.EOF + } + return copy(p, block.data[off-aligntOff:block.len]), err } func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) { + ctx, span := tracer.Start(ctx, "archive.RandomReader.readBlock") + defer span.End() + + // check block in cache before locking + if b, ok := blockCache.Get(bI); ok && b.len != 0 { + return b, nil + } + a.readerMutex.Lock() defer a.readerMutex.Unlock() @@ -112,6 +116,18 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) ( } if a.reader == nil || a.readen > bI.off { + span.AddEvent("reader not valid, creating new reader", trace.WithAttributes( + attribute.Bool("reader_initialized", a.reader != nil), + attribute.Int64("readen", a.readen), + attribute.Int64("target_offset", bI.off), + )) + + if a.reader != nil { + if err := a.reader.Close(ctx); err != nil { + return block{}, fmt.Errorf("failed to close previous reader: %w", err) + } + } + var err error a.reader, err = a.readerFactory(context.TODO()) if err != nil { diff --git a/src/vfs/resolver.go b/src/vfs/resolver.go index c762a01..b942295 100644 --- a/src/vfs/resolver.go +++ b/src/vfs/resolver.go @@ -7,13 +7,13 @@ import ( "io/fs" "log/slog" "path" - "reflect" "slices" "strings" - "sync" "time" "git.kmsign.ru/royalcat/tstor/pkg/rlog" + "github.com/goware/singleflight" + "github.com/royalcat/btrgo/btrsync" "github.com/sourcegraph/conc/iter" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -235,14 +235,18 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem, func NewResolver(factories map[string]FsFactory) *Resolver { return &Resolver{ factories: factories, - fsmap: map[string]Filesystem{}, + fsmap: btrsync.MapOf[string, Filesystem]{}, } } type Resolver struct { - m sync.Mutex + // m sync.Mutex factories map[string]FsFactory - fsmap map[string]Filesystem // filesystem cache + + fsmap btrsync.MapOf[string, Filesystem] // filesystem cache + + fsCreateGroup singleflight.Group[string, Filesystem] + // TODO: add fsmap clean } @@ -257,35 +261,6 @@ func (r *Resolver) IsNestedFs(f string) bool { return false } -func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) { - if file.IsDir() { - return nil, file.Close(ctx) - } - - r.m.Lock() - defer r.m.Unlock() - - if nestedFs, ok := r.fsmap[fsPath]; ok { - return nestedFs, file.Close(ctx) - } - - for ext, nestFactory := range r.factories { - if !strings.HasSuffix(fsPath, ext) { - continue - } - - nestedFs, err := nestFactory(ctx, fsPath, file) - if err != nil { - return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err) - } - r.fsmap[fsPath] = nestedFs - - return nestedFs, nil - - } - return nil, file.Close(ctx) -} - // open requeue raw open, without resolver call func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) { ctx, span := tracer.Start(ctx, "ResolvePath") @@ -296,14 +271,12 @@ func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFil parts := strings.Split(name, Separator) nestOn := -1 - var nestFactory FsFactory PARTS_LOOP: for i, part := range parts { - for ext, factory := range r.factories { + for ext := range r.factories { if strings.HasSuffix(part, ext) { nestOn = i + 1 - nestFactory = factory break PARTS_LOOP } } @@ -321,46 +294,80 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) } - // fileHash, err := FileHash(ctx, file) - // if err != nil { - // return "", nil, "", fmt.Errorf("error calculating file hash: %w", err) - // } - err = file.Close(ctx) + + nestedFs, err = r.nestedFs(ctx, fsPath, file) if err != nil { - return "", nil, "", fmt.Errorf("error closing file: %w", err) + return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) } - // we dont need lock until now - // it must be before fsmap read to exclude race condition: - // read -> write - // read -> write - r.m.Lock() - defer r.m.Unlock() + // err = file.Close(ctx) + // if err != nil { + // return "", nil, "", fmt.Errorf("error closing file: %w", err) + // } - if nestedFs, ok := r.fsmap[fsPath]; ok { - span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) - return fsPath, nestedFs, nestedFsPath, nil - } else { - ctx, span := tracer.Start(ctx, "CreateFS") - defer span.End() + return fsPath, nestedFs, nestedFsPath, err - fsFile, err := rawOpen(ctx, fsPath) - if err != nil { - return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) - } - // it is factory responsibility to close file handler then needed + // // we dont need lock until now + // // it must be before fsmap read to exclude race condition: + // // read -> write + // // read -> write + // r.m.Lock() + // defer r.m.Unlock() - nestedFs, err := nestFactory(ctx, name, fsFile) - if err != nil { - return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) - } - r.fsmap[fsPath] = nestedFs + // if nestedFs, ok := r.fsmap[fsPath]; ok { + // span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) + // return fsPath, nestedFs, nestedFsPath, nil + // } else { + // ctx, span := tracer.Start(ctx, "CreateFS") + // defer span.End() - span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) + // fsFile, err := rawOpen(ctx, fsPath) + // if err != nil { + // return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) + // } + // // it is factory responsibility to close file handler then needed - return fsPath, nestedFs, nestedFsPath, nil + // nestedFs, err := nestFactory(ctx, name, fsFile) + // if err != nil { + // return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) + // } + // r.fsmap[fsPath] = nestedFs + + // span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) + + // return fsPath, nestedFs, nestedFsPath, nil + +} + +func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) { + if file.IsDir() { + return nil, file.Close(ctx) } + fs, err, _ := r.fsCreateGroup.Do(fsPath, func() (Filesystem, error) { + if nestedFs, ok := r.fsmap.Load(fsPath); ok { + return nestedFs, file.Close(ctx) + } + + for ext, nestFactory := range r.factories { + if !strings.HasSuffix(fsPath, ext) { + continue + } + + nestedFs, err := nestFactory(ctx, fsPath, file) + if err != nil { + return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err) + } + + r.fsmap.Store(fsPath, nestedFs) + + return nestedFs, nil + } + + return nil, file.Close(ctx) + }) + + return fs, err } var ErrNotExist = fs.ErrNotExist