new kv, ctx in nfs handler

This commit is contained in:
royalcat 2024-06-17 00:34:46 +03:00
parent 609d69fb5a
commit bc4b39b1c1
41 changed files with 270 additions and 222 deletions

View file

@ -2,6 +2,7 @@ package nfs
import (
"context"
"errors"
"fmt"
"path"
"strings"
@ -10,6 +11,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
@ -50,6 +52,7 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers"))
opts.DefaultTTL = time.Hour
opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler")
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
if err != nil {
@ -98,8 +101,7 @@ type CachingHandler struct {
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
ctx := context.Background()
func (c *CachingHandler) ToHandle(ctx context.Context, _ nfs.Filesystem, path []string) []byte {
var id uuid.UUID
cacheKey := handle(path).String()
@ -123,31 +125,29 @@ func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
}
// FromHandle converts from an opaque handle to the file it represents
func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) {
func (c *CachingHandler) FromHandle(ctx context.Context, fh []byte) (nfs.Filesystem, []string, error) {
c.mu.Lock()
defer c.mu.Unlock()
ctx := context.Background()
id, err := uuid.FromBytes(fh)
if err != nil {
return nil, nil, err
}
paths, found, err := c.activeHandles.Get(ctx, id)
paths, err := c.activeHandles.Get(ctx, id)
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
return nil, nil, fmt.Errorf("kv error: %w", err)
}
if found {
return c.fs, paths, nil
}
return c.fs, paths, nil
return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error {
ctx := context.Background()
func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem, handle []byte) error {
//Remove from cache
id, err := uuid.FromBytes(handle)
if err != nil {

View file

@ -5,12 +5,19 @@ import (
"log/slog"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/dgraph-io/badger/v4"
)
var _ badger.Logger = (*Badger)(nil)
func BadgerLogger(name ...string) badger.Logger {
return &badgerLogger{
L: rlog.Component(append(name, "badger")...).Slog(),
}
}
type Badger struct {
var _ badger.Logger = (*badgerLogger)(nil)
type badgerLogger struct {
L *slog.Logger
}
@ -18,18 +25,18 @@ func fmtBadgerLog(m string, f ...any) string {
return fmt.Sprintf(strings.ReplaceAll(m, "\n", ""), f...)
}
func (l *Badger) Errorf(m string, f ...interface{}) {
func (l *badgerLogger) Errorf(m string, f ...interface{}) {
l.L.Error(fmtBadgerLog(m, f...))
}
func (l *Badger) Warningf(m string, f ...interface{}) {
func (l *badgerLogger) Warningf(m string, f ...interface{}) {
l.L.Warn(fmtBadgerLog(m, f...))
}
func (l *Badger) Infof(m string, f ...interface{}) {
func (l *badgerLogger) Infof(m string, f ...interface{}) {
l.L.Info(fmtBadgerLog(m, f...))
}
func (l *Badger) Debugf(m string, f ...interface{}) {
func (l *badgerLogger) Debugf(m string, f ...interface{}) {
l.L.Debug(fmtBadgerLog(m, f...))
}

View file

@ -8,20 +8,33 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types"
"github.com/royalcat/kv"
)
type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error
}
type FileProperties struct {
Excluded bool `json:"excluded"`
Priority types.PiecePriority `json:"priority"`
}
type Controller struct {
torrentFilePath string
t *torrent.Torrent
rep *filesMappingsStore
storage TorrentFileDeleter
fileProperties kv.Store[string, FileProperties]
log *rlog.Logger
}
func newController(t *torrent.Torrent, rep *filesMappingsStore) *Controller {
func newController(t *torrent.Torrent, fileProperties kv.Store[string, FileProperties], storage TorrentFileDeleter) *Controller {
return &Controller{
t: t,
rep: rep,
log: rlog.Component("torrent/controller").With(slog.String("infohash", t.InfoHash().HexString())),
t: t,
storage: storage,
fileProperties: fileProperties,
log: rlog.Component("torrent-client", "controller").With(slog.String("infohash", t.InfoHash().HexString())),
}
}
@ -63,7 +76,11 @@ func (s *Controller) Length() int64 {
}
func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash())
fps := map[string]FileProperties{}
err := s.fileProperties.Range(ctx, func(k string, v FileProperties) error {
fps[k] = v
return nil
})
if err != nil {
return nil, err
}
@ -84,9 +101,10 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
if strings.Contains(p, "/.pad/") {
return true
}
if target, ok := fileMappings[p]; ok && target == "" {
if props, ok := fps[p]; ok && props.Excluded {
return true
}
return false
})
@ -102,7 +120,13 @@ func Map[T, U any](ts []T, f func(T) U) []U {
}
func (s *Controller) ExcludeFile(ctx context.Context, f *torrent.File) error {
return s.rep.ExcludeFile(ctx, f)
log := s.log.With(slog.String("file", f.Path()))
log.Info(ctx, "excluding file")
return s.fileProperties.Edit(ctx, f.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Excluded = true
return v, nil
})
}
func (s *Controller) isFileComplete(startIndex int, endIndex int) bool {
@ -132,21 +156,46 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error {
return nil
}
func (c *Controller) SetFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
log := c.log.With(slog.String("file", file.Path()), slog.Int("priority", int(priority)))
log.Info(ctx, "set pritority for file")
err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Priority = priority
return v, nil
})
if err != nil {
return err
}
file.SetPriority(priority)
return nil
}
func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
log := c.log.WithComponent("initializeTorrentPriories")
// files, err := c.Files(ctx)
// if err != nil {
// return err
// }
files, err := c.Files(ctx)
if err != nil {
return err
}
// for _, file := range files {
// if file == nil {
// continue
// }
for _, file := range files {
if file == nil {
continue
}
// file.SetPriority(torrent.PiecePriorityNormal)
// }
props, err := c.fileProperties.Get(ctx, file.Path())
if err != nil {
if err == kv.ErrKeyNotFound {
continue
}
log.Error(ctx, "failed to get file priority", rlog.Error(err))
}
file.SetPriority(props.Priority)
}
log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name()))

View file

@ -15,6 +15,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/tkv"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
@ -39,12 +40,12 @@ type DirAquire struct {
}
type Daemon struct {
client *torrent.Client
excludedFiles *filesMappingsStore
infoBytes *infoBytesStore
Storage *fileStorage
fis *fileItemStore
dirsAquire kv.Store[string, DirAquire]
client *torrent.Client
infoBytes *infoBytesStore
Storage *fileStorage
fis *fileItemStore
dirsAquire kv.Store[string, DirAquire]
fileProperties kv.Store[string, FileProperties]
loadMutex sync.Mutex
torrentLoaded chan struct{}
@ -77,12 +78,13 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, err
}
s.excludedFiles, err = newFileMappingsStore(conf.MetadataFolder, s.Storage)
s.fileProperties, err = tkv.NewKV[string, FileProperties](conf.MetadataFolder, "file-properties")
if err != nil {
return nil, err
}
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
if err != nil {
return nil, err
}
@ -98,7 +100,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
}
s.client.AddDhtNodes(conf.DHTNodes)
s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
s.dirsAquire, err = tkv.NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
if err != nil {
return nil, err
}
@ -122,7 +124,7 @@ func (s *Daemon) Close(ctx context.Context) error {
s.client.Close(),
s.Storage.Close(),
s.dirsAquire.Close(ctx),
s.excludedFiles.Close(ctx),
// s.excludedFiles.Close(ctx),
s.infoBytes.Close(),
s.fis.Close(),
)...)
@ -214,7 +216,7 @@ func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, erro
// }
}
ctl := newController(t, s.excludedFiles)
ctl := s.newController(t)
err = ctl.initializeTorrentPriories(ctx)
if err != nil {
@ -306,12 +308,23 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
})
}
func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] {
return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/"))
}
func (s *Daemon) newController(t *torrent.Torrent) *Controller {
return newController(t,
storeByTorrent(s.fileProperties, t.InfoHash()),
s.Storage,
)
}
func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) {
<-s.torrentLoaded
out := []*Controller{}
for _, v := range s.client.Torrents() {
out = append(out, newController(v, s.excludedFiles))
out = append(out, s.newController(v))
}
return out, nil
}
@ -324,7 +337,7 @@ func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) {
return nil, nil
}
return newController(t, s.excludedFiles), nil
return s.newController(t), nil
}
func slicesUnique[S ~[]E, E comparable](in S) S {

View file

@ -1,60 +0,0 @@
package torrent
import (
"context"
"path/filepath"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
func newFileMappingsStore(metaDir string, storage TorrentFileDeleter) (*filesMappingsStore, error) {
opts := kvbadger.DefaultOptions(filepath.Join(metaDir, "file-mappings"))
str, err := kvbadger.NewBadgerKVBytes[string, string](opts)
if err != nil {
return nil, err
}
r := &filesMappingsStore{
mappings: str,
storage: storage,
}
return r, nil
}
type filesMappingsStore struct {
mappings kv.Store[string, string]
storage TorrentFileDeleter
}
type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error
}
func fileKey(file *torrent.File) string {
return file.Torrent().InfoHash().HexString() + "/" + file.Path()
}
func (r *filesMappingsStore) MapFile(ctx context.Context, file *torrent.File, target string) error {
return r.mappings.Set(ctx, fileKey(file), target)
}
func (r *filesMappingsStore) ExcludeFile(ctx context.Context, file *torrent.File) error {
return r.mappings.Set(ctx, fileKey(file), "")
}
func (r *filesMappingsStore) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) {
out := map[string]string{}
err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) error {
out[k] = v
return nil
})
return out, err
}
func (r *filesMappingsStore) Close(ctx context.Context) error {
return r.mappings.Close(ctx)
}

View file

@ -3,7 +3,6 @@ package torrent
import (
"bytes"
"encoding/gob"
"log/slog"
"time"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
@ -19,10 +18,8 @@ type fileItemStore struct {
}
func newFileItemStore(path string, itemsTTL time.Duration) (*fileItemStore, error) {
l := slog.With("component", "item-store")
opts := badger.DefaultOptions(path).
WithLogger(&dlog.Badger{L: l}).
WithLogger(dlog.BadgerLogger("torrent-client", "item-store")).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)

View file

@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"log/slog"
"path/filepath"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
@ -20,11 +19,9 @@ type infoBytesStore struct {
}
func newInfoBytesStore(metaDir string) (*infoBytesStore, error) {
l := slog.With("component", "badger", "db", "info-bytes")
opts := badger.
DefaultOptions(filepath.Join(metaDir, "infobytes")).
WithLogger(&dlog.Badger{L: l})
WithLogger(dlog.BadgerLogger("torrent-client", "infobytes"))
db, err := badger.Open(opts)
if err != nil {
return nil, err

View file

@ -1,22 +0,0 @@
package torrent
import (
"path"
"git.kmsign.ru/royalcat/tstor/pkg/kvtrace"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
"go.opentelemetry.io/otel/attribute"
)
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
opts := kvbadger.DefaultOptions(path.Join(dbdir, name))
store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts)
if err != nil {
return nil, err
}
store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger"))
return store, err
}

View file

@ -33,11 +33,9 @@ type badgerPieceCompletion struct {
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
l := slog.With("component", "badger", "db", "piece-completion")
opts := badger.
DefaultOptions(dir).
WithLogger(&dlog.Badger{L: l})
WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion"))
db, err := badger.Open(opts)
if err != nil {
return nil, err

View file

@ -6,6 +6,7 @@ import (
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
"golang.org/x/exp/maps"
@ -15,7 +16,8 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error)
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats-history")).
WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err

View file

@ -16,6 +16,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
)
// OpenTorrent implements storage.ClientImplCloser.
@ -92,11 +93,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
name := info.BestName()
aq, found, err := s.dirsAquire.Get(ctx, info.BestName())
if err != nil {
return false, false, err
}
if !found {
aq, err := s.dirsAquire.Get(ctx, info.BestName())
if errors.Is(err, kv.ErrKeyNotFound) {
err = s.dirsAquire.Set(ctx, name, DirAquire{
Name: name,
Hashes: slices.Compact([]infohash.T{ih}),
@ -107,6 +105,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
log.Debug(ctx, "acquiring was not found, so created")
return true, false, nil
} else if err != nil {
return false, false, err
}
if slices.Contains(aq.Hashes, ih) {
@ -136,7 +136,6 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash is compatable")
return true, false, nil

View file

@ -11,6 +11,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/tasks"
"github.com/royalcat/ctxio"
"github.com/royalcat/ctxprogress"
"github.com/royalcat/kv"
)
type Controller struct {
@ -86,13 +87,13 @@ func (c *Controller) Update(ctx context.Context, updater tasks.Updater) error {
}
func (c *Controller) Info(ctx context.Context) (ytdlp.Info, error) {
info, found, err := c.cachedinfo.Get(ctx)
if err != nil {
return info, err
}
if found {
info, err := c.cachedinfo.Get(ctx)
if err == nil {
return info, nil
}
if err != kv.ErrKeyNotFound {
return info, err
}
info, err = c.Info(ctx)
if err != nil {