Compare commits

...

4 commits

Author SHA1 Message Date
2d9dcd87fa qbittorrent info cache 2024-12-09 23:46:53 +03:00
36c501347c background scanner 2024-12-09 23:46:15 +03:00
94ca4cf599 webdav rename 2024-12-09 23:44:23 +03:00
b77ce50a7b multithreader read dir 2024-12-09 23:44:01 +03:00
11 changed files with 255 additions and 75 deletions

View file

@ -119,9 +119,15 @@ func run(configPath string) error {
return err
}
vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error {
return nil
})
go func() {
log := log.WithComponent("background-scanner")
err := vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error {
return nil
})
if err != nil {
log.Error(ctx, "error walking filesystem", rlog.Error(err))
}
}()
if conf.Mounts.Fuse.Enabled {
mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path)

View file

@ -7,17 +7,23 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"github.com/creativecreature/sturdyc"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/royalcat/btrgo/btrsync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent")
type cacheClient struct {
qb qbittorrent.Client
propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties]
torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo]
pieceCache btrsync.MapOf[pieceKey, int]
infoClient *sturdyc.Client[*qbittorrent.TorrentInfo]
}
type pieceKey struct {
@ -26,7 +32,6 @@ type pieceKey struct {
}
func wrapClient(qb qbittorrent.Client) *cacheClient {
const (
cacheSize = 5000
cacheTTL = time.Minute
@ -35,34 +40,45 @@ func wrapClient(qb qbittorrent.Client) *cacheClient {
return &cacheClient{
qb: qb,
propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL),
torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL),
pieceCache: btrsync.MapOf[pieceKey, int]{},
infoClient: sturdyc.New[*qbittorrent.TorrentInfo](cacheSize, 1, cacheTTL, 10,
sturdyc.WithEarlyRefreshes(time.Minute, time.Minute*5, time.Second*10),
sturdyc.WithRefreshCoalescing(100, time.Second/4),
sturdyc.WithMetrics(newSturdycMetrics()),
),
pieceCache: btrsync.MapOf[pieceKey, int]{},
}
}
func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) {
if v, ok := f.torrentsCache.Get(hash); ok {
return &v, nil
}
out, err := f.infoClient.GetOrFetchBatch(ctx, []string{hash},
f.infoClient.BatchKeyFn(""),
func(ctx context.Context, ids []string) (map[string]*qbittorrent.TorrentInfo, error) {
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
Hashes: ids,
})
if err != nil {
return nil, fmt.Errorf("error to get torrents: %w", err)
}
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
Hashes: []string{hash},
})
out := make(map[string]*qbittorrent.TorrentInfo)
for _, info := range infos {
out[info.Hash] = info
}
return out, nil
},
)
if err != nil {
return nil, fmt.Errorf("error to check torrent existence: %w", err)
return nil, err
}
if len(infos) == 0 {
if out[hash] == nil {
return nil, nil
}
if len(infos) > 1 {
return nil, fmt.Errorf("multiple torrents with the same hash")
}
f.torrentsCache.Add(hash, *infos[0])
return infos[0], nil
return out[hash], nil
}
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
@ -162,3 +178,86 @@ func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, piec
}
}
}
// sturdycMetrics bridges sturdyc cache events to OpenTelemetry
// instruments. It implements sturdyc.MetricsRecorder.
type sturdycMetrics struct {
	ctx context.Context // context passed to every Add/Record call

	cacheHit       metric.Int64Counter   // incremented by CacheHit
	cacheMiss      metric.Int64Counter   // incremented by CacheMiss
	refresh        metric.Int64Counter   // incremented by Refresh
	missing        metric.Int64Counter   // incremented by MissingRecord
	forcedEviction metric.Int64Counter   // incremented by ForcedEviction
	entryEviction  metric.Int64Counter   // incremented per entry by EntriesEvicted
	batchSize      metric.Int64Histogram // batch refresh sizes, recorded by CacheBatchRefreshSize

	// observeCacheSize is installed by ObserveCacheSize and polled by the
	// sturdyc_cache_size observable gauge; it is nil until the cache
	// registers its callback.
	observeCacheSize func() int
}
var _ sturdyc.MetricsRecorder = (*sturdycMetrics)(nil)
// newSturdycMetrics creates the OTel counters and histogram backing the
// sturdyc cache metrics and registers an observable gauge that reports
// the cache size once a callback has been installed via ObserveCacheSize.
//
// Instrument construction goes through must, so any failure to create an
// instrument panics; call this only during startup.
func newSturdycMetrics() *sturdycMetrics {
	m := &sturdycMetrics{
		ctx:            context.Background(),
		cacheHit:       must(meter.Int64Counter("sturdyc_cache_hit")),
		cacheMiss:      must(meter.Int64Counter("sturdyc_cache_miss")),
		refresh:        must(meter.Int64Counter("sturdyc_cache_refresh")),
		missing:        must(meter.Int64Counter("sturdyc_cache_missing")),
		forcedEviction: must(meter.Int64Counter("sturdyc_cache_forced_eviction")),
		entryEviction:  must(meter.Int64Counter("sturdyc_cache_entry_eviction")),
		batchSize:      must(meter.Int64Histogram("sturdyc_cache_batch_size")),
	}
	// The gauge tolerates the window before sturdyc calls ObserveCacheSize
	// by observing nothing while the callback is still nil.
	must(meter.Int64ObservableGauge("sturdyc_cache_size",
		metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error {
			if m.observeCacheSize == nil {
				return nil
			}
			io.Observe(int64(m.observeCacheSize()))
			return nil
		})))
	return m
}
// CacheHit implements sturdyc.MetricsRecorder; counts one cache hit.
func (s *sturdycMetrics) CacheHit() {
	s.cacheHit.Add(s.ctx, 1)
}

// CacheMiss implements sturdyc.MetricsRecorder; counts one cache miss.
func (s *sturdycMetrics) CacheMiss() {
	s.cacheMiss.Add(s.ctx, 1)
}

// Refresh implements sturdyc.MetricsRecorder; counts one background refresh.
func (s *sturdycMetrics) Refresh() {
	s.refresh.Add(s.ctx, 1)
}

// MissingRecord implements sturdyc.MetricsRecorder; counts one missing record.
func (s *sturdycMetrics) MissingRecord() {
	s.missing.Add(s.ctx, 1)
}

// ForcedEviction implements sturdyc.MetricsRecorder; counts one forced eviction.
func (s *sturdycMetrics) ForcedEviction() {
	s.forcedEviction.Add(s.ctx, 1)
}

// CacheBatchRefreshSize implements sturdyc.MetricsRecorder; records the
// size of one batch refresh into the batch-size histogram.
func (s *sturdycMetrics) CacheBatchRefreshSize(size int) {
	s.batchSize.Record(s.ctx, int64(size))
}
// ObserveCacheSize implements sturdyc.MetricsRecorder. It stores the
// size callback that the sturdyc_cache_size observable gauge polls.
// NOTE(review): this write is unsynchronized relative to the gauge
// callback reading the field — presumably sturdyc calls it once during
// setup before collection starts; confirm against the sturdyc source.
func (s *sturdycMetrics) ObserveCacheSize(callback func() int) {
	s.observeCacheSize = callback
}
// EntriesEvicted implements sturdyc.MetricsRecorder; adds the number of
// evicted entries to the entry-eviction counter.
// Fix: parameter was misspelled "evictd".
func (s *sturdycMetrics) EntriesEvicted(evicted int) {
	s.entryEviction.Add(s.ctx, int64(evicted))
}
// ShardIndex is required by sturdyc.MetricsRecorder; shard distribution
// is not exported as a metric here, so this is intentionally a no-op.
// Fix: dropped the redundant bare return (staticcheck S1023).
func (s *sturdycMetrics) ShardIndex(int) {}
func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}

View file

@ -222,7 +222,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
})
if err != nil {
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
return err
return fmt.Errorf("error adding torrent: %w", err)
}
var props *qbittorrent.TorrentProperties
@ -247,7 +247,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
// info := existing[0]
props, err := d.client.getProperties(ctx, ih.HexString())
if err != nil {
return err
return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString())
}
d.registeredTorrents.Add(props.Hash)
@ -256,7 +256,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
if err != nil {
return err
return fmt.Errorf("error moving torrent: %w", err)
}
}

View file

@ -274,6 +274,9 @@ func (f *File) canExpectSoon(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
if info == nil {
return false, nil
}
return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil
}
@ -295,7 +298,7 @@ func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (boo
return true, nil
}
func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error {
func (f *File) waitRangeAvailable(ctx context.Context, offset int64, size int) error {
complete, err := f.isRangeComplete(ctx, offset, size)
if err != nil {
return err
@ -338,7 +341,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
f.mu.Lock()
defer f.mu.Unlock()
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}
@ -349,7 +352,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
// ReadAt implements vfs.File.
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
return 0, err
}

4
go.mod
View file

@ -14,6 +14,7 @@ require (
github.com/billziss-gh/cgofuse v1.5.0
github.com/bodgit/sevenzip v1.5.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/creativecreature/sturdyc v1.0.6
github.com/deckarep/golang-set/v2 v2.6.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dgraph-io/ristretto v0.1.1
@ -48,6 +49,7 @@ require (
github.com/rs/zerolog v1.32.0
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-zerolog v1.0.0
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.4
github.com/vektah/gqlparser/v2 v2.5.17
@ -180,6 +182,8 @@ require (
go.opentelemetry.io/contrib v1.21.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/mod v0.20.0 // indirect

8
go.sum
View file

@ -152,6 +152,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creativecreature/sturdyc v1.0.6 h1:hSYm0j7L0Vug3ho8ozFKK9iihFMYgk/cOaQXmL2G/Ho=
github.com/creativecreature/sturdyc v1.0.6/go.mod h1:Qwi5+41ERVF0708Mymjrko+JlLnmJ2/T9Wb/Xsax3f4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -570,6 +572,8 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@ -657,8 +661,12 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go4.org v0.0.0-20200411211856-f5505b9728dd h1:BNJlw5kRTzdmyfh5U8F93HA2OwkP7ZGwA51eJ/0wKOU=
go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View file

@ -51,7 +51,7 @@ func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error {
}
func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error {
return webdav.ErrNotImplemented
return wd.fs.Rename(ctx, oldName, newName)
}
func (wd *WebDAV) lookupFile(ctx context.Context, name string) (vfs.File, error) {

View file

@ -41,7 +41,7 @@ var ArchiveFactories = map[string]FsFactory{
},
}
type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error)
type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error)
var _ Filesystem = &ArchiveFS{}
@ -88,8 +88,8 @@ func (a *ArchiveFS) FsName() string {
return "archivefs"
}
func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
archiveFiles, err := loader(ctx, archivePath, r, size)
func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) {
archiveFiles, err := loader(ctx, archivePath, f, size)
if err != nil {
return nil, err
}
@ -281,7 +281,12 @@ type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error
var _ archiveLoader = ZipLoader
func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, f)
if err != nil {
return nil, err
}
reader := ctxio.IoReaderAt(ctx, f)
zr, err := zip.NewReader(reader, size)
if err != nil {
@ -314,7 +319,7 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
info := zipFile.FileInfo()
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af)
out[AbsPath(zipFile.Name)] = fileEntry{
FileInfo: info,
@ -329,7 +334,12 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
var _ archiveLoader = SevenZipLoader
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, ctxreader)
if err != nil {
return nil, err
}
reader := ctxio.IoReaderAt(ctx, ctxreader)
r, err := sevenzip.NewReader(reader, size)
if err != nil {
@ -361,7 +371,7 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
info := f.FileInfo()
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af)
out[AbsPath(f.Name)] = fileEntry{
FileInfo: f.FileInfo(),
@ -376,8 +386,13 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
var _ archiveLoader = RarLoader
func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
hash, err := FileHash(ctx, f)
if err != nil {
return nil, err
}
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
r, err := rardecode.NewReader(reader)
if err != nil {
@ -396,7 +411,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
name := header.Name
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
r, err := rardecode.NewReader(reader)
if err != nil {
return nil, err
@ -413,7 +428,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
return nil, fmt.Errorf("file with name '%s' not found", name)
}
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af)
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af)
out[AbsPath(header.Name)] = fileEntry{
FileInfo: NewFileInfo(header.Name, header.UnPackedSize),

View file

@ -18,8 +18,8 @@ const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage
const defaultBlockCount = cacheSize / blockSize
type archiveFileIndex struct {
archive string
filename string
archiveHash Hash
filename string
}
type blockIndex struct {
@ -107,7 +107,7 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (
a.readerMutex.Lock()
defer a.readerMutex.Unlock()
if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block
if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block
return b, nil
}

View file

@ -4,10 +4,11 @@ import (
"archive/zip"
"bytes"
"context"
"io"
"io/fs"
"testing"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/royalcat/ctxio"
"github.com/stretchr/testify/require"
)
@ -62,24 +63,24 @@ func TestZipFilesystem(t *testing.T) {
f, err := zfs.Open(ctx, "/path/to/test/file/1.txt")
require.NoError(err)
n, err := f.Read(ctx, out)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(5, n)
require.Equal([]byte("Hello"), out)
outSpace := make([]byte, 1)
n, err = f.Read(ctx, outSpace)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(1, n)
require.Equal([]byte(" "), outSpace)
n, err = f.Read(ctx, out)
require.NoError(err)
require.ErrorIs(err, io.EOF)
require.Equal(5, n)
require.Equal([]byte("World"), out)
}
func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
func createTestZip(require *require.Assertions) (vfs.File, int64) {
buf := bytes.NewBuffer([]byte{})
zWriter := zip.NewWriter(buf)
@ -95,17 +96,59 @@ func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
return newCBR(buf.Bytes()), int64(buf.Len())
}
type closeableByteReader struct {
data *bytes.Reader
}
// newCBR wraps a byte slice in a closeableByteReader so tests can use
// in-memory data as a vfs.File.
func newCBR(b []byte) *closeableByteReader {
	return &closeableByteReader{
		data: bytes.NewReader(b),
	}
}
var _ vfs.File = &closeableByteReader{}
type closeableByteReader struct {
data *bytes.Reader
}
// ReadAt implements ctxio.ReaderAt by delegating to the underlying
// bytes.Reader; the context is ignored because in-memory reads cannot block.
func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
	return c.data.ReadAt(p, off)
}
// Close implements vfs.File. Not exercised by the tests; panics if called.
func (c *closeableByteReader) Close(ctx context.Context) error {
	panic("unimplemented")
}

// Info implements vfs.File. Not exercised by the tests; panics if called.
func (c *closeableByteReader) Info() (fs.FileInfo, error) {
	panic("unimplemented")
}

// IsDir implements vfs.File. Not exercised by the tests; panics if called.
func (c *closeableByteReader) IsDir() bool {
	panic("unimplemented")
}

// Name implements vfs.File. Not exercised by the tests; panics if called.
func (c *closeableByteReader) Name() string {
	panic("unimplemented")
}
// Read implements vfs.File by reading from the current position of the
// underlying bytes.Reader; the context is ignored.
func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) {
	return c.data.Read(p)
}

// Seek implements vfs.File by delegating to bytes.Reader.Seek.
func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) {
	return c.data.Seek(offset, whence)
}

// Size implements vfs.File, reporting the total length of the backing
// byte slice regardless of the current read position.
func (c *closeableByteReader) Size() int64 {
	return c.data.Size()
}
// Type implements vfs.File. Not exercised by the tests; panics if called.
func (c *closeableByteReader) Type() fs.FileMode {
	panic("unimplemented")
}

View file

@ -14,6 +14,7 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/sourcegraph/conc/iter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
@ -111,8 +112,8 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
if err != nil {
return nil, err
}
out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries {
out, err := iter.MapErr(entries, func(pe *fs.DirEntry) (fs.DirEntry, error) {
e := *pe
if r.resolver.IsNestedFs(e.Name()) {
filepath := path.Join("/", name, e.Name())
file, err := r.rootFS.Open(ctx, filepath)
@ -125,16 +126,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
}
if err != nil {
log.Error(ctx, "error creating nested fs", rlog.Error(err))
out = append(out, e)
continue
return nil, fmt.Errorf("error creating nested fs: %w", err)
}
out = append(out, nestedfs)
return nestedfs, nil
} else {
out = append(out, e)
return e, nil
}
})
if err != nil {
log.Error(ctx, "error mapping entries", rlog.Error(err))
err = nil
}
return out, nil
out = slices.DeleteFunc(out, func(e fs.DirEntry) bool { return e == nil })
return out, err
}
// Stat implements Filesystem.
@ -228,14 +235,14 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem,
func NewResolver(factories map[string]FsFactory) *Resolver {
return &Resolver{
factories: factories,
fsmap: map[Hash]Filesystem{},
fsmap: map[string]Filesystem{},
}
}
type Resolver struct {
m sync.Mutex
factories map[string]FsFactory
fsmap map[Hash]Filesystem // filesystem cache
fsmap map[string]Filesystem // filesystem cache
// TODO: add fsmap clean
}
@ -255,15 +262,10 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
return nil, file.Close(ctx)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return nil, fmt.Errorf("error calculating file hash: %w", err)
}
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fileHash]; ok {
if nestedFs, ok := r.fsmap[fsPath]; ok {
return nestedFs, file.Close(ctx)
}
@ -276,7 +278,7 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
if err != nil {
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
}
r.fsmap[fileHash] = nestedFs
r.fsmap[fsPath] = nestedFs
return nestedFs, nil
@ -319,10 +321,10 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
}
fileHash, err := FileHash(ctx, file)
if err != nil {
return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
}
// fileHash, err := FileHash(ctx, file)
// if err != nil {
// return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
// }
err = file.Close(ctx)
if err != nil {
return "", nil, "", fmt.Errorf("error closing file: %w", err)
@ -335,7 +337,7 @@ PARTS_LOOP:
r.m.Lock()
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fileHash]; ok {
if nestedFs, ok := r.fsmap[fsPath]; ok {
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
} else {
@ -352,7 +354,7 @@ PARTS_LOOP:
if err != nil {
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
}
r.fsmap[fileHash] = nestedFs
r.fsmap[fsPath] = nestedFs
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))