From bc4b39b1c123e7f8f9ddeb8e6829beb4c48c674b Mon Sep 17 00:00:00 2001 From: royalcat Date: Mon, 17 Jun 2024 00:34:46 +0300 Subject: [PATCH] new kv, ctx in nfs handler --- go.mod | 4 +- go.sum | 8 +-- pkg/go-nfs/conn.go | 10 ++- pkg/go-nfs/handler.go | 6 +- pkg/go-nfs/helpers/cachinghandler.go | 7 +- pkg/go-nfs/helpers/nullauthhandler.go | 8 +-- pkg/go-nfs/mount.go | 2 +- pkg/go-nfs/nfs_onaccess.go | 2 +- pkg/go-nfs/nfs_oncommit.go | 2 +- pkg/go-nfs/nfs_oncreate.go | 4 +- pkg/go-nfs/nfs_onfsinfo.go | 2 +- pkg/go-nfs/nfs_onfsstat.go | 2 +- pkg/go-nfs/nfs_ongetattr.go | 2 +- pkg/go-nfs/nfs_onlink.go | 4 +- pkg/go-nfs/nfs_onlookup.go | 6 +- pkg/go-nfs/nfs_onmkdir.go | 4 +- pkg/go-nfs/nfs_onmknod.go | 4 +- pkg/go-nfs/nfs_onpathconf.go | 2 +- pkg/go-nfs/nfs_onread.go | 2 +- pkg/go-nfs/nfs_onreaddir.go | 4 +- pkg/go-nfs/nfs_onreaddirplus.go | 4 +- pkg/go-nfs/nfs_onreadlink.go | 2 +- pkg/go-nfs/nfs_onremove.go | 6 +- pkg/go-nfs/nfs_onrename.go | 8 +-- pkg/go-nfs/nfs_onsetattr.go | 2 +- pkg/go-nfs/nfs_onsymlink.go | 4 +- pkg/go-nfs/nfs_onwrite.go | 2 +- pkg/kvsingle/single.go | 2 +- pkg/kvtrace/kvmetrics.go | 90 ++++++++++++++++++++----- src/export/nfs/kvhandler.go | 24 +++---- src/log/badger.go | 19 ++++-- src/sources/torrent/controller.go | 85 ++++++++++++++++++----- src/sources/torrent/daemon.go | 37 ++++++---- src/sources/torrent/file_mappings.go | 60 ----------------- src/sources/torrent/fileitem.go | 5 +- src/sources/torrent/infobytes.go | 5 +- src/sources/torrent/kv.go | 22 ------ src/sources/torrent/piece_completion.go | 4 +- src/sources/torrent/stats_store.go | 4 +- src/sources/torrent/storage_open.go | 11 ++- src/sources/ytdlp/controller.go | 11 +-- 41 files changed, 270 insertions(+), 222 deletions(-) delete mode 100644 src/sources/torrent/file_mappings.go delete mode 100644 src/sources/torrent/kv.go diff --git a/go.mod b/go.mod index 0baac54..f153548 100644 --- a/go.mod +++ b/go.mod @@ -35,8 +35,8 @@ require ( github.com/ravilushqa/otelgqlgen v0.15.0 github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff - github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950 - github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 + github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 + github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 github.com/rs/zerolog v1.32.0 github.com/samber/slog-multi v1.0.2 github.com/samber/slog-zerolog v1.0.0 diff --git a/go.sum b/go.sum index 279280f..f782da3 100644 --- a/go.sum +++ b/go.sum @@ -541,14 +541,14 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkLUsgvVdYO/UOJkzDyPzESfrTqWfM= github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI= -github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3 h1:1Ow/NUAWFZLghFcdNuyHt5Avb+bEI11qG8ELr9/XmQQ= -github.com/royalcat/ctxprogress v0.0.0-20240511091748-6d9b327537c3/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= -github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950 h1:zHYwRhGWVkGQnjmStcnxTQ95Mtk5DL6w1PmdIn63EpI= -github.com/royalcat/kv v0.0.0-20240612224509-6aa0da315950/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= +github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6 h1:rGXhPFpOVLeOO/Da2qxBNZY5yaQdTCGxQV2dUDXXf7U= +github.com/royalcat/kv v0.0.0-20240615090409-961d9afa99b6/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950 h1:rKG2P4TNLgA4/Jl7LPayifjcw4txVGVSPkpHVhn3wnw= github.com/royalcat/kv/kvbadger v0.0.0-20240612224509-6aa0da315950/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= +github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6 h1:/TWa41uAL8Vk0MkvZc03EjA1/bS2otK5q0/+6bSWKJI= +github.com/royalcat/kv/kvbadger v0.0.0-20240615090409-961d9afa99b6/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/pkg/go-nfs/conn.go b/pkg/go-nfs/conn.go index 5210f0e..b29277d 100644 --- a/pkg/go-nfs/conn.go +++ b/pkg/go-nfs/conn.go @@ -117,9 +117,16 @@ func (c *conn) serializeWrites(ctx context.Context) { // Handle a request. errors from this method indicate a failure to read or // write on the network stream, and trigger a disconnection of the connection. -func (c *conn) handle(ctx context.Context, w *response) error { +func (c *conn) handle(ctx context.Context, w *response) (err error) { ctx, span := tracer.Start(ctx, fmt.Sprintf("nfs.handle.%s", NFSProcedure(w.req.Header.Proc).String())) defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() handler := c.Server.handlerFor(w.req.Header.Prog, w.req.Header.Proc) if handler == nil { @@ -146,7 +153,6 @@ func (c *conn) handle(ctx context.Context, w *response) error { } } - span.SetStatus(codes.Ok, "") return nil } diff --git a/pkg/go-nfs/handler.go b/pkg/go-nfs/handler.go index cac4d72..6bfb0cd 100644 --- a/pkg/go-nfs/handler.go +++ b/pkg/go-nfs/handler.go @@ -25,9 +25,9 @@ type Handler interface { // represent file objects as opaque references // Can be safely implemented via helpers/cachinghandler. - ToHandle(fs Filesystem, path []string) []byte - FromHandle(fh []byte) (Filesystem, []string, error) - InvalidateHandle(Filesystem, []byte) error + ToHandle(ctx context.Context, fs Filesystem, path []string) []byte + FromHandle(ctx context.Context, fh []byte) (Filesystem, []string, error) + InvalidateHandle(context.Context, Filesystem, []byte) error // How many handles can be safely maintained by the handler. HandleLimit() int diff --git a/pkg/go-nfs/helpers/cachinghandler.go b/pkg/go-nfs/helpers/cachinghandler.go index 06d5934..0604863 100644 --- a/pkg/go-nfs/helpers/cachinghandler.go +++ b/pkg/go-nfs/helpers/cachinghandler.go @@ -1,6 +1,7 @@ package helpers import ( + "context" "crypto/sha256" "encoding/binary" "io/fs" @@ -51,7 +52,7 @@ type entry struct { // ToHandle takes a file and represents it with an opaque handle to reference it. // In stateless nfs (when it's serving a unix fs) this can be the device + inode // but we can generalize with a stateful local cache of handed out IDs. -func (c *CachingHandler) ToHandle(f nfs.Filesystem, path []string) []byte { +func (c *CachingHandler) ToHandle(ctx context.Context, f nfs.Filesystem, path []string) []byte { joinedPath := f.Join(path...) if handle := c.searchReverseCache(f, joinedPath); handle != nil { @@ -79,7 +80,7 @@ func (c *CachingHandler) ToHandle(f nfs.Filesystem, path []string) []byte { } // FromHandle converts from an opaque handle to the file it represents -func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) { +func (c *CachingHandler) FromHandle(ctx context.Context, fh []byte) (nfs.Filesystem, []string, error) { id, err := uuid.FromBytes(fh) if err != nil { return nil, []string{}, err @@ -134,7 +135,7 @@ func (c *CachingHandler) evictReverseCache(path string, handle uuid.UUID) { } } -func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error { +func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem, handle []byte) error { //Remove from cache id, _ := uuid.FromBytes(handle) entry, ok := c.activeHandles.Get(id) diff --git a/pkg/go-nfs/helpers/nullauthhandler.go b/pkg/go-nfs/helpers/nullauthhandler.go index 87e4658..9b80458 100644 --- a/pkg/go-nfs/helpers/nullauthhandler.go +++ b/pkg/go-nfs/helpers/nullauthhandler.go @@ -35,21 +35,21 @@ func (h *NullAuthHandler) Change(fs nfs.Filesystem) nfs.Change { } // FSStat provides information about a filesystem. -func (h *NullAuthHandler) FSStat(ctx context.Context, f nfs.Filesystem, s *nfs.FSStat) error { +func (h *NullAuthHandler) FSStat(context.Context, nfs.Filesystem, *nfs.FSStat) error { return nil } // ToHandle handled by CachingHandler -func (h *NullAuthHandler) ToHandle(f nfs.Filesystem, s []string) []byte { +func (h *NullAuthHandler) ToHandle(context.Context, nfs.Filesystem, []string) []byte { return []byte{} } // FromHandle handled by CachingHandler -func (h *NullAuthHandler) FromHandle([]byte) (nfs.Filesystem, []string, error) { +func (h *NullAuthHandler) FromHandle(context.Context, []byte) (nfs.Filesystem, []string, error) { return nil, []string{}, nil } -func (c *NullAuthHandler) InvalidateHandle(nfs.Filesystem, []byte) error { +func (c *NullAuthHandler) InvalidateHandle(context.Context, nfs.Filesystem, []byte) error { return nil } diff --git a/pkg/go-nfs/mount.go b/pkg/go-nfs/mount.go index e95d098..019c098 100644 --- a/pkg/go-nfs/mount.go +++ b/pkg/go-nfs/mount.go @@ -39,7 +39,7 @@ func onMount(ctx context.Context, w *response, userHandle Handler) error { return err } - rootHndl := userHandle.ToHandle(handle, []string{}) + rootHndl := userHandle.ToHandle(ctx, handle, []string{}) if status == MountStatusOk { _ = xdr.Write(writer, rootHndl) diff --git a/pkg/go-nfs/nfs_onaccess.go b/pkg/go-nfs/nfs_onaccess.go index 6674734..40f91d6 100644 --- a/pkg/go-nfs/nfs_onaccess.go +++ b/pkg/go-nfs/nfs_onaccess.go @@ -14,7 +14,7 @@ func onAccess(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(roothandle) + fs, path, err := userHandle.FromHandle(ctx, roothandle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_oncommit.go b/pkg/go-nfs/nfs_oncommit.go index e2616d6..229df6d 100644 --- a/pkg/go-nfs/nfs_oncommit.go +++ b/pkg/go-nfs/nfs_oncommit.go @@ -18,7 +18,7 @@ func onCommit(ctx context.Context, w *response, userHandle Handler) error { } // The conn will drain the unread offset and count arguments. - fs, path, err := userHandle.FromHandle(handle) + fs, path, err := userHandle.FromHandle(ctx, handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_oncreate.go b/pkg/go-nfs/nfs_oncreate.go index 3a181d0..2dbf29b 100644 --- a/pkg/go-nfs/nfs_oncreate.go +++ b/pkg/go-nfs/nfs_oncreate.go @@ -47,7 +47,7 @@ func onCreate(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusNotSupp, os.ErrInvalid} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -87,7 +87,7 @@ func onCreate(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusAccess, err} } - fp := userHandle.ToHandle(fs, newFile) + fp := userHandle.ToHandle(ctx, fs, newFile) changer := userHandle.Change(fs) if err := attrs.Apply(ctx, changer, fs, newFilePath); err != nil { Log.Errorf("Error applying attributes: %v\n", err) diff --git a/pkg/go-nfs/nfs_onfsinfo.go b/pkg/go-nfs/nfs_onfsinfo.go index 152e366..5a35716 100644 --- a/pkg/go-nfs/nfs_onfsinfo.go +++ b/pkg/go-nfs/nfs_onfsinfo.go @@ -24,7 +24,7 @@ func onFSInfo(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(roothandle) + fs, path, err := userHandle.FromHandle(ctx, roothandle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onfsstat.go b/pkg/go-nfs/nfs_onfsstat.go index 325a106..14001a7 100644 --- a/pkg/go-nfs/nfs_onfsstat.go +++ b/pkg/go-nfs/nfs_onfsstat.go @@ -13,7 +13,7 @@ func onFSStat(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(roothandle) + fs, path, err := userHandle.FromHandle(ctx, roothandle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_ongetattr.go b/pkg/go-nfs/nfs_ongetattr.go index b936c06..d7adf28 100644 --- a/pkg/go-nfs/nfs_ongetattr.go +++ b/pkg/go-nfs/nfs_ongetattr.go @@ -15,7 +15,7 @@ func onGetAttr(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(handle) + fs, path, err := userHandle.FromHandle(ctx, handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onlink.go b/pkg/go-nfs/nfs_onlink.go index 460a969..8522d5c 100644 --- a/pkg/go-nfs/nfs_onlink.go +++ b/pkg/go-nfs/nfs_onlink.go @@ -27,7 +27,7 @@ func onLink(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -49,7 +49,7 @@ func onLink(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusNotDir, nil} } - fp := userHandle.ToHandle(fs, append(path, string(obj.Filename))) + fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename))) changer := userHandle.Change(fs) if changer == nil { return &NFSStatusError{NFSStatusAccess, err} diff --git a/pkg/go-nfs/nfs_onlookup.go b/pkg/go-nfs/nfs_onlookup.go index 6507d03..417fde3 100644 --- a/pkg/go-nfs/nfs_onlookup.go +++ b/pkg/go-nfs/nfs_onlookup.go @@ -33,7 +33,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, p, err := userHandle.FromHandle(obj.Handle) + fs, p, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -58,7 +58,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusAccess, os.ErrPermission} } pPath := p[0 : len(p)-1] - pHandle := userHandle.ToHandle(fs, pPath) + pHandle := userHandle.ToHandle(ctx, fs, pPath) resp, err := lookupSuccessResponse(ctx, pHandle, pPath, p, fs) if err != nil { return &NFSStatusError{NFSStatusServerFault, err} @@ -74,7 +74,7 @@ func onLookup(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusNoEnt, os.ErrNotExist} } - newHandle := userHandle.ToHandle(fs, reqPath) + newHandle := userHandle.ToHandle(ctx, fs, reqPath) resp, err := lookupSuccessResponse(ctx, newHandle, reqPath, p, fs) if err != nil { return &NFSStatusError{NFSStatusServerFault, err} diff --git a/pkg/go-nfs/nfs_onmkdir.go b/pkg/go-nfs/nfs_onmkdir.go index d96bab1..4652fcf 100644 --- a/pkg/go-nfs/nfs_onmkdir.go +++ b/pkg/go-nfs/nfs_onmkdir.go @@ -26,7 +26,7 @@ func onMkdir(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -59,7 +59,7 @@ func onMkdir(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusAccess, err} } - fp := userHandle.ToHandle(fs, newFolder) + fp := userHandle.ToHandle(ctx, fs, newFolder) changer := userHandle.Change(fs) if changer != nil { if err := attrs.Apply(ctx, changer, fs, newFolderPath); err != nil { diff --git a/pkg/go-nfs/nfs_onmknod.go b/pkg/go-nfs/nfs_onmknod.go index 81ca2fa..d39a2d1 100644 --- a/pkg/go-nfs/nfs_onmknod.go +++ b/pkg/go-nfs/nfs_onmknod.go @@ -37,7 +37,7 @@ func onMknod(ctx context.Context, w *response, userHandle Handler) error { } // see if the filesystem supports mknod - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -69,7 +69,7 @@ func onMknod(ctx context.Context, w *response, userHandle Handler) error { } else if !parent.IsDir() { return &NFSStatusError{NFSStatusNotDir, nil} } - fp := userHandle.ToHandle(fs, append(path, string(obj.Filename))) + fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename))) switch nfs_ftype(ftype) { case FTYPE_NF3CHR: diff --git a/pkg/go-nfs/nfs_onpathconf.go b/pkg/go-nfs/nfs_onpathconf.go index 1771b60..c6535b4 100644 --- a/pkg/go-nfs/nfs_onpathconf.go +++ b/pkg/go-nfs/nfs_onpathconf.go @@ -15,7 +15,7 @@ func onPathConf(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(roothandle) + fs, path, err := userHandle.FromHandle(ctx, roothandle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onread.go b/pkg/go-nfs/nfs_onread.go index ee8e199..2d9fd1e 100644 --- a/pkg/go-nfs/nfs_onread.go +++ b/pkg/go-nfs/nfs_onread.go @@ -37,7 +37,7 @@ func onRead(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onreaddir.go b/pkg/go-nfs/nfs_onreaddir.go index 7f90a63..dc90f61 100644 --- a/pkg/go-nfs/nfs_onreaddir.go +++ b/pkg/go-nfs/nfs_onreaddir.go @@ -41,7 +41,7 @@ func onReadDir(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusTooSmall, io.ErrShortBuffer} } - fs, p, err := userHandle.FromHandle(obj.Handle) + fs, p, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -140,7 +140,7 @@ func onReadDir(ctx context.Context, w *response, userHandle Handler) error { func getDirListingWithVerifier(ctx context.Context, userHandle Handler, fsHandle []byte, verifier uint64) ([]fs.FileInfo, uint64, error) { // figure out what directory it is. - fs, p, err := userHandle.FromHandle(fsHandle) + fs, p, err := userHandle.FromHandle(ctx, fsHandle) if err != nil { return nil, 0, &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onreaddirplus.go b/pkg/go-nfs/nfs_onreaddirplus.go index 9fa49cb..f4f3b7a 100644 --- a/pkg/go-nfs/nfs_onreaddirplus.go +++ b/pkg/go-nfs/nfs_onreaddirplus.go @@ -46,7 +46,7 @@ func onReadDirPlus(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusTooSmall, nil} } - fs, p, err := userHandle.FromHandle(obj.Handle) + fs, p, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -102,7 +102,7 @@ func onReadDirPlus(ctx context.Context, w *response, userHandle Handler) error { } filePath := joinPath(p, c.Name()) - handle := userHandle.ToHandle(fs, filePath) + handle := userHandle.ToHandle(ctx, fs, filePath) attrs := ToFileAttribute(c, path.Join(filePath...)) entities = append(entities, readDirPlusEntity{ FileID: attrs.Fileid, diff --git a/pkg/go-nfs/nfs_onreadlink.go b/pkg/go-nfs/nfs_onreadlink.go index 691124e..15cdca2 100644 --- a/pkg/go-nfs/nfs_onreadlink.go +++ b/pkg/go-nfs/nfs_onreadlink.go @@ -15,7 +15,7 @@ func onReadLink(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(handle) + fs, path, err := userHandle.FromHandle(ctx, handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onremove.go b/pkg/go-nfs/nfs_onremove.go index b6a892b..4618245 100644 --- a/pkg/go-nfs/nfs_onremove.go +++ b/pkg/go-nfs/nfs_onremove.go @@ -15,7 +15,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error { if err := xdr.Read(w.req.Body, &obj); err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -49,7 +49,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error { preCacheData := ToFileAttribute(dirInfo, fullPath).AsCache() toDelete := fs.Join(append(path, string(obj.Filename))...) - toDeleteHandle := userHandle.ToHandle(fs, append(path, string(obj.Filename))) + toDeleteHandle := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename))) err = fs.Remove(ctx, toDelete) if err != nil { @@ -65,7 +65,7 @@ func onRemove(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusIO, err} } - if err := userHandle.InvalidateHandle(fs, toDeleteHandle); err != nil { + if err := userHandle.InvalidateHandle(ctx, fs, toDeleteHandle); err != nil { return &NFSStatusError{NFSStatusServerFault, err} } diff --git a/pkg/go-nfs/nfs_onrename.go b/pkg/go-nfs/nfs_onrename.go index 9ebcfb4..2286afb 100644 --- a/pkg/go-nfs/nfs_onrename.go +++ b/pkg/go-nfs/nfs_onrename.go @@ -20,7 +20,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error { if err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs, fromPath, err := userHandle.FromHandle(from.Handle) + fs, fromPath, err := userHandle.FromHandle(ctx, from.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -29,7 +29,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error { if err = xdr.Read(w.req.Body, &to); err != nil { return &NFSStatusError{NFSStatusInval, err} } - fs2, toPath, err := userHandle.FromHandle(to.Handle) + fs2, toPath, err := userHandle.FromHandle(ctx, to.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -78,7 +78,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error { } preDestData := ToFileAttribute(toDirInfo, toDirPath).AsCache() - oldHandle := userHandle.ToHandle(fs, append(fromPath, string(from.Filename))) + oldHandle := userHandle.ToHandle(ctx, fs, append(fromPath, string(from.Filename))) fromLoc := fs.Join(append(fromPath, string(from.Filename))...) toLoc := fs.Join(append(toPath, string(to.Filename))...) @@ -97,7 +97,7 @@ func onRename(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusIO, err} } - if err := userHandle.InvalidateHandle(fs, oldHandle); err != nil { + if err := userHandle.InvalidateHandle(ctx, fs, oldHandle); err != nil { return &NFSStatusError{NFSStatusServerFault, err} } diff --git a/pkg/go-nfs/nfs_onsetattr.go b/pkg/go-nfs/nfs_onsetattr.go index 366773b..983df45 100644 --- a/pkg/go-nfs/nfs_onsetattr.go +++ b/pkg/go-nfs/nfs_onsetattr.go @@ -17,7 +17,7 @@ func onSetAttr(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(handle) + fs, path, err := userHandle.FromHandle(ctx, handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/go-nfs/nfs_onsymlink.go b/pkg/go-nfs/nfs_onsymlink.go index 4b728c6..c196b27 100644 --- a/pkg/go-nfs/nfs_onsymlink.go +++ b/pkg/go-nfs/nfs_onsymlink.go @@ -26,7 +26,7 @@ func onSymlink(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(obj.Handle) + fs, path, err := userHandle.FromHandle(ctx, obj.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } @@ -53,7 +53,7 @@ func onSymlink(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusAccess, err} } - fp := userHandle.ToHandle(fs, append(path, string(obj.Filename))) + fp := userHandle.ToHandle(ctx, fs, append(path, string(obj.Filename))) changer := userHandle.Change(fs) if changer != nil { if err := attrs.Apply(ctx, changer, fs, newFilePath); err != nil { diff --git a/pkg/go-nfs/nfs_onwrite.go b/pkg/go-nfs/nfs_onwrite.go index a7e5dd0..7fac090 100644 --- a/pkg/go-nfs/nfs_onwrite.go +++ b/pkg/go-nfs/nfs_onwrite.go @@ -35,7 +35,7 @@ func onWrite(ctx context.Context, w *response, userHandle Handler) error { return &NFSStatusError{NFSStatusInval, err} } - fs, path, err := userHandle.FromHandle(req.Handle) + fs, path, err := userHandle.FromHandle(ctx, req.Handle) if err != nil { return &NFSStatusError{NFSStatusStale, err} } diff --git a/pkg/kvsingle/single.go b/pkg/kvsingle/single.go index 015f94d..6212e4e 100644 --- a/pkg/kvsingle/single.go +++ b/pkg/kvsingle/single.go @@ -15,7 +15,7 @@ func New[K, V any](db kv.Store[K, V], key K) *Value[K, V] { return &Value[K, V]{Key: key, db: db} } -func (s *Value[K, V]) Get(ctx context.Context) (V, bool, error) { +func (s *Value[K, V]) Get(ctx context.Context) (V, error) { return s.db.Get(ctx, s.Key) } diff --git a/pkg/kvtrace/kvmetrics.go b/pkg/kvtrace/kvmetrics.go index 4e633a5..c1f5755 100644 --- a/pkg/kvtrace/kvmetrics.go +++ b/pkg/kvtrace/kvmetrics.go @@ -6,85 +6,145 @@ import ( "github.com/royalcat/kv" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) var tracer = otel.Tracer("github.com/royalcat/kv/tracer") -type traceSrtore[K, V any] struct { +type traceStore[K, V any] struct { kv kv.Store[K, V] attrs []attribute.KeyValue } func WrapTracing[K, V any](kv kv.Store[K, V], attrs ...attribute.KeyValue) kv.Store[K, V] { - return &traceSrtore[K, V]{ + return &traceStore[K, V]{ kv: kv, attrs: attrs, } } // Close implements kv.Store. -func (m *traceSrtore[K, V]) Close(ctx context.Context) error { +func (m *traceStore[K, V]) Close(ctx context.Context) (err error) { ctx, span := tracer.Start(ctx, "Close", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil && err != kv.ErrKeyNotFound { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() return m.kv.Close(ctx) } // Delete implements kv.Store. -func (m *traceSrtore[K, V]) Delete(ctx context.Context, k K) error { +func (m *traceStore[K, V]) Delete(ctx context.Context, k K) (err error) { ctx, span := tracer.Start(ctx, "Delete", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil && err != kv.ErrKeyNotFound { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() return m.kv.Delete(ctx, k) } // Get implements kv.Store. -func (m *traceSrtore[K, V]) Get(ctx context.Context, k K) (v V, found bool, err error) { +func (m *traceStore[K, V]) Get(ctx context.Context, k K) (v V, err error) { ctx, span := tracer.Start(ctx, "Get", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil && err != kv.ErrKeyNotFound { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() return m.kv.Get(ctx, k) } +// Get implements kv.Store. +func (m *traceStore[K, V]) Edit(ctx context.Context, k K, edit kv.Edit[V]) (err error) { + ctx, span := tracer.Start(ctx, "Get", trace.WithAttributes(m.attrs...)) + defer span.End() + defer func() { + if err != nil && err != kv.ErrKeyNotFound { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() + + return m.kv.Edit(ctx, k, edit) +} + // Range implements kv.Store. -func (m *traceSrtore[K, V]) Range(ctx context.Context, iter kv.Iter[K, V]) error { +func (m *traceStore[K, V]) Range(ctx context.Context, iter kv.Iter[K, V]) (err error) { ctx, span := tracer.Start(ctx, "Range", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() count := 0 iterCount := func(k K, v V) error { count++ return iter(k, v) } + defer func() { + span.SetAttributes(attribute.Int("count", count)) + }() - err := m.kv.Range(ctx, iterCount) - span.SetAttributes(attribute.Int("count", count)) - return err + return m.kv.Range(ctx, iterCount) } // RangeWithPrefix implements kv.Store. -func (m *traceSrtore[K, V]) RangeWithPrefix(ctx context.Context, k K, iter kv.Iter[K, V]) error { +func (m *traceStore[K, V]) RangeWithPrefix(ctx context.Context, k K, iter kv.Iter[K, V]) (err error) { ctx, span := tracer.Start(ctx, "RangeWithPrefix", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() count := 0 iterCount := func(k K, v V) error { count++ return iter(k, v) } + defer func() { + span.SetAttributes(attribute.Int("count", count)) + }() - err := m.kv.Range(ctx, iterCount) - span.SetAttributes(attribute.Int("count", count)) - return err + return m.kv.Range(ctx, iterCount) } // Set implements kv.Store. -func (m *traceSrtore[K, V]) Set(ctx context.Context, k K, v V) error { +func (m *traceStore[K, V]) Set(ctx context.Context, k K, v V) (err error) { ctx, span := tracer.Start(ctx, "Set", trace.WithAttributes(m.attrs...)) defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + }() return m.kv.Set(ctx, k, v) } -var _ kv.Store[any, any] = (*traceSrtore[any, any])(nil) +var _ kv.Store[any, any] = (*traceStore[any, any])(nil) diff --git a/src/export/nfs/kvhandler.go b/src/export/nfs/kvhandler.go index b842cf3..bc06f1f 100644 --- a/src/export/nfs/kvhandler.go +++ b/src/export/nfs/kvhandler.go @@ -2,6 +2,7 @@ package nfs import ( "context" + "errors" "fmt" "path" "strings" @@ -10,6 +11,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/go-nfs" "git.kmsign.ru/royalcat/tstor/src/config" + "git.kmsign.ru/royalcat/tstor/src/log" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" @@ -50,6 +52,7 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) { opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers")) opts.DefaultTTL = time.Hour + opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler") activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts) if err != nil { @@ -98,8 +101,7 @@ type CachingHandler struct { // ToHandle takes a file and represents it with an opaque handle to reference it. // In stateless nfs (when it's serving a unix fs) this can be the device + inode // but we can generalize with a stateful local cache of handed out IDs. -func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte { - ctx := context.Background() +func (c *CachingHandler) ToHandle(ctx context.Context, _ nfs.Filesystem, path []string) []byte { var id uuid.UUID cacheKey := handle(path).String() @@ -123,31 +125,29 @@ func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte { } // FromHandle converts from an opaque handle to the file it represents -func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) { +func (c *CachingHandler) FromHandle(ctx context.Context, fh []byte) (nfs.Filesystem, []string, error) { c.mu.Lock() defer c.mu.Unlock() - ctx := context.Background() - id, err := uuid.FromBytes(fh) if err != nil { return nil, nil, err } - paths, found, err := c.activeHandles.Get(ctx, id) + paths, err := c.activeHandles.Get(ctx, id) if err != nil { + if errors.Is(err, kv.ErrKeyNotFound) { + return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale} + } + return nil, nil, fmt.Errorf("kv error: %w", err) } - if found { - return c.fs, paths, nil - } + return c.fs, paths, nil - return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale} } -func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error { - ctx := context.Background() +func (c *CachingHandler) InvalidateHandle(ctx context.Context, fs nfs.Filesystem, handle []byte) error { //Remove from cache id, err := uuid.FromBytes(handle) if err != nil { diff --git a/src/log/badger.go b/src/log/badger.go index f547dde..ca57c37 100644 --- a/src/log/badger.go +++ b/src/log/badger.go @@ -5,12 +5,19 @@ import ( "log/slog" "strings" + "git.kmsign.ru/royalcat/tstor/pkg/rlog" "github.com/dgraph-io/badger/v4" ) -var _ badger.Logger = (*Badger)(nil) +func BadgerLogger(name ...string) badger.Logger { + return &badgerLogger{ + L: rlog.Component(append(name, "badger")...).Slog(), + } +} -type Badger struct { +var _ badger.Logger = (*badgerLogger)(nil) + +type badgerLogger struct { L *slog.Logger } @@ -18,18 +25,18 @@ func fmtBadgerLog(m string, f ...any) string { return fmt.Sprintf(strings.ReplaceAll(m, "\n", ""), f...) } -func (l *Badger) Errorf(m string, f ...interface{}) { +func (l *badgerLogger) Errorf(m string, f ...interface{}) { l.L.Error(fmtBadgerLog(m, f...)) } -func (l *Badger) Warningf(m string, f ...interface{}) { +func (l *badgerLogger) Warningf(m string, f ...interface{}) { l.L.Warn(fmtBadgerLog(m, f...)) } -func (l *Badger) Infof(m string, f ...interface{}) { +func (l *badgerLogger) Infof(m string, f ...interface{}) { l.L.Info(fmtBadgerLog(m, f...)) } -func (l *Badger) Debugf(m string, f ...interface{}) { +func (l *badgerLogger) Debugf(m string, f ...interface{}) { l.L.Debug(fmtBadgerLog(m, f...)) } diff --git a/src/sources/torrent/controller.go b/src/sources/torrent/controller.go index e6acd4c..0c863a3 100644 --- a/src/sources/torrent/controller.go +++ b/src/sources/torrent/controller.go @@ -8,20 +8,33 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/rlog" "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/types" + "github.com/royalcat/kv" ) +type TorrentFileDeleter interface { + DeleteFile(file *torrent.File) error +} + +type FileProperties struct { + Excluded bool `json:"excluded"` + Priority types.PiecePriority `json:"priority"` +} + type Controller struct { torrentFilePath string t *torrent.Torrent - rep *filesMappingsStore + storage TorrentFileDeleter + fileProperties kv.Store[string, FileProperties] log *rlog.Logger } -func newController(t *torrent.Torrent, rep *filesMappingsStore) *Controller { +func newController(t *torrent.Torrent, fileProperties kv.Store[string, FileProperties], storage TorrentFileDeleter) *Controller { return &Controller{ - t: t, - rep: rep, - log: rlog.Component("torrent/controller").With(slog.String("infohash", t.InfoHash().HexString())), + t: t, + storage: storage, + fileProperties: fileProperties, + log: rlog.Component("torrent-client", "controller").With(slog.String("infohash", t.InfoHash().HexString())), } } @@ -63,7 +76,11 @@ func (s *Controller) Length() int64 { } func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) { - fileMappings, err := s.rep.FileMappings(ctx, s.t.InfoHash()) + fps := map[string]FileProperties{} + err := s.fileProperties.Range(ctx, func(k string, v FileProperties) error { + fps[k] = v + return nil + }) if err != nil { return nil, err } @@ -84,9 +101,10 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) { if strings.Contains(p, "/.pad/") { return true } - if target, ok := fileMappings[p]; ok && target == "" { + if props, ok := fps[p]; ok && props.Excluded { return true } + return false }) @@ -102,7 +120,13 @@ func Map[T, U any](ts []T, f func(T) U) []U { } func (s *Controller) ExcludeFile(ctx context.Context, f *torrent.File) error { - return s.rep.ExcludeFile(ctx, f) + log := s.log.With(slog.String("file", f.Path())) + log.Info(ctx, "excluding file") + + return s.fileProperties.Edit(ctx, f.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) { + v.Excluded = true + return v, nil + }) } func (s *Controller) isFileComplete(startIndex int, endIndex int) bool { @@ -132,21 +156,46 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error { return nil } +func (c *Controller) SetFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error { + log := c.log.With(slog.String("file", file.Path()), slog.Int("priority", int(priority))) + log.Info(ctx, "set pritority for file") + + err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) { + v.Priority = priority + return v, nil + }) + if err != nil { + return err + } + + file.SetPriority(priority) + return nil +} + func (c *Controller) initializeTorrentPriories(ctx context.Context) error { log := c.log.WithComponent("initializeTorrentPriories") - // files, err := c.Files(ctx) - // if err != nil { - // return err - // } + files, err := c.Files(ctx) + if err != nil { + return err + } - // for _, file := range files { - // if file == nil { - // continue - // } + for _, file := range files { + if file == nil { + continue + } - // file.SetPriority(torrent.PiecePriorityNormal) - // } + props, err := c.fileProperties.Get(ctx, file.Path()) + if err != nil { + if err == kv.ErrKeyNotFound { + continue + } + log.Error(ctx, "failed to get file priority", rlog.Error(err)) + } + + file.SetPriority(props.Priority) + + } log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name())) diff --git a/src/sources/torrent/daemon.go b/src/sources/torrent/daemon.go index cfd10ed..756fb98 100644 --- a/src/sources/torrent/daemon.go +++ b/src/sources/torrent/daemon.go @@ -15,6 +15,7 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/ctxbilly" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" + "git.kmsign.ru/royalcat/tstor/src/tkv" "git.kmsign.ru/royalcat/tstor/src/vfs" "github.com/royalcat/ctxio" "go.opentelemetry.io/otel" @@ -39,12 +40,12 @@ type DirAquire struct { } type Daemon struct { - client *torrent.Client - excludedFiles *filesMappingsStore - infoBytes *infoBytesStore - Storage *fileStorage - fis *fileItemStore - dirsAquire kv.Store[string, DirAquire] + client *torrent.Client + infoBytes *infoBytesStore + Storage *fileStorage + fis *fileItemStore + dirsAquire kv.Store[string, DirAquire] + fileProperties kv.Store[string, FileProperties] loadMutex sync.Mutex torrentLoaded chan struct{} @@ -77,12 +78,13 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, return nil, err } - s.excludedFiles, err = newFileMappingsStore(conf.MetadataFolder, s.Storage) + s.fileProperties, err = tkv.NewKV[string, FileProperties](conf.MetadataFolder, "file-properties") if err != nil { return nil, err } s.infoBytes, err = newInfoBytesStore(conf.MetadataFolder) + if err != nil { return nil, err } @@ -98,7 +100,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Daemon, } s.client.AddDhtNodes(conf.DHTNodes) - s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire") + s.dirsAquire, err = tkv.NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire") if err != nil { return nil, err } @@ -122,7 +124,7 @@ func (s *Daemon) Close(ctx context.Context) error { s.client.Close(), s.Storage.Close(), s.dirsAquire.Close(ctx), - s.excludedFiles.Close(ctx), + // s.excludedFiles.Close(ctx), s.infoBytes.Close(), s.fis.Close(), )...) @@ -214,7 +216,7 @@ func (s *Daemon) LoadTorrent(ctx context.Context, f vfs.File) (*Controller, erro // } } - ctl := newController(t, s.excludedFiles) + ctl := s.newController(t) err = ctl.initializeTorrentPriories(ctx) if err != nil { @@ -306,12 +308,23 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error { }) } +func storeByTorrent[K kv.Bytes, V any](s kv.Store[K, V], infohash infohash.T) kv.Store[K, V] { + return kv.PrefixBytes[K, V](s, K(infohash.HexString()+"/")) +} + +func (s *Daemon) newController(t *torrent.Torrent) *Controller { + return newController(t, + storeByTorrent(s.fileProperties, t.InfoHash()), + s.Storage, + ) +} + func (s *Daemon) ListTorrents(ctx context.Context) ([]*Controller, error) { <-s.torrentLoaded out := []*Controller{} for _, v := range s.client.Torrents() { - out = append(out, newController(v, s.excludedFiles)) + out = append(out, s.newController(v)) } return out, nil } @@ -324,7 +337,7 @@ func (s *Daemon) GetTorrent(infohashHex string) (*Controller, error) { return nil, nil } - return newController(t, s.excludedFiles), nil + return s.newController(t), nil } func slicesUnique[S ~[]E, E comparable](in S) S { diff --git a/src/sources/torrent/file_mappings.go b/src/sources/torrent/file_mappings.go deleted file mode 100644 index c9a8433..0000000 --- a/src/sources/torrent/file_mappings.go +++ /dev/null @@ -1,60 +0,0 @@ -package torrent - -import ( - "context" - "path/filepath" - - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/types/infohash" - "github.com/royalcat/kv" - "github.com/royalcat/kv/kvbadger" -) - -func newFileMappingsStore(metaDir string, storage TorrentFileDeleter) (*filesMappingsStore, error) { - opts := kvbadger.DefaultOptions(filepath.Join(metaDir, "file-mappings")) - str, err := kvbadger.NewBadgerKVBytes[string, string](opts) - if err != nil { - return nil, err - } - - r := &filesMappingsStore{ - mappings: str, - storage: storage, - } - - return r, nil -} - -type filesMappingsStore struct { - mappings kv.Store[string, string] - storage TorrentFileDeleter -} - -type TorrentFileDeleter interface { - DeleteFile(file *torrent.File) error -} - -func fileKey(file *torrent.File) string { - return file.Torrent().InfoHash().HexString() + "/" + file.Path() -} - -func (r *filesMappingsStore) MapFile(ctx context.Context, file *torrent.File, target string) error { - return r.mappings.Set(ctx, fileKey(file), target) -} - -func (r *filesMappingsStore) ExcludeFile(ctx context.Context, file *torrent.File) error { - return r.mappings.Set(ctx, fileKey(file), "") -} - -func (r *filesMappingsStore) FileMappings(ctx context.Context, ih infohash.T) (map[string]string, error) { - out := map[string]string{} - err := r.mappings.RangeWithPrefix(ctx, ih.HexString(), func(k, v string) error { - out[k] = v - return nil - }) - return out, err -} - -func (r *filesMappingsStore) Close(ctx context.Context) error { - return r.mappings.Close(ctx) -} diff --git a/src/sources/torrent/fileitem.go b/src/sources/torrent/fileitem.go index 259cb29..816388b 100644 --- a/src/sources/torrent/fileitem.go +++ b/src/sources/torrent/fileitem.go @@ -3,7 +3,6 @@ package torrent import ( "bytes" "encoding/gob" - "log/slog" "time" dlog "git.kmsign.ru/royalcat/tstor/src/log" @@ -19,10 +18,8 @@ type fileItemStore struct { } func newFileItemStore(path string, itemsTTL time.Duration) (*fileItemStore, error) { - l := slog.With("component", "item-store") - opts := badger.DefaultOptions(path). - WithLogger(&dlog.Badger{L: l}). + WithLogger(dlog.BadgerLogger("torrent-client", "item-store")). WithValueLogFileSize(1<<26 - 1) db, err := badger.Open(opts) diff --git a/src/sources/torrent/infobytes.go b/src/sources/torrent/infobytes.go index 8163562..a2add10 100644 --- a/src/sources/torrent/infobytes.go +++ b/src/sources/torrent/infobytes.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "log/slog" "path/filepath" dlog "git.kmsign.ru/royalcat/tstor/src/log" @@ -20,11 +19,9 @@ type infoBytesStore struct { } func newInfoBytesStore(metaDir string) (*infoBytesStore, error) { - l := slog.With("component", "badger", "db", "info-bytes") - opts := badger. DefaultOptions(filepath.Join(metaDir, "infobytes")). - WithLogger(&dlog.Badger{L: l}) + WithLogger(dlog.BadgerLogger("torrent-client", "infobytes")) db, err := badger.Open(opts) if err != nil { return nil, err diff --git a/src/sources/torrent/kv.go b/src/sources/torrent/kv.go deleted file mode 100644 index 1759002..0000000 --- a/src/sources/torrent/kv.go +++ /dev/null @@ -1,22 +0,0 @@ -package torrent - -import ( - "path" - - "git.kmsign.ru/royalcat/tstor/pkg/kvtrace" - "github.com/royalcat/kv" - "github.com/royalcat/kv/kvbadger" - "go.opentelemetry.io/otel/attribute" -) - -func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) { - opts := kvbadger.DefaultOptions(path.Join(dbdir, name)) - store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts) - if err != nil { - return nil, err - } - - store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger")) - - return store, err -} diff --git a/src/sources/torrent/piece_completion.go b/src/sources/torrent/piece_completion.go index 9766c3a..55709ab 100644 --- a/src/sources/torrent/piece_completion.go +++ b/src/sources/torrent/piece_completion.go @@ -33,11 +33,9 @@ type badgerPieceCompletion struct { var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil) func newPieceCompletion(dir string) (storage.PieceCompletion, error) { - l := slog.With("component", "badger", "db", "piece-completion") - opts := badger. DefaultOptions(dir). - WithLogger(&dlog.Badger{L: l}) + WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion")) db, err := badger.Open(opts) if err != nil { return nil, err diff --git a/src/sources/torrent/stats_store.go b/src/sources/torrent/stats_store.go index eb3c029..b3f4b3e 100644 --- a/src/sources/torrent/stats_store.go +++ b/src/sources/torrent/stats_store.go @@ -6,6 +6,7 @@ import ( "path" "time" + "git.kmsign.ru/royalcat/tstor/src/log" "github.com/anacrolix/torrent/types/infohash" "github.com/dgraph-io/badger/v4" "golang.org/x/exp/maps" @@ -15,7 +16,8 @@ func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) db, err := badger.OpenManaged( badger. DefaultOptions(path.Join(metaDir, "stats-history")). - WithNumVersionsToKeep(int(^uint(0) >> 1)), // Infinity + WithNumVersionsToKeep(int(^uint(0) >> 1)). + WithLogger(log.BadgerLogger("stats")), // Infinity ) if err != nil { return nil, err diff --git a/src/sources/torrent/storage_open.go b/src/sources/torrent/storage_open.go index 921aa96..3941696 100644 --- a/src/sources/torrent/storage_open.go +++ b/src/sources/torrent/storage_open.go @@ -16,6 +16,7 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/types/infohash" + "github.com/royalcat/kv" ) // OpenTorrent implements storage.ClientImplCloser. @@ -92,11 +93,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info name := info.BestName() - aq, found, err := s.dirsAquire.Get(ctx, info.BestName()) - if err != nil { - return false, false, err - } - if !found { + aq, err := s.dirsAquire.Get(ctx, info.BestName()) + if errors.Is(err, kv.ErrKeyNotFound) { err = s.dirsAquire.Set(ctx, name, DirAquire{ Name: name, Hashes: slices.Compact([]infohash.T{ih}), @@ -107,6 +105,8 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info log.Debug(ctx, "acquiring was not found, so created") return true, false, nil + } else if err != nil { + return false, false, err } if slices.Contains(aq.Hashes, ih) { @@ -136,7 +136,6 @@ func (s *Daemon) checkTorrentCompatable(ctx context.Context, ih infohash.T, info } } - if slices.Contains(aq.Hashes, ih) { log.Debug(ctx, "hash is compatable") return true, false, nil diff --git a/src/sources/ytdlp/controller.go b/src/sources/ytdlp/controller.go index 1a4dc6a..0462e4f 100644 --- a/src/sources/ytdlp/controller.go +++ b/src/sources/ytdlp/controller.go @@ -11,6 +11,7 @@ import ( "git.kmsign.ru/royalcat/tstor/src/tasks" "github.com/royalcat/ctxio" "github.com/royalcat/ctxprogress" + "github.com/royalcat/kv" ) type Controller struct { @@ -86,13 +87,13 @@ func (c *Controller) Update(ctx context.Context, updater tasks.Updater) error { } func (c *Controller) Info(ctx context.Context) (ytdlp.Info, error) { - info, found, err := c.cachedinfo.Get(ctx) - if err != nil { - return info, err - } - if found { + info, err := c.cachedinfo.Get(ctx) + if err == nil { return info, nil } + if err != kv.ErrKeyNotFound { + return info, err + } info, err = c.Info(ctx) if err != nil {