Compare commits

...

2 commits

Author SHA1 Message Date
bc4b39b1c1 new kv, ctx in nfs handler
Some checks failed
docker / build-docker (linux/amd64) (push) Successful in 2m36s
docker / build-docker (linux/386) (push) Successful in 2m45s
docker / build-docker (linux/arm/v7) (push) Has been cancelled
docker / build-docker (linux/arm64/v8) (push) Has been cancelled
docker / build-docker (linux/arm64) (push) Has been cancelled
2024-06-17 00:34:46 +03:00
609d69fb5a tkv 2024-06-17 00:23:29 +03:00
41 changed files with 273 additions and 201 deletions

4
go.mod
View file

@ -35,8 +35,8 @@ require (
github.com/ravilushqa/otelgqlgen v0.15.0
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff
github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6
github.com/rs/zerolog v1.32.0
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-zerolog v1.0.0

8
go.sum
View file

@ -541,14 +541,14 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkLUsgvVdYO/UOJkzDyPzESfrTqWfM=
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3 h1:1Ow/NUAWFZLghFcdNuyHt5Avb+bEI11qG8ELr9/XmQQ=
github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950 h1:zHYwRhGWVkGQnjmStcnxTQ95Mtk5DL6w1PmdIn63EpI=
github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 h1:rGXhPFpOVLeOO/Da2qxBNZY5yaQdTCGxQV2dUDXXf7U=
github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 h1:rKG2P4TNLgA4/Jl7LPayifjcw4txVGVSPkpHVhn3wnw=
github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 h1:/TWa41uAL8Vk0MkvZc03EjA1/bS2otK5q0/+6bSWKJI=
github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8=
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=

View file

@ -117,9 +117,16 @@ func (c *conn) serializeWrites(ctx context.Context) {
// Handle a request. errors from this method indicate a failure to read or
// write on the network stream, and trigger a disconnection of the connection.
func (c *conn) handle(ctx context.Context, w *response) error {
func (c *conn) handle(ctx context.Context, w *response) (err error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("nfs.handle.%s", NFSProcedure(w.req.Header.Proc).String()))
defer span.End()
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
handler := c.Server.handlerFor(w.req.Header.Prog, w.req.Header.Proc)
if handler == nil {
@ -146,7 +153,6 @@ func (c *conn) handle(ctx context.Context, w *response) error {
}
}
span.SetStatus(codes.Ok, "")
return nil
}

View file

@ -25,9 +25,9 @@ type Handler interface {
// represent file objects as opaque references
// Can be safely implemented via helpers/cachinghandler.
ToHandle(fs Filesystem, path []string) []byte
FromHandle(fh []byte) (Filesystem, []string, error)
InvalidateHandle(Filesystem, []byte) error
ToHandle(ctx context.Context, fs Filesystem, path []string) []byte
FromHandle(ctx context.Context, fh []byte) (Filesystem, []string, error)
InvalidateHandle(context.Context, Filesystem, []byte) error
// How many handles can be safely maintained by the handler.
HandleLimit() int

View file

@ -1,6 +1,7 @@
package helpers
import (
"context"
"crypto/sha256"
"encoding/binary"
"io/fs"
@ -51,7 +52,7 @@ type entry struct {
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(f nfs.Filesystem, path []string) []byte {
func (c *CachingHandler) ToHandle(ctx context.Context, f nfs.Filesystem, path []string) []byte {
joinedPath := f.Join(path...)
if handle := c.searchReverseCache(f, joinedPath); handle != nil {
@ -79,7 +80,7 @@ func (c *CachingHandler) ToHandle(f nfs.Filesystem, path []string) []byte {
}
// FromHandle converts from an opaque handle to the file it represents
func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) {
func (c *CachingHandler) FromHandle(ctx context.Context, fh []byte) (nfs.Filesystem, []string, error) {
id, err := uuid.FromBytes(fh)
if err != nil {
return nil, []string{}, err
@ -134,7 +135,7 @@ func (c *CachingHandler) evictReverseCache(path string, handle uuid.UUID) {
}
}
func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error {
func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem, handle []byte) error {
//Remove from cache
id, _ := uuid.FromBytes(handle)
entry, ok := c.activeHandles.Get(id)

View file

@ -35,21 +35,21 @@ func (h *NullAuthHandler) Change(fs nfs.Filesystem) nfs.Change {
}
// FSStat provides information about a filesystem.
func (h *NullAuthHandler) FSStat(ctx context.Context, f nfs.Filesystem, s *nfs.FSStat) error {
func (h *NullAuthHandler) FSStat(context.Context, nfs.Filesystem, *nfs.FSStat) error {
return nil
}
// ToHandle handled by CachingHandler
func (h *NullAuthHandler) ToHandle(f nfs.Filesystem, s []string) []byte {
func (h *NullAuthHandler) ToHandle(context.Context, nfs.Filesystem, []string) []byte {
return []byte{}
}
// FromHandle handled by CachingHandler
func (h *NullAuthHandler) FromHandle([]byte) (nfs.Filesystem, []string, error) {
func (h *NullAuthHandler) FromHandle(context.Context, []byte) (nfs.Filesystem, []string, error) {
return nil, []string{}, nil
}
func (c *NullAuthHandler) InvalidateHandle(nfs.Filesystem, []byte) error {
func (c *NullAuthHandler) InvalidateHandle(context.Context, nfs.Filesystem, []byte) error {
return nil
}

View file

@ -39,7 +39,7 @@ func onMount(ctx context.Context, w *response, userHandle Handler) error {
return err
}
rootHndl := userHandle.ToHandle(handle, []string{})
rootHndl := userHandle.ToHandle(ctx, handle, []string{})
if status == MountStatusOk {
_ = xdr.Write(writer, rootHndl)

View file

@ -14,7 +14,7 @@ func onAccess(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(roothandle)
fs, path, err := userHandle.FromHandle(ctx, roothandle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -18,7 +18,7 @@ func onCommit(ctx context.Context, w *response, userHandle Handler) error {
}
// The conn will drain the unread offset and count arguments.
fs, path, err := userHandle.FromHandle(handle)
fs, path, err := userHandle.FromHandle(ctx, handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -47,7 +47,7 @@ func onCreate(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusNotSupp, os.ErrInvalid}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -87,7 +87,7 @@ func onCreate(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusAccess, err}
}
fp := userHandle.ToHandle(fs, newFile)
fp := userHandle.ToHandle(ctx, fs, newFile)
changer := userHandle.Change(fs)
if err := attrs.Apply(ctx, changer, fs, newFilePath); err != nil {
Log.Errorf("Error applying attributes: %v\n", err)

View file

@ -24,7 +24,7 @@ func onFSInfo(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(roothandle)
fs, path, err := userHandle.FromHandle(ctx, roothandle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -13,7 +13,7 @@ func onFSStat(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(roothandle)
fs, path, err := userHandle.FromHandle(ctx, roothandle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -15,7 +15,7 @@ func onGetAttr(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(handle)
fs, path, err := userHandle.FromHandle(ctx, handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -27,7 +27,7 @@ func onLink(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -49,7 +49,7 @@ func onLink(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusNotDir, nil}
}
fp := userHandle.ToHandle(fs, append(path, string(obj.Filename)))
fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename)))
changer := userHandle.Change(fs)
if changer == nil {
return &NFSStatusError{NFSStatusAccess, err}

View file

@ -33,7 +33,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, p, err := userHandle.FromHandle(obj.Handle)
fs, p, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -58,7 +58,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusAccess, os.ErrPermission}
}
pPath := p[0 : len(p)-1]
pHandle := userHandle.ToHandle(fs, pPath)
pHandle := userHandle.ToHandle(ctx, fs, pPath)
resp, err := lookupSuccessResponse(ctx, pHandle, pPath, p, fs)
if err != nil {
return &NFSStatusError{NFSStatusServerFault, err}
@ -74,7 +74,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusNoEnt, os.ErrNotExist}
}
newHandle := userHandle.ToHandle(fs, reqPath)
newHandle := userHandle.ToHandle(ctx, fs, reqPath)
resp, err := lookupSuccessResponse(ctx, newHandle, reqPath, p, fs)
if err != nil {
return &NFSStatusError{NFSStatusServerFault, err}

View file

@ -26,7 +26,7 @@ func onMkdir(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -59,7 +59,7 @@ func onMkdir(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusAccess, err}
}
fp := userHandle.ToHandle(fs, newFolder)
fp := userHandle.ToHandle(ctx, fs, newFolder)
changer := userHandle.Change(fs)
if changer != nil {
if err := attrs.Apply(ctx, changer, fs, newFolderPath); err != nil {

View file

@ -37,7 +37,7 @@ func onMknod(ctx context.Context, w *response, userHandle Handler) error {
}
// see if the filesystem supports mknod
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -69,7 +69,7 @@ func onMknod(ctx context.Context, w *response, userHandle Handler) error {
} else if !parent.IsDir() {
return &NFSStatusError{NFSStatusNotDir, nil}
}
fp := userHandle.ToHandle(fs, append(path, string(obj.Filename)))
fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename)))
switch nfs_ftype(ftype) {
case FTYPE_NF3CHR:

View file

@ -15,7 +15,7 @@ func onPathConf(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(roothandle)
fs, path, err := userHandle.FromHandle(ctx, roothandle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -37,7 +37,7 @@ func onRead(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -41,7 +41,7 @@ func onReadDir(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusTooSmall, io.ErrShortBuffer}
}
fs, p, err := userHandle.FromHandle(obj.Handle)
fs, p, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -140,7 +140,7 @@ func onReadDir(ctx context.Context, w *response, userHandle Handler) error {
func getDirListingWithVerifier(ctx context.Context, userHandle Handler, fsHandle []byte, verifier uint64) ([]fs.FileInfo, uint64, error) {
// figure out what directory it is.
fs, p, err := userHandle.FromHandle(fsHandle)
fs, p, err := userHandle.FromHandle(ctx, fsHandle)
if err != nil {
return nil, 0, &NFSStatusError{NFSStatusStale, err}
}

View file

@ -46,7 +46,7 @@ func onReadDirPlus(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusTooSmall, nil}
}
fs, p, err := userHandle.FromHandle(obj.Handle)
fs, p, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -102,7 +102,7 @@ func onReadDirPlus(ctx context.Context, w *response, userHandle Handler) error {
}
filePath := joinPath(p, c.Name())
handle := userHandle.ToHandle(fs, filePath)
handle := userHandle.ToHandle(ctx, fs, filePath)
attrs := ToFileAttribute(c, path.Join(filePath...))
entities = append(entities, readDirPlusEntity{
FileID: attrs.Fileid,

View file

@ -15,7 +15,7 @@ func onReadLink(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(handle)
fs, path, err := userHandle.FromHandle(ctx, handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -15,7 +15,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error {
if err := xdr.Read(w.req.Body, &obj); err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -49,7 +49,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error {
preCacheData := ToFileAttribute(dirInfo, fullPath).AsCache()
toDelete := fs.Join(append(path, string(obj.Filename))...)
toDeleteHandle := userHandle.ToHandle(fs, append(path, string(obj.Filename)))
toDeleteHandle := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename)))
err = fs.Remove(ctx, toDelete)
if err != nil {
@ -65,7 +65,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusIO, err}
}
if err := userHandle.InvalidateHandle(fs, toDeleteHandle); err != nil {
if err := userHandle.InvalidateHandle(ctx, fs, toDeleteHandle); err != nil {
return &NFSStatusError{NFSStatusServerFault, err}
}

View file

@ -20,7 +20,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error {
if err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs, fromPath, err := userHandle.FromHandle(from.Handle)
fs, fromPath, err := userHandle.FromHandle(ctx, from.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -29,7 +29,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error {
if err = xdr.Read(w.req.Body, &to); err != nil {
return &NFSStatusError{NFSStatusInval, err}
}
fs2, toPath, err := userHandle.FromHandle(to.Handle)
fs2, toPath, err := userHandle.FromHandle(ctx, to.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -78,7 +78,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error {
}
preDestData := ToFileAttribute(toDirInfo, toDirPath).AsCache()
oldHandle := userHandle.ToHandle(fs, append(fromPath, string(from.Filename)))
oldHandle := userHandle.ToHandle(ctx, fs, append(fromPath, string(from.Filename)))
fromLoc := fs.Join(append(fromPath, string(from.Filename))...)
toLoc := fs.Join(append(toPath, string(to.Filename))...)
@ -97,7 +97,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusIO, err}
}
if err := userHandle.InvalidateHandle(fs, oldHandle); err != nil {
if err := userHandle.InvalidateHandle(ctx, fs, oldHandle); err != nil {
return &NFSStatusError{NFSStatusServerFault, err}
}

View file

@ -17,7 +17,7 @@ func onSetAttr(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(handle)
fs, path, err := userHandle.FromHandle(ctx, handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -26,7 +26,7 @@ func onSymlink(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(obj.Handle)
fs, path, err := userHandle.FromHandle(ctx, obj.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}
@ -53,7 +53,7 @@ func onSymlink(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusAccess, err}
}
fp := userHandle.ToHandle(fs, append(path, string(obj.Filename)))
fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename)))
changer := userHandle.Change(fs)
if changer != nil {
if err := attrs.Apply(ctx, changer, fs, newFilePath); err != nil {

View file

@ -35,7 +35,7 @@ func onWrite(ctx context.Context, w *response, userHandle Handler) error {
return &NFSStatusError{NFSStatusInval, err}
}
fs, path, err := userHandle.FromHandle(req.Handle)
fs, path, err := userHandle.FromHandle(ctx, req.Handle)
if err != nil {
return &NFSStatusError{NFSStatusStale, err}
}

View file

@ -15,7 +15,7 @@ func New[K, V any](db kv.Store[K, V], key K) *Value[K, V] {
return &Value[K, V]{Key: key, db: db}
}
func (s *Value[K, V]) Get(ctx context.Context) (V, bool, error) {
func (s *Value[K, V]) Get(ctx context.Context) (V, error) {
return s.db.Get(ctx, s.Key)
}

View file

@ -6,85 +6,145 @@ import (
"github.com/royalcat/kv"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var tracer = otel.Tracer("github.com/royalcat/kv/tracer")
type traceSrtore[K, V any] struct {
type traceStore[K, V any] struct {
kv kv.Store[K, V]
attrs []attribute.KeyValue
}
func WrapTracing[K, V any](kv kv.Store[K, V], attrs ...attribute.KeyValue) kv.Store[K, V] {
return &traceSrtore[K, V]{
return &traceStore[K, V]{
kv: kv,
attrs: attrs,
}
}
// Close implements kv.Store.
func (m *traceSrtore[K, V]) Close(ctx context.Context) error {
func (m *traceStore[K, V]) Close(ctx context.Context) (err error) {
ctx, span := tracer.Start(ctx, "Close", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil && err != kv.ErrKeyNotFound {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
return m.kv.Close(ctx)
}
// Delete implements kv.Store.
func (m *traceSrtore[K, V]) Delete(ctx context.Context, k K) error {
func (m *traceStore[K, V]) Delete(ctx context.Context, k K) (err error) {
ctx, span := tracer.Start(ctx, "Delete", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil && err != kv.ErrKeyNotFound {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
return m.kv.Delete(ctx, k)
}
// Get implements kv.Store.
func (m *traceSrtore[K, V]) Get(ctx context.Context, k K) (v V, found bool, err error) {
func (m *traceStore[K, V]) Get(ctx context.Context, k K) (v V, err error) {
ctx, span := tracer.Start(ctx, "Get", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil && err != kv.ErrKeyNotFound {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
return m.kv.Get(ctx, k)
}
// Get implements kv.Store.
func (m *traceStore[K, V]) Edit(ctx context.Context, k K, edit kv.Edit[V]) (err error) {
ctx, span := tracer.Start(ctx, "Get", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil && err != kv.ErrKeyNotFound {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
return m.kv.Edit(ctx, k, edit)
}
// Range implements kv.Store.
func (m *traceSrtore[K, V]) Range(ctx context.Context, iter kv.Iter[K, V]) error {
func (m *traceStore[K, V]) Range(ctx context.Context, iter kv.Iter[K, V]) (err error) {
ctx, span := tracer.Start(ctx, "Range", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
count := 0
iterCount := func(k K, v V) error {
count++
return iter(k, v)
}
defer func() {
span.SetAttributes(attribute.Int("count", count))
}()
err := m.kv.Range(ctx, iterCount)
span.SetAttributes(attribute.Int("count", count))
return err
return m.kv.Range(ctx, iterCount)
}
// RangeWithPrefix implements kv.Store.
func (m *traceSrtore[K, V]) RangeWithPrefix(ctx context.Context, k K, iter kv.Iter[K, V]) error {
func (m *traceStore[K, V]) RangeWithPrefix(ctx context.Context, k K, iter kv.Iter[K, V]) (err error) {
ctx, span := tracer.Start(ctx, "RangeWithPrefix", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
count := 0
iterCount := func(k K, v V) error {
count++
return iter(k, v)
}
defer func() {
span.SetAttributes(attribute.Int("count", count))
}()
err := m.kv.Range(ctx, iterCount)
span.SetAttributes(attribute.Int("count", count))
return err
return m.kv.Range(ctx, iterCount)
}
// Set implements kv.Store.
func (m *traceSrtore[K, V]) Set(ctx context.Context, k K, v V) error {
func (m *traceStore[K, V]) Set(ctx context.Context, k K, v V) (err error) {
ctx, span := tracer.Start(ctx, "Set", trace.WithAttributes(m.attrs...))
defer span.End()
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}()
return m.kv.Set(ctx, k, v)
}
var _ kv.Store[any, any] = (*traceSrtore[any, any])(nil)
var _ kv.Store[any, any] = (*traceStore[any, any])(nil)

View file

@ -2,6 +2,7 @@ package nfs
import (
"context"
"errors"
"fmt"
"path"
"strings"
@ -10,6 +11,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
@ -50,6 +52,7 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers"))
opts.DefaultTTL = time.Hour
opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler")
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)
if err != nil {
@ -98,8 +101,7 @@ type CachingHandler struct {
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
ctx := context.Background()
func (c *CachingHandler) ToHandle(ctx context.Context, _ nfs.Filesystem, path []string) []byte {
var id uuid.UUID
cacheKey := handle(path).String()
@ -123,31 +125,29 @@ func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
}
// FromHandle converts from an opaque handle to the file it represents
func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) {
func (c *CachingHandler) FromHandle(ctx context.Context, fh []byte) (nfs.Filesystem, []string, error) {
c.mu.Lock()
defer c.mu.Unlock()
ctx := context.Background()
id, err := uuid.FromBytes(fh)
if err != nil {
return nil, nil, err
}
paths, found, err := c.activeHandles.Get(ctx, id)
paths, err := c.activeHandles.Get(ctx, id)
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
return nil, nil, fmt.Errorf("kv error: %w", err)
}
if found {
return c.fs, paths, nil
}
return c.fs, paths, nil
return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error {
ctx := context.Background()
func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem, handle []byte) error {
//Remove from cache
id, err := uuid.FromBytes(handle)
if err != nil {

View file

@ -5,12 +5,19 @@ import (
"log/slog"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/dgraph-io/badger/v4"
)
var _ badger.Logger = (*Badger)(nil)
func BadgerLogger(name ...string) badger.Logger {
return &badgerLogger{
L: rlog.Component(append(name, "badger")...).Slog(),
}
}
type Badger struct {
var _ badger.Logger = (*badgerLogger)(nil)
type badgerLogger struct {
L *slog.Logger
}
@ -18,18 +25,18 @@ func fmtBadgerLog(m string, f ...any) string {
return fmt.Sprintf(strings.ReplaceAll(m, "\n", ""), f...)
}
func (l *Badger) Errorf(m string, f ...interface{}) {
func (l *badgerLogger) Errorf(m string, f ...interface{}) {
l.L.Error(fmtBadgerLog(m, f...))
}
func (l *Badger) Warningf(m string, f ...interface{}) {
func (l *badgerLogger) Warningf(m string, f ...interface{}) {
l.L.Warn(fmtBadgerLog(m, f...))
}
func (l *Badger) Infof(m string, f ...interface{}) {
func (l *badgerLogger) Infof(m string, f ...interface{}) {
l.L.Info(fmtBadgerLog(m, f...))
}
func (l *Badger) Debugf(m string, f ...interface{}) {
func (l *badgerLogger) Debugf(m string, f ...interface{}) {
l.L.Debug(fmtBadgerLog(m, f...))
}

View file

@ -8,20 +8,33 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types"
"github.com/royalcat/kv"
)
type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error
}
type FileProperties struct {
Excluded bool `json:"excluded"`
Priority types.PiecePriority `json:"priority"`
}
type Controller struct {
torrentFilePath string
t *torrent.Torrent
rep *filesMappingsStore
storage TorrentFileDeleter
fileProperties kv.Store[string, FileProperties]
log *rlog.Logger
}
func newController(t *torrent.Torrent, rep *filesMappingsStore) *Controller {
func newController(t *torrent.Torrent, fileProperties kv.Store[string, FileProperties], storage TorrentFileDeleter) *Controller {
return &Controller{
t: t,
rep: rep,
log: rlog.Component("torrent/controller").With(slog.String("infohash", t.InfoHash().HexString())),
t: t,
storage: storage,
fileProperties: fileProperties,
log: rlog.Component("torrent-client", "controller").With(slog.String("infohash", t.InfoHash().HexString())),
}
}
@ -63,7 +76,11 @@ func (s *Controller) Length() int64 {
}
func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash())
fps := map[string]FileProperties{}
err := s.fileProperties.Range(ctx, func(k string, v FileProperties) error {
fps[k] = v
return nil
})
if err != nil {
return nil, err
}
@ -84,9 +101,10 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
if strings.Contains(p, "/.pad/") {
return true
}
if target, ok := fileMappings[p]; ok && target == "" {
if props, ok := fps[p]; ok && props.Excluded {
return true
}
return false
})
@ -102,7 +120,13 @@ func Map[T, U any](ts []T, f func(T) U) []U {
}
func (s *Controller) ExcludeFile(ctx context.Context, f *torrent.File) error {
return s.rep.ExcludeFile(ctx, f)
log := s.log.With(slog.String("file", f.Path()))
log.Info(ctx, "excluding file")
return s.fileProperties.Edit(ctx, f.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Excluded = true
return v, nil
})
}
func (s *Controller) isFileComplete(startIndex int, endIndex int) bool {
@ -132,21 +156,46 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error {
return nil
}
func (c *Controller) SetFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
log := c.log.With(slog.String("file", file.Path()), slog.Int("priority", int(priority)))
log.Info(ctx, "set pritority for file")
err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Priority = priority
return v, nil
})
if err != nil {
return err
}
file.SetPriority(priority)
return nil
}
func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
log := c.log.WithComponent("initializeTorrentPriories")
// files, err := c.Files(ctx)
// if err != nil {
// return err
// }
files, err := c.Files(ctx)
if err != nil {
return err
}
// for _, file := range files {
// if file == nil {
// continue
// }
for _, file := range files {
if file == nil {
continue
}
// file.SetPriority(torrent.PiecePriorityNormal)
// }
props, err := c.fileProperties.Get(ctx, file.Path())
if err != nil {
if err == kv.ErrKeyNotFound {
continue
}
log.Error(ctx, "failed to get file priority", rlog.Error(err))
}
file.SetPriority(props.Priority)
}
log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name()))

View file

@ -15,6 +15,7 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/tkv"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
@ -39,12 +40,12 @@ type DirAquire struct {
}
type Daemon struct {
client *torrent.Client
excludedFiles *filesMappingsStore
infoBytes *infoBytesStore
Storage *fileStorage
fis *fileItemStore
dirsAquire kv.Store[string, DirAquire]
client *torrent.Client
infoBytes *infoBytesStore
Storage *fileStorage
fis *fileItemStore
dirsAquire kv.Store[string, DirAquire]
fileProperties kv.Store[string, FileProperties]
loadMutex sync.Mutex
torrentLoaded chan struct{}
@ -77,12 +78,13 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
return nil, err
}
s.excludedFiles, err = newFileMappingsStore(conf.MetadataFolder, s.Storage)
s.fileProperties, err = tkv.NewKV[string, FileProperties](conf.MetadataFolder, "file-properties")
if err != nil {
return nil, err
}
s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder)
if err != nil {
return nil, err
}
@ -98,7 +100,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon,
}
s.client.AddDhtNodes(conf.DHTNodes)
s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
s.dirsAquire, err = tkv.NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
if err != nil {
return nil, err
}
@ -122,7 +124,7 @@ func (s *Daemon) Close(ctx context.Context) error {
s.client.Close(),
s.Storage.Close(),
s.dirsAquire.Close(ctx),
s.excludedFiles.Close(ctx),
// s.excludedFiles.Close(ctx),
s.infoBytes.Close(),
s.fis.Close(),
)...)
@ -214,7 +216,7 @@ func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, erro
// }
}
ctl := newController(t, s.excludedFiles)
ctl := s.newController(t)
err = ctl.initializeTorrentPriories(ctx)
if err != nil {
@ -306,12 +308,23 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
})
}
func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] {
return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/"))
}
func (s *Daemon) newController(t *torrent.Torrent) *Controller {
return newController(t,
storeByTorrent(s.fileProperties, t.InfoHash()),
s.Storage,
)
}
func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) {
<-s.torrentLoaded
out := []*Controller{}
for _, v := range s.client.Torrents() {
out = append(out, newController(v, s.excludedFiles))
out = append(out, s.newController(v))
}
return out, nil
}
@ -324,7 +337,7 @@ func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) {
return nil, nil
}
return newController(t, s.excludedFiles), nil
return s.newController(t), nil
}
func slicesUnique[S ~[]E, E comparable](in S) S {

View file

@ -1,60 +0,0 @@
package torrent
import (
"context"
"path/filepath"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
func newFileMappingsStore(metaDir string, storage TorrentFileDeleter) (*filesMappingsStore, error) {
opts := kvbadger.DefaultOptions(filepath.Join(metaDir, "file-mappings"))
str, err := kvbadger.NewBadgerKVBytes[string, string](opts)
if err != nil {
return nil, err
}
r := &filesMappingsStore{
mappings: str,
storage: storage,
}
return r, nil
}
type filesMappingsStore struct {
mappings kv.Store[string, string]
storage TorrentFileDeleter
}
type TorrentFileDeleter interface {
DeleteFile(file *torrent.File) error
}
func fileKey(file *torrent.File) string {
return file.Torrent().InfoHash().HexString() + "/" + file.Path()
}
func (r *filesMappingsStore) MapFile(ctx context.Context, file *torrent.File, target string) error {
return r.mappings.Set(ctx, fileKey(file), target)
}
func (r *filesMappingsStore) ExcludeFile(ctx context.Context, file *torrent.File) error {
return r.mappings.Set(ctx, fileKey(file), "")
}
func (r *filesMappingsStore) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) {
out := map[string]string{}
err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) error {
out[k] = v
return nil
})
return out, err
}
func (r *filesMappingsStore) Close(ctx context.Context) error {
return r.mappings.Close(ctx)
}

View file

@ -3,7 +3,6 @@ package torrent
import (
"bytes"
"encoding/gob"
"log/slog"
"time"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
@ -19,10 +18,8 @@ type fileItemStore struct {
}
func newFileItemStore(path string, itemsTTL time.Duration) (*fileItemStore, error) {
l := slog.With("component", "item-store")
opts := badger.DefaultOptions(path).
WithLogger(&dlog.Badger{L: l}).
WithLogger(dlog.BadgerLogger("torrent-client", "item-store")).
WithValueLogFileSize(1<<26 - 1)
db, err := badger.Open(opts)

View file

@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"log/slog"
"path/filepath"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
@ -20,11 +19,9 @@ type infoBytesStore struct {
}
func newInfoBytesStore(metaDir string) (*infoBytesStore, error) {
l := slog.With("component", "badger", "db", "info-bytes")
opts := badger.
DefaultOptions(filepath.Join(metaDir, "infobytes")).
WithLogger(&dlog.Badger{L: l})
WithLogger(dlog.BadgerLogger("torrent-client", "infobytes"))
db, err := badger.Open(opts)
if err != nil {
return nil, err

View file

@ -33,11 +33,9 @@ type badgerPieceCompletion struct {
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
l := slog.With("component", "badger", "db", "piece-completion")
opts := badger.
DefaultOptions(dir).
WithLogger(&dlog.Badger{L: l})
WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion"))
db, err := badger.Open(opts)
if err != nil {
return nil, err

View file

@ -6,6 +6,7 @@ import (
"path"
"time"
"git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/types/infohash"
"github.com/dgraph-io/badger/v4"
"golang.org/x/exp/maps"
@ -15,7 +16,8 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error)
db, err := badger.OpenManaged(
badger.
DefaultOptions(path.Join(metaDir, "stats-history")).
WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity
WithNumVersionsToKeep(int(^uint(0) >> 1)).
WithLogger(log.BadgerLogger("stats")), // Infinity
)
if err != nil {
return nil, err

View file

@ -16,6 +16,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
)
// OpenTorrent implements storage.ClientImplCloser.
@ -92,11 +93,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
name := info.BestName()
aq, found, err := s.dirsAquire.Get(ctx, info.BestName())
if err != nil {
return false, false, err
}
if !found {
aq, err := s.dirsAquire.Get(ctx, info.BestName())
if errors.Is(err, kv.ErrKeyNotFound) {
err = s.dirsAquire.Set(ctx, name, DirAquire{
Name: name,
Hashes: slices.Compact([]infohash.T{ih}),
@ -107,6 +105,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
log.Debug(ctx, "acquiring was not found, so created")
return true, false, nil
} else if err != nil {
return false, false, err
}
if slices.Contains(aq.Hashes, ih) {
@ -136,7 +136,6 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug(ctx, "hash is compatable")
return true, false, nil

View file

@ -11,6 +11,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/tasks"
"github.com/royalcat/ctxio"
"github.com/royalcat/ctxprogress"
"github.com/royalcat/kv"
)
type Controller struct {
@ -86,13 +87,13 @@ func (c *Controller) Update(ctx context.Context, updater tasks.Updater) error {
}
func (c *Controller) Info(ctx context.Context) (ytdlp.Info, error) {
info, found, err := c.cachedinfo.Get(ctx)
if err != nil {
return info, err
}
if found {
info, err := c.cachedinfo.Get(ctx)
if err == nil {
return info, nil
}
if err != kv.ErrKeyNotFound {
return info, err
}
info, err = c.Info(ctx)
if err != nil {

View file

@ -1,9 +1,10 @@
package torrent
package tkv
import (
"path"
"git.kmsign.ru/royalcat/tstor/pkg/kvtrace"
tlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
"go.opentelemetry.io/otel/attribute"
@ -11,6 +12,7 @@ import (
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
opts := kvbadger.DefaultOptions(path.Join(dbdir, name))
opts.BadgerOptions.Logger = tlog.BadgerLogger(name, "badger")
store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts)
if err != nil {
return nil, err