diff --git a/daemons/qbittorrent/client.go b/daemons/qbittorrent/client.go index 8263ca9..f86b294 100644 --- a/daemons/qbittorrent/client.go +++ b/daemons/qbittorrent/client.go @@ -7,17 +7,23 @@ import ( "time" "git.kmsign.ru/royalcat/tstor/pkg/qbittorrent" + "github.com/creativecreature/sturdyc" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/royalcat/btrgo/btrsync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" ) +var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent") + type cacheClient struct { qb qbittorrent.Client propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties] - torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo] pieceCache btrsync.MapOf[pieceKey, int] + + infoClient *sturdyc.Client[*qbittorrent.TorrentInfo] } type pieceKey struct { @@ -26,7 +32,6 @@ type pieceKey struct { } func wrapClient(qb qbittorrent.Client) *cacheClient { - const ( cacheSize = 5000 cacheTTL = time.Minute @@ -35,34 +40,45 @@ func wrapClient(qb qbittorrent.Client) *cacheClient { return &cacheClient{ qb: qb, propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL), - torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL), - pieceCache: btrsync.MapOf[pieceKey, int]{}, + + infoClient: sturdyc.New[*qbittorrent.TorrentInfo](cacheSize, 1, cacheTTL, 10, + sturdyc.WithEarlyRefreshes(time.Minute, time.Minute*5, time.Second*10), + sturdyc.WithRefreshCoalescing(100, time.Second/4), + sturdyc.WithMetrics(newSturdycMetrics()), + ), + + pieceCache: btrsync.MapOf[pieceKey, int]{}, } } func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) { - if v, ok := f.torrentsCache.Get(hash); ok { - return &v, nil - } + out, err := f.infoClient.GetOrFetchBatch(ctx, []string{hash}, + f.infoClient.BatchKeyFn(""), + func(ctx context.Context, ids []string) (map[string]*qbittorrent.TorrentInfo, error) { + infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{ + Hashes: ids, + }) + if err != nil { + return nil, fmt.Errorf("error to get torrents: %w", err) + } - infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{ - Hashes: []string{hash}, - }) + out := make(map[string]*qbittorrent.TorrentInfo) + for _, info := range infos { + out[info.Hash] = info + } + + return out, nil + }, + ) if err != nil { - return nil, fmt.Errorf("error to check torrent existence: %w", err) + return nil, err } - if len(infos) == 0 { + if out[hash] == nil { return nil, nil } - if len(infos) > 1 { - return nil, fmt.Errorf("multiple torrents with the same hash") - } - - f.torrentsCache.Add(hash, *infos[0]) - - return infos[0], nil + return out[hash], nil } func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) { @@ -162,3 +178,86 @@ func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, piec } } } + +type sturdycMetrics struct { + ctx context.Context + + cacheHit metric.Int64Counter + cacheMiss metric.Int64Counter + refresh metric.Int64Counter + missing metric.Int64Counter + forcedEviction metric.Int64Counter + entryEviction metric.Int64Counter + batchSize metric.Int64Histogram + + observeCacheSize func() int +} + +var _ sturdyc.MetricsRecorder = (*sturdycMetrics)(nil) + +func newSturdycMetrics() *sturdycMetrics { + m := &sturdycMetrics{ + ctx: context.Background(), + cacheHit: must(meter.Int64Counter("sturdyc_cache_hit")), + cacheMiss: must(meter.Int64Counter("sturdyc_cache_miss")), + refresh: must(meter.Int64Counter("sturdyc_cache_refresh")), + missing: must(meter.Int64Counter("sturdyc_cache_missing")), + forcedEviction: must(meter.Int64Counter("sturdyc_cache_forced_eviction")), + entryEviction: must(meter.Int64Counter("sturdyc_cache_entry_eviction")), + batchSize: must(meter.Int64Histogram("sturdyc_cache_batch_size")), + } + + must(meter.Int64ObservableGauge("sturdyc_cache_size", + metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error { + if m.observeCacheSize == nil { + return nil + } + io.Observe(int64(m.observeCacheSize())) + return nil + }))) + + return m +} + +func (s *sturdycMetrics) CacheHit() { + s.cacheHit.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) CacheMiss() { + s.cacheMiss.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) Refresh() { + s.refresh.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) MissingRecord() { + s.missing.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) ForcedEviction() { + s.forcedEviction.Add(s.ctx, 1) +} + +func (s *sturdycMetrics) CacheBatchRefreshSize(size int) { + s.batchSize.Record(s.ctx, int64(size)) +} + +func (s *sturdycMetrics) ObserveCacheSize(callback func() int) { + s.observeCacheSize = callback +} + +func (s *sturdycMetrics) EntriesEvicted(evictd int) { + s.entryEviction.Add(s.ctx, int64(evictd)) +} + +func (s *sturdycMetrics) ShardIndex(int) { + return +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/daemons/qbittorrent/daemon.go b/daemons/qbittorrent/daemon.go index cd53747..a7b88a3 100644 --- a/daemons/qbittorrent/daemon.go +++ b/daemons/qbittorrent/daemon.go @@ -222,7 +222,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf }) if err != nil { d.log.Error(ctx, "error adding torrent", rlog.Error(err)) - return err + return fmt.Errorf("error adding torrent: %w", err) } var props *qbittorrent.TorrentProperties @@ -247,7 +247,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf // info := existing[0] props, err := d.client.getProperties(ctx, ih.HexString()) if err != nil { - return err + return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString()) } d.registeredTorrents.Add(props.Hash) @@ -256,7 +256,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath)) err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath) if err != nil { - return err + return fmt.Errorf("error moving torrent: %w", err) } } diff --git a/daemons/qbittorrent/fs.go b/daemons/qbittorrent/fs.go index d383276..ca5ecbb 100644 --- a/daemons/qbittorrent/fs.go +++ b/daemons/qbittorrent/fs.go @@ -274,6 +274,9 @@ func (f *File) canExpectSoon(ctx context.Context) (bool, error) { if err != nil { return false, err } + if info == nil { + return false, nil + } return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil } @@ -295,7 +298,7 @@ func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (boo return true, nil } -func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error { +func (f *File) waitRangeAvailable(ctx context.Context, offset int64, size int) error { complete, err := f.isRangeComplete(ctx, offset, size) if err != nil { return err @@ -338,7 +341,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() - if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { + if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil { return 0, err } @@ -349,7 +352,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) { // ReadAt implements vfs.File. func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { - if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil { + if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil { return 0, err }