This commit is contained in:
parent
36c501347c
commit
2d9dcd87fa
3 changed files with 127 additions and 25 deletions
|
@ -7,17 +7,23 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
|
||||||
|
"github.com/creativecreature/sturdyc"
|
||||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
"github.com/royalcat/btrgo/btrsync"
|
"github.com/royalcat/btrgo/btrsync"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/daemons/qbittorrent")
|
||||||
|
|
||||||
type cacheClient struct {
|
type cacheClient struct {
|
||||||
qb qbittorrent.Client
|
qb qbittorrent.Client
|
||||||
|
|
||||||
propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties]
|
propertiesCache *expirable.LRU[string, qbittorrent.TorrentProperties]
|
||||||
torrentsCache *expirable.LRU[string, qbittorrent.TorrentInfo]
|
|
||||||
|
|
||||||
pieceCache btrsync.MapOf[pieceKey, int]
|
pieceCache btrsync.MapOf[pieceKey, int]
|
||||||
|
|
||||||
|
infoClient *sturdyc.Client[*qbittorrent.TorrentInfo]
|
||||||
}
|
}
|
||||||
|
|
||||||
type pieceKey struct {
|
type pieceKey struct {
|
||||||
|
@ -26,7 +32,6 @@ type pieceKey struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapClient(qb qbittorrent.Client) *cacheClient {
|
func wrapClient(qb qbittorrent.Client) *cacheClient {
|
||||||
|
|
||||||
const (
|
const (
|
||||||
cacheSize = 5000
|
cacheSize = 5000
|
||||||
cacheTTL = time.Minute
|
cacheTTL = time.Minute
|
||||||
|
@ -35,34 +40,45 @@ func wrapClient(qb qbittorrent.Client) *cacheClient {
|
||||||
return &cacheClient{
|
return &cacheClient{
|
||||||
qb: qb,
|
qb: qb,
|
||||||
propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL),
|
propertiesCache: expirable.NewLRU[string, qbittorrent.TorrentProperties](cacheSize, nil, cacheTTL),
|
||||||
torrentsCache: expirable.NewLRU[string, qbittorrent.TorrentInfo](cacheSize, nil, cacheTTL),
|
|
||||||
|
infoClient: sturdyc.New[*qbittorrent.TorrentInfo](cacheSize, 1, cacheTTL, 10,
|
||||||
|
sturdyc.WithEarlyRefreshes(time.Minute, time.Minute*5, time.Second*10),
|
||||||
|
sturdyc.WithRefreshCoalescing(100, time.Second/4),
|
||||||
|
sturdyc.WithMetrics(newSturdycMetrics()),
|
||||||
|
),
|
||||||
|
|
||||||
pieceCache: btrsync.MapOf[pieceKey, int]{},
|
pieceCache: btrsync.MapOf[pieceKey, int]{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) {
|
func (f *cacheClient) getInfo(ctx context.Context, hash string) (*qbittorrent.TorrentInfo, error) {
|
||||||
if v, ok := f.torrentsCache.Get(hash); ok {
|
out, err := f.infoClient.GetOrFetchBatch(ctx, []string{hash},
|
||||||
return &v, nil
|
f.infoClient.BatchKeyFn(""),
|
||||||
}
|
func(ctx context.Context, ids []string) (map[string]*qbittorrent.TorrentInfo, error) {
|
||||||
|
|
||||||
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
|
infos, err := f.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{
|
||||||
Hashes: []string{hash},
|
Hashes: ids,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error to check torrent existence: %w", err)
|
return nil, fmt.Errorf("error to get torrents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(infos) == 0 {
|
out := make(map[string]*qbittorrent.TorrentInfo)
|
||||||
|
for _, info := range infos {
|
||||||
|
out[info.Hash] = info
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if out[hash] == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(infos) > 1 {
|
return out[hash], nil
|
||||||
return nil, fmt.Errorf("multiple torrents with the same hash")
|
|
||||||
}
|
|
||||||
|
|
||||||
f.torrentsCache.Add(hash, *infos[0])
|
|
||||||
|
|
||||||
return infos[0], nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
|
func (f *cacheClient) getProperties(ctx context.Context, hash string) (*qbittorrent.TorrentProperties, error) {
|
||||||
|
@ -162,3 +178,86 @@ func (f *cacheClient) waitPieceToComplete(ctx context.Context, hash string, piec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sturdycMetrics struct {
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
cacheHit metric.Int64Counter
|
||||||
|
cacheMiss metric.Int64Counter
|
||||||
|
refresh metric.Int64Counter
|
||||||
|
missing metric.Int64Counter
|
||||||
|
forcedEviction metric.Int64Counter
|
||||||
|
entryEviction metric.Int64Counter
|
||||||
|
batchSize metric.Int64Histogram
|
||||||
|
|
||||||
|
observeCacheSize func() int
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ sturdyc.MetricsRecorder = (*sturdycMetrics)(nil)
|
||||||
|
|
||||||
|
func newSturdycMetrics() *sturdycMetrics {
|
||||||
|
m := &sturdycMetrics{
|
||||||
|
ctx: context.Background(),
|
||||||
|
cacheHit: must(meter.Int64Counter("sturdyc_cache_hit")),
|
||||||
|
cacheMiss: must(meter.Int64Counter("sturdyc_cache_miss")),
|
||||||
|
refresh: must(meter.Int64Counter("sturdyc_cache_refresh")),
|
||||||
|
missing: must(meter.Int64Counter("sturdyc_cache_missing")),
|
||||||
|
forcedEviction: must(meter.Int64Counter("sturdyc_cache_forced_eviction")),
|
||||||
|
entryEviction: must(meter.Int64Counter("sturdyc_cache_entry_eviction")),
|
||||||
|
batchSize: must(meter.Int64Histogram("sturdyc_cache_batch_size")),
|
||||||
|
}
|
||||||
|
|
||||||
|
must(meter.Int64ObservableGauge("sturdyc_cache_size",
|
||||||
|
metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error {
|
||||||
|
if m.observeCacheSize == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
io.Observe(int64(m.observeCacheSize()))
|
||||||
|
return nil
|
||||||
|
})))
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) CacheHit() {
|
||||||
|
s.cacheHit.Add(s.ctx, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) CacheMiss() {
|
||||||
|
s.cacheMiss.Add(s.ctx, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) Refresh() {
|
||||||
|
s.refresh.Add(s.ctx, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) MissingRecord() {
|
||||||
|
s.missing.Add(s.ctx, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) ForcedEviction() {
|
||||||
|
s.forcedEviction.Add(s.ctx, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) CacheBatchRefreshSize(size int) {
|
||||||
|
s.batchSize.Record(s.ctx, int64(size))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) ObserveCacheSize(callback func() int) {
|
||||||
|
s.observeCacheSize = callback
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) EntriesEvicted(evictd int) {
|
||||||
|
s.entryEviction.Add(s.ctx, int64(evictd))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sturdycMetrics) ShardIndex(int) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func must[T any](v T, err error) T {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
|
@ -222,7 +222,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
|
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
|
||||||
return err
|
return fmt.Errorf("error adding torrent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var props *qbittorrent.TorrentProperties
|
var props *qbittorrent.TorrentProperties
|
||||||
|
@ -247,7 +247,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
||||||
// info := existing[0]
|
// info := existing[0]
|
||||||
props, err := d.client.getProperties(ctx, ih.HexString())
|
props, err := d.client.getProperties(ctx, ih.HexString())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("error getting torrent properties: %w for infohash: %s", err, ih.HexString())
|
||||||
}
|
}
|
||||||
|
|
||||||
d.registeredTorrents.Add(props.Hash)
|
d.registeredTorrents.Add(props.Hash)
|
||||||
|
@ -256,7 +256,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
|
||||||
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
|
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
|
||||||
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
|
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("error moving torrent: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -274,6 +274,9 @@ func (f *File) canExpectSoon(ctx context.Context) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
if info == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil
|
return info.Completed == info.Size || info.State == qbittorrent.TorrentStateCheckingUP || info.State == qbittorrent.TorrentStateDownloading || info.State == qbittorrent.TorrentStateForcedDL, nil
|
||||||
}
|
}
|
||||||
|
@ -295,7 +298,7 @@ func (f *File) isRangeComplete(ctx context.Context, offset int64, size int) (boo
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) waitPieceAvailable(ctx context.Context, offset int64, size int) error {
|
func (f *File) waitRangeAvailable(ctx context.Context, offset int64, size int) error {
|
||||||
complete, err := f.isRangeComplete(ctx, offset, size)
|
complete, err := f.isRangeComplete(ctx, offset, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -338,7 +341,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
|
|
||||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,7 +352,7 @@ func (f *File) Read(ctx context.Context, p []byte) (int, error) {
|
||||||
|
|
||||||
// ReadAt implements vfs.File.
|
// ReadAt implements vfs.File.
|
||||||
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
|
func (f *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
|
||||||
if err := f.waitPieceAvailable(ctx, f.offset, len(p)); err != nil {
|
if err := f.waitRangeAvailable(ctx, f.offset, len(p)); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue