archive rework

This commit is contained in:
royalcat 2025-01-02 21:22:44 +03:00
parent 0ae11aa283
commit 49f1e4f345
3 changed files with 105 additions and 84 deletions

View file

@ -51,10 +51,8 @@ type fileEntry struct {
}
type ArchiveFS struct {
name string
size int64
name string
size int64
files map[string]fileEntry
}

View file

@ -3,12 +3,15 @@ package vfs
import (
"context"
"errors"
"fmt"
"io"
"sync"
"github.com/dgraph-io/ristretto"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// TODO переделать кеш в демон
@ -84,26 +87,27 @@ func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64
aligntOff := (off / blockSize) * blockSize
bI := blockIndex{index: a.index, off: aligntOff}
block, ok := blockCache.Get(bI)
if ok {
n = copy(p, block.data[off-aligntOff:block.len])
if block.len < int(blockSize) {
err = ctxio.EOF
}
return n, err
}
span.AddEvent("cache miss, reading from file")
block, err = a.readBlock(ctx, bI)
block, err := a.readBlock(ctx, bI)
if err != nil && err != ctxio.EOF {
return 0, err
}
if off-aligntOff >= int64(block.len) {
return 0, ctxio.EOF
}
return copy(p, block.data[off-aligntOff:block.len]), err
}
func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) {
ctx, span := tracer.Start(ctx, "archive.RandomReader.readBlock")
defer span.End()
// check block in cache before locking
if b, ok := blockCache.Get(bI); ok && b.len != 0 {
return b, nil
}
a.readerMutex.Lock()
defer a.readerMutex.Unlock()
@ -112,6 +116,18 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (
}
if a.reader == nil || a.readen > bI.off {
span.AddEvent("reader not valid, creating new reader", trace.WithAttributes(
attribute.Bool("reader_initialized", a.reader != nil),
attribute.Int64("readen", a.readen),
attribute.Int64("target_offset", bI.off),
))
if a.reader != nil {
if err := a.reader.Close(ctx); err != nil {
return block{}, fmt.Errorf("failed to close previous reader: %w", err)
}
}
var err error
a.reader, err = a.readerFactory(context.TODO())
if err != nil {

View file

@ -7,13 +7,13 @@ import (
"io/fs"
"log/slog"
"path"
"reflect"
"slices"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/goware/singleflight"
"github.com/royalcat/btrgo/btrsync"
"github.com/sourcegraph/conc/iter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -235,14 +235,18 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem,
func NewResolver(factories map[string]FsFactory) *Resolver {
return &Resolver{
factories: factories,
fsmap: map[string]Filesystem{},
fsmap: btrsync.MapOf[string, Filesystem]{},
}
}
type Resolver struct {
m sync.Mutex
// m sync.Mutex
factories map[string]FsFactory
fsmap map[string]Filesystem // filesystem cache
fsmap btrsync.MapOf[string, Filesystem] // filesystem cache
fsCreateGroup singleflight.Group[string, Filesystem]
// TODO: add fsmap clean
}
@ -257,35 +261,6 @@ func (r *Resolver) IsNestedFs(f string) bool {
return false
}
func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
if file.IsDir() {
return nil, file.Close(ctx)
}
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fsPath]; ok {
return nestedFs, file.Close(ctx)
}
for ext, nestFactory := range r.factories {
if !strings.HasSuffix(fsPath, ext) {
continue
}
nestedFs, err := nestFactory(ctx, fsPath, file)
if err != nil {
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
}
r.fsmap[fsPath] = nestedFs
return nestedFs, nil
}
return nil, file.Close(ctx)
}
// open requeue raw open, without resolver call
func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
ctx, span := tracer.Start(ctx, "ResolvePath")
@ -296,14 +271,12 @@ func (r *Resolver) ResolvePath(ctx context.Context, name string, rawOpen openFil
parts := strings.Split(name, Separator)
nestOn := -1
var nestFactory FsFactory
PARTS_LOOP:
for i, part := range parts {
for ext, factory := range r.factories {
for ext := range r.factories {
if strings.HasSuffix(part, ext) {
nestOn = i + 1
nestFactory = factory
break PARTS_LOOP
}
}
@ -321,46 +294,80 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
// fileHash, err := FileHash(ctx, file)
// if err != nil {
// return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
// }
err = file.Close(ctx)
nestedFs, err = r.nestedFs(ctx, fsPath, file)
if err != nil {
return "", nil, "", fmt.Errorf("error closing file: %w", err)
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
// we dont need lock until now
// it must be before fsmap read to exclude race condition:
// read -> write
// read -> write
r.m.Lock()
defer r.m.Unlock()
// err = file.Close(ctx)
// if err != nil {
// return "", nil, "", fmt.Errorf("error closing file: %w", err)
// }
if nestedFs, ok := r.fsmap[fsPath]; ok {
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
} else {
ctx, span := tracer.Start(ctx, "CreateFS")
defer span.End()
return fsPath, nestedFs, nestedFsPath, err
fsFile, err := rawOpen(ctx, fsPath)
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
// it is factory responsibility to close file handler then needed
// // we dont need lock until now
// // it must be before fsmap read to exclude race condition:
// // read -> write
// // read -> write
// r.m.Lock()
// defer r.m.Unlock()
nestedFs, err := nestFactory(ctx, name, fsFile)
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
r.fsmap[fsPath] = nestedFs
// if nestedFs, ok := r.fsmap[fsPath]; ok {
// span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
// return fsPath, nestedFs, nestedFsPath, nil
// } else {
// ctx, span := tracer.Start(ctx, "CreateFS")
// defer span.End()
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
// fsFile, err := rawOpen(ctx, fsPath)
// if err != nil {
// return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
// }
// // it is factory responsibility to close file handler then needed
return fsPath, nestedFs, nestedFsPath, nil
// nestedFs, err := nestFactory(ctx, name, fsFile)
// if err != nil {
// return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
// }
// r.fsmap[fsPath] = nestedFs
// span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
// return fsPath, nestedFs, nestedFsPath, nil
}
func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (Filesystem, error) {
if file.IsDir() {
return nil, file.Close(ctx)
}
fs, err, _ := r.fsCreateGroup.Do(fsPath, func() (Filesystem, error) {
if nestedFs, ok := r.fsmap.Load(fsPath); ok {
return nestedFs, file.Close(ctx)
}
for ext, nestFactory := range r.factories {
if !strings.HasSuffix(fsPath, ext) {
continue
}
nestedFs, err := nestFactory(ctx, fsPath, file)
if err != nil {
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
}
r.fsmap.Store(fsPath, nestedFs)
return nestedFs, nil
}
return nil, file.Close(ctx)
})
return fs, err
}
var ErrNotExist = fs.ErrNotExist