Compare commits
4 commits
95016d54c1
...
2d9dcd87fa
Author | SHA1 | Date | |
---|---|---|---|
2d9dcd87fa | |||
36c501347c | |||
94ca4cf599 | |||
b77ce50a7b |
11 changed files with 255 additions and 75 deletions
|
@ -119,9 +119,15 @@ func run(configPath string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error {
|
||||
return nil
|
||||
})
|
||||
go func() {
|
||||
log := log.WithComponent("background-scanner")
|
||||
err := vfs.Walk(ctx, sfs, "/", func(path string, info fs.FileInfo, err error) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(ctx, "error walking filesystem", rlog.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if conf.Mounts.Fuse.Enabled {
|
||||
mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path)
|
||||
|
|
|
@ -7,17 +7,23 @@ import (
|
|||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||
"github.com/creativecreature/sturdyc"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/royalcat/btrgo/btrsync"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent")
|
||||
|
||||
type cacheClient struct {
|
||||
qb qbittorrent.Client
|
||||
|
||||
propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties]
|
||||
torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo]
|
||||
|
||||
pieceCache btrsync.MapOf[pieceKey, int]
|
||||
|
||||
infoClient *sturdyc.Client[*qbittorrent.TorrentInfo]
|
||||
}
|
||||
|
||||
type pieceKey struct {
|
||||
|
@ -26,7 +32,6 @@ type pieceKey struct {
|
|||
}
|
||||
|
||||
func wrapClient(qb qbittorrent.Client) *cacheClient {
|
||||
|
||||
const (
|
||||
cacheSize = 5000
|
||||
cacheTTL = time.Minute
|
||||
|
@ -35,34 +40,45 @@ func wrapClient(qb qbittorrent.Client) *cacheClient {
|
|||
return &cacheClient{
|
||||
qb: qb,
|
||||
propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL),
|
||||
torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL),
|
||||
pieceCache: btrsync.MapOf[pieceKey, int]{},
|
||||
|
||||
infoClient: sturdyc.New[*qbittorrent.TorrentInfo](cacheSize, 1, cacheTTL, 10,
|
||||
sturdyc.WithEarlyRefreshes(time.Minute, time.Minute*5, time.Second*10),
|
||||
sturdyc.WithRefreshCoalescing(100, time.Second/4),
|
||||
sturdyc.WithMetrics(newSturdycMetrics()),
|
||||
),
|
||||
|
||||
pieceCache: btrsync.MapOf[pieceKey, int]{},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) {
|
||||
if v, ok := f.torrentsCache.Get(hash); ok {
|
||||
return &v, nil
|
||||
}
|
||||
out, err := f.infoClient.GetOrFetchBatch(ctx, []string{hash},
|
||||
f.infoClient.BatchKeyFn(""),
|
||||
func(ctx context.Context, ids []string) (map[string]*qbittorrent.TorrentInfo, error) {
|
||||
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
|
||||
Hashes: ids,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error to get torrents: %w", err)
|
||||
}
|
||||
|
||||
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
|
||||
Hashes: []string{hash},
|
||||
})
|
||||
out := make(map[string]*qbittorrent.TorrentInfo)
|
||||
for _, info := range infos {
|
||||
out[info.Hash] = info
|
||||
}
|
||||
|
||||
return out, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error to check torrent existence: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(infos) == 0 {
|
||||
if out[hash] == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if len(infos) > 1 {
|
||||
return nil, fmt.Errorf("multiple torrents with the same hash")
|
||||
}
|
||||
|
||||
f.torrentsCache.Add(hash, *infos[0])
|
||||
|
||||
return infos[0], nil
|
||||
return out[hash], nil
|
||||
}
|
||||
|
||||
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
|
||||
|
@ -162,3 +178,86 @@ func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, piec
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type sturdycMetrics struct {
|
||||
ctx context.Context
|
||||
|
||||
cacheHit metric.Int64Counter
|
||||
cacheMiss metric.Int64Counter
|
||||
refresh metric.Int64Counter
|
||||
missing metric.Int64Counter
|
||||
forcedEviction metric.Int64Counter
|
||||
entryEviction metric.Int64Counter
|
||||
batchSize metric.Int64Histogram
|
||||
|
||||
observeCacheSize func() int
|
||||
}
|
||||
|
||||
var _ sturdyc.MetricsRecorder = (*sturdycMetrics)(nil)
|
||||
|
||||
func newSturdycMetrics() *sturdycMetrics {
|
||||
m := &sturdycMetrics{
|
||||
ctx: context.Background(),
|
||||
cacheHit: must(meter.Int64Counter("sturdyc_cache_hit")),
|
||||
cacheMiss: must(meter.Int64Counter("sturdyc_cache_miss")),
|
||||
refresh: must(meter.Int64Counter("sturdyc_cache_refresh")),
|
||||
missing: must(meter.Int64Counter("sturdyc_cache_missing")),
|
||||
forcedEviction: must(meter.Int64Counter("sturdyc_cache_forced_eviction")),
|
||||
entryEviction: must(meter.Int64Counter("sturdyc_cache_entry_eviction")),
|
||||
batchSize: must(meter.Int64Histogram("sturdyc_cache_batch_size")),
|
||||
}
|
||||
|
||||
must(meter.Int64ObservableGauge("sturdyc_cache_size",
|
||||
metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error {
|
||||
if m.observeCacheSize == nil {
|
||||
return nil
|
||||
}
|
||||
io.Observe(int64(m.observeCacheSize()))
|
||||
return nil
|
||||
})))
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) CacheHit() {
|
||||
s.cacheHit.Add(s.ctx, 1)
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) CacheMiss() {
|
||||
s.cacheMiss.Add(s.ctx, 1)
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) Refresh() {
|
||||
s.refresh.Add(s.ctx, 1)
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) MissingRecord() {
|
||||
s.missing.Add(s.ctx, 1)
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) ForcedEviction() {
|
||||
s.forcedEviction.Add(s.ctx, 1)
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) CacheBatchRefreshSize(size int) {
|
||||
s.batchSize.Record(s.ctx, int64(size))
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) ObserveCacheSize(callback func() int) {
|
||||
s.observeCacheSize = callback
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) EntriesEvicted(evictd int) {
|
||||
s.entryEviction.Add(s.ctx, int64(evictd))
|
||||
}
|
||||
|
||||
func (s *sturdycMetrics) ShardIndex(int) {
|
||||
return
|
||||
}
|
||||
|
||||
func must[T any](v T, err error) T {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
|||
})
|
||||
if err != nil {
|
||||
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
|
||||
return err
|
||||
return fmt.Errorf("error adding torrent: %w", err)
|
||||
}
|
||||
|
||||
var props *qbittorrent.TorrentProperties
|
||||
|
@ -247,7 +247,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
|||
// info := existing[0]
|
||||
props, err := d.client.getProperties(ctx, ih.HexString())
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString())
|
||||
}
|
||||
|
||||
d.registeredTorrents.Add(props.Hash)
|
||||
|
@ -256,7 +256,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
|||
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
|
||||
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("error moving torrent: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -274,6 +274,9 @@ func (f *File) canExpectSoon(ctx context.Context) (bool, error) {
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if info == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil
|
||||
}
|
||||
|
@ -295,7 +298,7 @@ func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (boo
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error {
|
||||
func (f *File) waitRangeAvailable(ctx context.Context, offset int64, size int) error {
|
||||
complete, err := f.isRangeComplete(ctx, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -338,7 +341,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
|
|||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -349,7 +352,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
|
|||
|
||||
// ReadAt implements vfs.File.
|
||||
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
|
||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
|
4
go.mod
4
go.mod
|
@ -14,6 +14,7 @@ require (
|
|||
github.com/billziss-gh/cgofuse v1.5.0
|
||||
github.com/bodgit/sevenzip v1.5.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/creativecreature/sturdyc v1.0.6
|
||||
github.com/deckarep/golang-set/v2 v2.6.0
|
||||
github.com/dgraph-io/badger/v4 v4.2.0
|
||||
github.com/dgraph-io/ristretto v0.1.1
|
||||
|
@ -48,6 +49,7 @@ require (
|
|||
github.com/rs/zerolog v1.32.0
|
||||
github.com/samber/slog-multi v1.0.2
|
||||
github.com/samber/slog-zerolog v1.0.0
|
||||
github.com/sourcegraph/conc v0.3.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/urfave/cli/v2 v2.27.4
|
||||
github.com/vektah/gqlparser/v2 v2.5.17
|
||||
|
@ -180,6 +182,8 @@ require (
|
|||
go.opentelemetry.io/contrib v1.21.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
|
||||
golang.org/x/crypto v0.28.0 // indirect
|
||||
golang.org/x/mod v0.20.0 // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -152,6 +152,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
|||
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creativecreature/sturdyc v1.0.6 h1:hSYm0j7L0Vug3ho8ozFKK9iihFMYgk/cOaQXmL2G/Ho=
|
||||
github.com/creativecreature/sturdyc v1.0.6/go.mod h1:Qwi5+41ERVF0708Mymjrko+JlLnmJ2/T9Wb/Xsax3f4=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -570,6 +572,8 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
|
|||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
|
||||
github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg=
|
||||
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
||||
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
|
@ -657,8 +661,12 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
|
|||
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
|
||||
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
|
||||
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
|
||||
go4.org v0.0.0-20200411211856-f5505b9728dd h1:BNJlw5kRTzdmyfh5U8F93HA2OwkP7ZGwA51eJ/0wKOU=
|
||||
go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
|
|
@ -51,7 +51,7 @@ func (wd *WebDAV) RemoveAll(ctx context.Context, name string) error {
|
|||
}
|
||||
|
||||
func (wd *WebDAV) Rename(ctx context.Context, oldName, newName string) error {
|
||||
return webdav.ErrNotImplemented
|
||||
return wd.fs.Rename(ctx, oldName, newName)
|
||||
}
|
||||
|
||||
func (wd *WebDAV) lookupFile(ctx context.Context, name string) (vfs.File, error) {
|
||||
|
|
|
@ -41,7 +41,7 @@ var ArchiveFactories = map[string]FsFactory{
|
|||
},
|
||||
}
|
||||
|
||||
type archiveLoader func(ctx context.Context, archivePath string, r ctxio.ReaderAt, size int64) (map[string]fileEntry, error)
|
||||
type archiveLoader func(ctx context.Context, archivePath string, r File, size int64) (map[string]fileEntry, error)
|
||||
|
||||
var _ Filesystem = &ArchiveFS{}
|
||||
|
||||
|
@ -88,8 +88,8 @@ func (a *ArchiveFS) FsName() string {
|
|||
return "archivefs"
|
||||
}
|
||||
|
||||
func NewArchive(ctx context.Context, archivePath, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
|
||||
archiveFiles, err := loader(ctx, archivePath, r, size)
|
||||
func NewArchive(ctx context.Context, archivePath, name string, f File, size int64, loader archiveLoader) (*ArchiveFS, error) {
|
||||
archiveFiles, err := loader(ctx, archivePath, f, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -281,7 +281,12 @@ type archiveFileReaderFactory func(ctx context.Context) (ctxio.ReadCloser, error
|
|||
|
||||
var _ archiveLoader = ZipLoader
|
||||
|
||||
func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||
func ZipLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := FileHash(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ctxio.IoReaderAt(ctx, f)
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
|
@ -314,7 +319,7 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
|
|||
|
||||
info := zipFile.FileInfo()
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: zipFile.Name}, info.Size(), af)
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: zipFile.Name}, info.Size(), af)
|
||||
|
||||
out[AbsPath(zipFile.Name)] = fileEntry{
|
||||
FileInfo: info,
|
||||
|
@ -329,7 +334,12 @@ func ZipLoader(ctx context.Context, archivePath string, f ctxio.ReaderAt, size i
|
|||
|
||||
var _ archiveLoader = SevenZipLoader
|
||||
|
||||
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||
func SevenZipLoader(ctx context.Context, archivePath string, ctxreader File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := FileHash(ctx, ctxreader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ctxio.IoReaderAt(ctx, ctxreader)
|
||||
r, err := sevenzip.NewReader(reader, size)
|
||||
if err != nil {
|
||||
|
@ -361,7 +371,7 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
|
|||
|
||||
info := f.FileInfo()
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: f.Name}, info.Size(), af)
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: f.Name}, info.Size(), af)
|
||||
|
||||
out[AbsPath(f.Name)] = fileEntry{
|
||||
FileInfo: f.FileInfo(),
|
||||
|
@ -376,8 +386,13 @@ func SevenZipLoader(ctx context.Context, archivePath string, ctxreader ctxio.Rea
|
|||
|
||||
var _ archiveLoader = RarLoader
|
||||
|
||||
func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt, size int64) (map[string]fileEntry, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
func RarLoader(ctx context.Context, archivePath string, f File, size int64) (map[string]fileEntry, error) {
|
||||
hash, err := FileHash(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
|
||||
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
|
@ -396,7 +411,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
|
|||
|
||||
name := header.Name
|
||||
af := func(ctx context.Context) (ctxio.ReadCloser, error) {
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, ctxreader, size)
|
||||
reader := ioutils.WrapIoReadSeeker(ctx, f, size)
|
||||
r, err := rardecode.NewReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -413,7 +428,7 @@ func RarLoader(ctx context.Context, archivePath string, ctxreader ctxio.ReaderAt
|
|||
return nil, fmt.Errorf("file with name '%s' not found", name)
|
||||
}
|
||||
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archive: archivePath, filename: header.Name}, header.UnPackedSize, af)
|
||||
rr := newRandomReaderFromLinear(archiveFileIndex{archiveHash: hash, filename: header.Name}, header.UnPackedSize, af)
|
||||
|
||||
out[AbsPath(header.Name)] = fileEntry{
|
||||
FileInfo: NewFileInfo(header.Name, header.UnPackedSize),
|
||||
|
|
|
@ -18,8 +18,8 @@ const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage
|
|||
const defaultBlockCount = cacheSize / blockSize
|
||||
|
||||
type archiveFileIndex struct {
|
||||
archive string
|
||||
filename string
|
||||
archiveHash Hash
|
||||
filename string
|
||||
}
|
||||
|
||||
type blockIndex struct {
|
||||
|
@ -107,7 +107,7 @@ func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (
|
|||
a.readerMutex.Lock()
|
||||
defer a.readerMutex.Unlock()
|
||||
|
||||
if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block
|
||||
if b, ok := blockCache.Get(bI); ok && b.len != 0 { // check again, maybe another goroutine already read this block
|
||||
return b, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,10 +4,11 @@ import (
|
|||
"archive/zip"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"testing"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
"github.com/royalcat/ctxio"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -62,24 +63,24 @@ func TestZipFilesystem(t *testing.T) {
|
|||
f, err := zfs.Open(ctx, "/path/to/test/file/1.txt")
|
||||
require.NoError(err)
|
||||
n, err := f.Read(ctx, out)
|
||||
require.NoError(err)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(5, n)
|
||||
require.Equal([]byte("Hello"), out)
|
||||
|
||||
outSpace := make([]byte, 1)
|
||||
n, err = f.Read(ctx, outSpace)
|
||||
require.NoError(err)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(1, n)
|
||||
require.Equal([]byte(" "), outSpace)
|
||||
|
||||
n, err = f.Read(ctx, out)
|
||||
require.NoError(err)
|
||||
require.ErrorIs(err, io.EOF)
|
||||
require.Equal(5, n)
|
||||
require.Equal([]byte("World"), out)
|
||||
|
||||
}
|
||||
|
||||
func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
|
||||
func createTestZip(require *require.Assertions) (vfs.File, int64) {
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
|
||||
zWriter := zip.NewWriter(buf)
|
||||
|
@ -95,17 +96,59 @@ func createTestZip(require *require.Assertions) (ctxio.ReaderAt, int64) {
|
|||
return newCBR(buf.Bytes()), int64(buf.Len())
|
||||
}
|
||||
|
||||
type closeableByteReader struct {
|
||||
data *bytes.Reader
|
||||
}
|
||||
|
||||
func newCBR(b []byte) *closeableByteReader {
|
||||
return &closeableByteReader{
|
||||
data: bytes.NewReader(b),
|
||||
}
|
||||
}
|
||||
|
||||
var _ vfs.File = &closeableByteReader{}
|
||||
|
||||
type closeableByteReader struct {
|
||||
data *bytes.Reader
|
||||
}
|
||||
|
||||
// ReadAt implements ctxio.ReaderAt.
|
||||
func (c *closeableByteReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
|
||||
return c.data.ReadAt(p, off)
|
||||
}
|
||||
|
||||
// Close implements vfs.File.
|
||||
func (c *closeableByteReader) Close(ctx context.Context) error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Info implements vfs.File.
|
||||
func (c *closeableByteReader) Info() (fs.FileInfo, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// IsDir implements vfs.File.
|
||||
func (c *closeableByteReader) IsDir() bool {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Name implements vfs.File.
|
||||
func (c *closeableByteReader) Name() string {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Read implements vfs.File.
|
||||
func (c *closeableByteReader) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||
return c.data.Read(p)
|
||||
}
|
||||
|
||||
// Seek implements vfs.File.
|
||||
func (c *closeableByteReader) Seek(offset int64, whence int) (int64, error) {
|
||||
return c.data.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// Size implements vfs.File.
|
||||
func (c *closeableByteReader) Size() int64 {
|
||||
return c.data.Size()
|
||||
}
|
||||
|
||||
// Type implements vfs.File.
|
||||
func (c *closeableByteReader) Type() fs.FileMode {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
"github.com/sourcegraph/conc/iter"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/exp/maps"
|
||||
|
@ -111,8 +112,8 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]fs.DirEntry, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
out, err := iter.MapErr(entries, func(pe *fs.DirEntry) (fs.DirEntry, error) {
|
||||
e := *pe
|
||||
if r.resolver.IsNestedFs(e.Name()) {
|
||||
filepath := path.Join("/", name, e.Name())
|
||||
file, err := r.rootFS.Open(ctx, filepath)
|
||||
|
@ -125,16 +126,22 @@ func (r *ResolverFS) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
|
|||
}
|
||||
if err != nil {
|
||||
log.Error(ctx, "error creating nested fs", rlog.Error(err))
|
||||
out = append(out, e)
|
||||
continue
|
||||
return nil, fmt.Errorf("error creating nested fs: %w", err)
|
||||
}
|
||||
|
||||
out = append(out, nestedfs)
|
||||
return nestedfs, nil
|
||||
} else {
|
||||
out = append(out, e)
|
||||
return e, nil
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Error(ctx, "error mapping entries", rlog.Error(err))
|
||||
err = nil
|
||||
}
|
||||
return out, nil
|
||||
|
||||
out = slices.DeleteFunc(out, func(e fs.DirEntry) bool { return e == nil })
|
||||
|
||||
return out, err
|
||||
}
|
||||
|
||||
// Stat implements Filesystem.
|
||||
|
@ -228,14 +235,14 @@ type FsFactory func(ctx context.Context, sourcePath string, f File) (Filesystem,
|
|||
func NewResolver(factories map[string]FsFactory) *Resolver {
|
||||
return &Resolver{
|
||||
factories: factories,
|
||||
fsmap: map[Hash]Filesystem{},
|
||||
fsmap: map[string]Filesystem{},
|
||||
}
|
||||
}
|
||||
|
||||
type Resolver struct {
|
||||
m sync.Mutex
|
||||
factories map[string]FsFactory
|
||||
fsmap map[Hash]Filesystem // filesystem cache
|
||||
fsmap map[string]Filesystem // filesystem cache
|
||||
// TODO: add fsmap clean
|
||||
}
|
||||
|
||||
|
@ -255,15 +262,10 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
|
|||
return nil, file.Close(ctx)
|
||||
}
|
||||
|
||||
fileHash, err := FileHash(ctx, file)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error calculating file hash: %w", err)
|
||||
}
|
||||
|
||||
r.m.Lock()
|
||||
defer r.m.Unlock()
|
||||
|
||||
if nestedFs, ok := r.fsmap[fileHash]; ok {
|
||||
if nestedFs, ok := r.fsmap[fsPath]; ok {
|
||||
return nestedFs, file.Close(ctx)
|
||||
}
|
||||
|
||||
|
@ -276,7 +278,7 @@ func (r *Resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("error calling nest factory: %s with error: %w", fsPath, err)
|
||||
}
|
||||
r.fsmap[fileHash] = nestedFs
|
||||
r.fsmap[fsPath] = nestedFs
|
||||
|
||||
return nestedFs, nil
|
||||
|
||||
|
@ -319,10 +321,10 @@ PARTS_LOOP:
|
|||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
|
||||
}
|
||||
fileHash, err := FileHash(ctx, file)
|
||||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
|
||||
}
|
||||
// fileHash, err := FileHash(ctx, file)
|
||||
// if err != nil {
|
||||
// return "", nil, "", fmt.Errorf("error calculating file hash: %w", err)
|
||||
// }
|
||||
err = file.Close(ctx)
|
||||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error closing file: %w", err)
|
||||
|
@ -335,7 +337,7 @@ PARTS_LOOP:
|
|||
r.m.Lock()
|
||||
defer r.m.Unlock()
|
||||
|
||||
if nestedFs, ok := r.fsmap[fileHash]; ok {
|
||||
if nestedFs, ok := r.fsmap[fsPath]; ok {
|
||||
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
|
||||
return fsPath, nestedFs, nestedFsPath, nil
|
||||
} else {
|
||||
|
@ -352,7 +354,7 @@ PARTS_LOOP:
|
|||
if err != nil {
|
||||
return "", nil, "", fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
|
||||
}
|
||||
r.fsmap[fileHash] = nestedFs
|
||||
r.fsmap[fsPath] = nestedFs
|
||||
|
||||
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue