diff --git a/cmd/tstor/main.go b/cmd/tstor/main.go index 5d6ff4a..de5c6a9 100644 --- a/cmd/tstor/main.go +++ b/cmd/tstor/main.go @@ -119,9 +119,15 @@ func run(configPath string) error { return err } - vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error { - return nil - }) + go func() { + log := log.WithComponent("background-scanner") + err := vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error { + return nil + }) + if err != nil { + log.Error(ctx, "error walking filesystem", rlog.Error(err)) + } + }() if conf.Mounts.Fuse.Enabled { mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path) diff --git a/daemons/qbittorrent/client.go b/daemons/qbittorrent/client.go index 8263ca9..f86b294 100644 --- a/daemons/qbittorrent/client.go +++ b/daemons/qbittorrent/client.go @@ -7,17 +7,23 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" + "github.com/creativecreature/sturdyc" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/royalcat/btrgo/btrsync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" ) +var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent") + type cacheClient struct { qb qbittorrent.Client propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties] - torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo] pieceCache btrsync.MapOf[pieceKey, int] + + infoClient *sturdyc.Client[*qbittorrent.TorrentInfo] } type pieceKey struct { @@ -26,7 +32,6 @@ type pieceKey struct { } func wrapClient(qb qbittorrent.Client) *cacheClient { - const ( cacheSize = 5000 cacheTTL = time.Minute @@ -35,34 +40,45 @@ func wrapClient(qb qbittorrent.Client) *cacheClient { return &cacheClient{ qb: qb, propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL), - torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL), - pieceCache: btrsync.MapOf[pieceKey, int]{}, + + infoClient: sturdyc.New[*qbittorrent.TorrentInfo](cacheSize, 1, cacheTTL, 10, + sturdyc.WithEarlyRefreshes(time.Minute, time.Minute*5, time.Second*10), + sturdyc.WithRefreshCoalescing(100, time.Second/4), + sturdyc.WithMetrics(newSturdycMetrics()), + ), + + pieceCache: btrsync.MapOf[pieceKey, int]{}, } } func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) { - if v, ok := f.torrentsCache.Get(hash); ok { - return &v, nil - } + out, err := f.infoClient.GetOrFetchBatch(ctx, []string{hash}, + f.infoClient.BatchKeyFn(""), + func(ctx context.Context, ids []string) (map[string]*qbittorrent.TorrentInfo, error) { + infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{ + Hashes: ids, + }) + if err != nil { + return nil, fmt.Errorf("error to get torrents: %w", err) + } - infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{ - Hashes: []string{hash}, - }) + out := make(map[string]*qbittorrent.TorrentInfo) + for _, info := range infos { + out[info.Hash] = info + } + + return out, nil + }, + ) if err != nil { - return nil, fmt.Errorf("error to check torrent existence: %w", err) + return nil, err } - if len(infos) == 0 { + if out[hash] == nil { return nil, nil } - if len(infos) > 1 { - return nil, fmt.Errorf("multiple torrents with the same hash") - } - - f.torrentsCache.Add(hash, *infos[0]) - - return infos[0], nil + return out[hash], nil } func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) { @@ -162,3 +178,86 @@ func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, piec } } } + +type sturdycMetrics struct { + ctx context.Context + + cacheHit metric.Int64Counter + cacheMiss metric.Int64Counter + refresh metric.Int64Counter + missing metric.Int64Counter + forcedEviction metric.Int64Counter + entryEviction metric.Int64Counter + batchSize metric.Int64Histogram + + observeCacheSize func() int +} + +var _ sturdyc.MetricsRecorder = (*sturdycMetrics)(nil) + +func newSturdycMetrics() *sturdycMetrics { + m := &sturdycMetrics{ + ctx: context.Background(), + cacheHit: must(meter.Int64Counter("sturdyc_cache_hit")), + cacheMiss: must(meter.Int64Counter("sturdyc_cache_miss")), + refresh: must(meter.Int64Counter("sturdyc_cache_refresh")), + missing: must(meter.Int64Counter("sturdyc_cache_missing")), + forcedEviction: must(meter.Int64Counter("sturdyc_cache_forced_eviction")), + entryEviction: must(meter.Int64Counter("sturdyc_cache_entry_eviction")), + batchSize: must(meter.Int64Histogram("sturdyc_cache_batch_size")), + } + + must(meter.Int64ObservableGauge("sturdyc_cache_size", + metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error { + if m.observeCacheSize == nil { + return nil + } + io.Observe(int64(m.observeCacheSize())) + return nil + }))) + + return m +} + +func (s *sturdycMetrics) CacheHit() { + s.cacheHit.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) CacheMiss() { + s.cacheMiss.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) Refresh() { + s.refresh.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) MissingRecord() { + s.missing.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) ForcedEviction() { + s.forcedEviction.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) CacheBatchRefreshSize(size int) { + s.batchSize.Record(s.ctx, int64(size)) +} + +func (s *sturdycMetrics) ObserveCacheSize(callback func() int) { + s.observeCacheSize = callback +} + +func (s *sturdycMetrics) EntriesEvicted(evictd int) { + s.entryEviction.Add(s.ctx, int64(evictd)) +} + +func (s *sturdycMetrics) ShardIndex(int) { + return +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/daemons/qbittorrent/daemon.go b/daemons/qbittorrent/daemon.go index cd53747..a7b88a3 100644 --- a/daemons/qbittorrent/daemon.go +++ b/daemons/qbittorrent/daemon.go @@ -222,7 +222,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf }) if err != nil { d.log.Error(ctx, "error adding torrent", rlog.Error(err)) - return err + return fmt.Errorf("error adding torrent: %w", err) } var props *qbittorrent.TorrentProperties @@ -247,7 +247,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf // info := existing[0] props, err := d.client.getProperties(ctx, ih.HexString()) if err != nil { - return err + return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString()) } d.registeredTorrents.Add(props.Hash) @@ -256,7 +256,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath)) err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath) if err != nil { - return err + return fmt.Errorf("error moving torrent: %w", err) } } diff --git a/daemons/qbittorrent/fs.go b/daemons/qbittorrent/fs.go index d383276..ca5ecbb 100644 --- a/daemons/qbittorrent/fs.go +++ b/daemons/qbittorrent/fs.go @@ -274,6 +274,9 @@ func (f *File) canExpectSoon(ctx context.Context) (bool, error) { if err != nil { return false, err } + if info == nil { + return false, nil + } return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil } @@ -295,7 +298,7 @@ func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (boo return true, nil } -func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error { +func (f *File) waitRangeAvailable(ctx context.Context, offset int64, size int) error { complete, err := f.isRangeComplete(ctx, offset, size) if err != nil { return err @@ -338,7 +341,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() - if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { + if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil { return 0, err } @@ -349,7 +352,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) { // ReadAt implements vfs.File. func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { - if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { + if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil { return 0, err } diff --git a/go.mod b/go.mod index 7a74b23..5a19c1b 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/billziss-gh/cgofuse v1.5.0 github.com/bodgit/sevenzip v1.5.1 github.com/cespare/xxhash/v2 v2.3.0 + github.com/creativecreature/sturdyc v1.0.6 github.com/deckarep/golang-set/v2 v2.6.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/dgraph-io/ristretto v0.1.1 @@ -48,6 +49,7 @@ require ( github.com/rs/zerolog v1.32.0 github.com/samber/slog-multi v1.0.2 github.com/samber/slog-zerolog v1.0.0 + github.com/sourcegraph/conc v0.3.0 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.4 github.com/vektah/gqlparser/v2 v2.5.17 @@ -180,6 +182,8 @@ require ( go.opentelemetry.io/contrib v1.21.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.9.0 // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/mod v0.20.0 // indirect diff --git a/go.sum b/go.sum index 456409e..1c40729 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creativecreature/sturdyc v1.0.6 h1:hSYm0j7L0Vug3ho8ozFKK9iihFMYgk/cOaQXmL2G/Ho= +github.com/creativecreature/sturdyc v1.0.6/go.mod h1:Qwi5+41ERVF0708Mymjrko+JlLnmJ2/T9Wb/Xsax3f4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -570,6 +572,8 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4= github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -657,8 +661,12 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go4.org v0.0.0-20200411211856-f5505b9728dd h1:BNJlw5kRTzdmyfh5U8F93HA2OwkP7ZGwA51eJ/0wKOU= go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/src/export/webdav/fs.go b/src/export/webdav/fs.go index e5da3fe..f8e5407 100644 --- a/src/export/webdav/fs.go +++ b/src/export/webdav/fs.go @@ -51,7 +51,7 @@ func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error { } func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error { - return webdav.ErrNotImplemented + return wd.fs.Rename(ctx, oldName, newName) } func (wd *WebDAV) lookupFile(ctx context.Context, name string) (vfs.File, error) { diff --git a/src/vfs/archive.go b/src/vfs/archive.go index 8e39cf0..6bac4a9 100644 --- a/src/vfs/archive.go +++ b/src/vfs/archive.go @@ -41,7 +41,7 @@ var ArchiveFactories = map[string]FsFactory{ }, } -type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error) +type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error) var _ Filesystem = &ArchiveFS{} @@ -88,8 +88,8 @@ func (a *ArchiveFS) FsName() string { return "archivefs" } -func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) { - archiveFiles, err := loader(ctx, archivePath, r, size) +func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) { + archiveFiles, err := loader(ctx, archivePath, f, size) if err != nil { return nil, err } @@ -281,7 +281,12 @@ type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error var _ archiveLoader = ZipLoader -func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { +func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, f) + if err != nil { + return nil, err + } + reader := ctxio.IoReaderAt(ctx, f) zr, err := zip.NewReader(reader, size) if err != nil { @@ -314,7 +319,7 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i info := zipFile.FileInfo() - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af) out[AbsPath(zipFile.Name)] = fileEntry{ FileInfo: info, @@ -329,7 +334,12 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i var _ archiveLoader = SevenZipLoader -func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { +func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, ctxreader) + if err != nil { + return nil, err + } + reader := ctxio.IoReaderAt(ctx, ctxreader) r, err := sevenzip.NewReader(reader, size) if err != nil { @@ -361,7 +371,7 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea info := f.FileInfo() - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af) out[AbsPath(f.Name)] = fileEntry{ FileInfo: f.FileInfo(), @@ -376,8 +386,13 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea var _ archiveLoader = RarLoader -func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) { - reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) +func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) { + hash, err := FileHash(ctx, f) + if err != nil { + return nil, err + } + + reader := ioutils.WrapIoReadSeeker(ctx, f, size) r, err := rardecode.NewReader(reader) if err != nil { @@ -396,7 +411,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt name := header.Name af := func(ctx context.Context) (ctxio.ReadCloser, error) { - reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size) + reader := ioutils.WrapIoReadSeeker(ctx, f, size) r, err := rardecode.NewReader(reader) if err != nil { return nil, err @@ -413,7 +428,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt return nil, fmt.Errorf("file with name '%s' not found", name) } - rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af) + rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af) out[AbsPath(header.Name)] = fileEntry{ FileInfo: NewFileInfo(header.Name, header.UnPackedSize), diff --git a/src/vfs/archive_cache.go b/src/vfs/archive_cache.go index 41237e7..47e6fc8 100644 --- a/src/vfs/archive_cache.go +++ b/src/vfs/archive_cache.go @@ -18,8 +18,8 @@ const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage const defaultBlockCount = cacheSize / blockSize type archiveFileIndex struct { - archive string - filename string + archiveHash Hash + filename string } type blockIndex struct { @@ -107,7 +107,7 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) ( a.readerMutex.Lock() defer a.readerMutex.Unlock() - if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block + if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block return b, nil } diff --git a/src/vfs/archive_test.go b/src/vfs/archive_test.go index b209264..fcac485 100644 --- a/src/vfs/archive_test.go +++ b/src/vfs/archive_test.go @@ -4,10 +4,11 @@ import ( "archive/zip" "bytes" "context" + "io" + "io/fs" "testing" "git.kmsign.ru/royalcat/tstor/src/vfs" - "github.com/royalcat/ctxio" "github.com/stretchr/testify/require" ) @@ -62,24 +63,24 @@ func TestZipFilesystem(t *testing.T) { f, err := zfs.Open(ctx, "/path/to/test/file/1.txt") require.NoError(err) n, err := f.Read(ctx, out) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(5, n) require.Equal([]byte("Hello"), out) outSpace := make([]byte, 1) n, err = f.Read(ctx, outSpace) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(1, n) require.Equal([]byte(" "), outSpace) n, err = f.Read(ctx, out) - require.NoError(err) + require.ErrorIs(err, io.EOF) require.Equal(5, n) require.Equal([]byte("World"), out) } -func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) { +func createTestZip(require *require.Assertions) (vfs.File, int64) { buf := bytes.NewBuffer([]byte{}) zWriter := zip.NewWriter(buf) @@ -95,17 +96,59 @@ func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) { return newCBR(buf.Bytes()), int64(buf.Len()) } -type closeableByteReader struct { - data *bytes.Reader -} - func newCBR(b []byte) *closeableByteReader { return &closeableByteReader{ data: bytes.NewReader(b), } } +var _ vfs.File = &closeableByteReader{} + +type closeableByteReader struct { + data *bytes.Reader +} + // ReadAt implements ctxio.ReaderAt. func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { return c.data.ReadAt(p, off) } + +// Close implements vfs.File. +func (c *closeableByteReader) Close(ctx context.Context) error { + panic("unimplemented") +} + +// Info implements vfs.File. +func (c *closeableByteReader) Info() (fs.FileInfo, error) { + panic("unimplemented") +} + +// IsDir implements vfs.File. +func (c *closeableByteReader) IsDir() bool { + panic("unimplemented") +} + +// Name implements vfs.File. +func (c *closeableByteReader) Name() string { + panic("unimplemented") +} + +// Read implements vfs.File. +func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) { + return c.data.Read(p) +} + +// Seek implements vfs.File. +func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) { + return c.data.Seek(offset, whence) +} + +// Size implements vfs.File. +func (c *closeableByteReader) Size() int64 { + return c.data.Size() +} + +// Type implements vfs.File. +func (c *closeableByteReader) Type() fs.FileMode { + panic("unimplemented") +} diff --git a/src/vfs/resolver.go b/src/vfs/resolver.go index 9d4850e..7347215 100644 --- a/src/vfs/resolver.go +++ b/src/vfs/resolver.go @@ -14,6 +14,7 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/pkg/rlog" + "github.com/sourcegraph/conc/iter" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" @@ -111,8 +112,8 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e if err != nil { return nil, err } - out := make([]fs.DirEntry, 0, len(entries)) - for _, e := range entries { + out, err := iter.MapErr(entries, func(pe *fs.DirEntry) (fs.DirEntry, error) { + e := *pe if r.resolver.IsNestedFs(e.Name()) { filepath := path.Join("/", name, e.Name()) file, err := r.rootFS.Open(ctx, filepath) @@ -125,16 +126,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e } if err != nil { log.Error(ctx, "error creating nested fs", rlog.Error(err)) - out = append(out, e) - continue + return nil, fmt.Errorf("error creating nested fs: %w", err) } - - out = append(out, nestedfs) + return nestedfs, nil } else { - out = append(out, e) + return e, nil } + }) + + if err != nil { + log.Error(ctx, "error mapping entries", rlog.Error(err)) + err = nil } - return out, nil + + out = slices.DeleteFunc(out, func(e fs.DirEntry) bool { return e == nil }) + + return out, err } // Stat implements Filesystem. @@ -228,14 +235,14 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem, func NewResolver(factories map[string]FsFactory) *Resolver { return &Resolver{ factories: factories, - fsmap: map[Hash]Filesystem{}, + fsmap: map[string]Filesystem{}, } } type Resolver struct { m sync.Mutex factories map[string]FsFactory - fsmap map[Hash]Filesystem // filesystem cache + fsmap map[string]Filesystem // filesystem cache // TODO: add fsmap clean } @@ -255,15 +262,10 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File return nil, file.Close(ctx) } - fileHash, err := FileHash(ctx, file) - if err != nil { - return nil, fmt.Errorf("error calculating file hash: %w", err) - } - r.m.Lock() defer r.m.Unlock() - if nestedFs, ok := r.fsmap[fileHash]; ok { + if nestedFs, ok := r.fsmap[fsPath]; ok { return nestedFs, file.Close(ctx) } @@ -276,7 +278,7 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File if err != nil { return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err) } - r.fsmap[fileHash] = nestedFs + r.fsmap[fsPath] = nestedFs return nestedFs, nil @@ -319,10 +321,10 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err) } - fileHash, err := FileHash(ctx, file) - if err != nil { - return "", nil, "", fmt.Errorf("error calculating file hash: %w", err) - } + // fileHash, err := FileHash(ctx, file) + // if err != nil { + // return "", nil, "", fmt.Errorf("error calculating file hash: %w", err) + // } err = file.Close(ctx) if err != nil { return "", nil, "", fmt.Errorf("error closing file: %w", err) @@ -335,7 +337,7 @@ PARTS_LOOP: r.m.Lock() defer r.m.Unlock() - if nestedFs, ok := r.fsmap[fileHash]; ok { + if nestedFs, ok := r.fsmap[fsPath]; ok { span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name()))) return fsPath, nestedFs, nestedFsPath, nil } else { @@ -352,7 +354,7 @@ PARTS_LOOP: if err != nil { return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err) } - r.fsmap[fileHash] = nestedFs + r.fsmap[fsPath] = nestedFs span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))