From 0371af33442cebed8fcd87c96b5c7f7321476ad0 Mon Sep 17 00:00:00 2001 From: royalcat Date: Tue, 9 Jul 2024 00:19:04 +0300 Subject: [PATCH] file controller --- .gqlgen.yml | 2 +- go.mod | 4 +- go.sum | 8 +- graphql/sources/torrent_types.graphql | 3 +- pkg/kvsingle/single.go | 4 + src/delivery/graphql/generated.go | 198 +++++++++++------- src/delivery/graphql/model/mappers.go | 6 + src/delivery/graphql/model/models_gen.go | 9 +- .../resolver/torrent_mutation.resolvers.go | 14 +- .../resolver/torrent_types.resolvers.go | 22 +- src/delivery/router.go | 17 +- src/export/nfs/kvhandler.go | 3 +- src/log/badger.go | 8 +- src/sources/torrent/controller.go | 121 ++++------- src/sources/torrent/daemon.go | 8 +- src/sources/torrent/file_controller.go | 88 ++++++++ src/sources/torrent/fs.go | 10 +- src/sources/torrent/piece_completion.go | 175 ++++++++-------- src/telemetry/setup.go | 10 +- src/tkv/new.go | 6 +- ui/lib/api/schema.graphql | 4 +- 21 files changed, 440 insertions(+), 280 deletions(-) create mode 100644 src/sources/torrent/file_controller.go diff --git a/.gqlgen.yml b/.gqlgen.yml index e6c7dff..1f2b337 100644 --- a/.gqlgen.yml +++ b/.gqlgen.yml @@ -28,7 +28,7 @@ models: TorrentFile: extraFields: F: - type: "*github.com/anacrolix/torrent.File" + type: "*git.kmsign.ru/royalcat/tstor/src/sources/torrent.FileController" TorrentPeer: extraFields: F: diff --git a/go.mod b/go.mod index 34fed80..9a275e6 100644 --- a/go.mod +++ b/go.mod @@ -36,8 +36,8 @@ require ( github.com/ravilushqa/otelgqlgen v0.15.0 github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff - github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f - github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f + github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85 + github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85 github.com/rs/zerolog v1.32.0 github.com/samber/slog-multi v1.0.2 github.com/samber/slog-zerolog v1.0.0 diff --git a/go.sum b/go.sum index 4c30d9b..6f42f02 100644 --- a/go.sum +++ b/go.sum @@ -545,10 +545,10 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk= github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA= -github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4= -github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= -github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI= -github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= +github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85 h1:AAuCp03M23u4qrK3dT1afFgf+diEijvSFnHb93Lv3PY= +github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU= +github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85 h1:OXRYz+eDPAlQjE1UCSIoBzVHjQ3Ayx7fGSM/Zlo3bhI= +github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/graphql/sources/torrent_types.graphql b/graphql/sources/torrent_types.graphql index ecc5723..54b4548 100644 --- a/graphql/sources/torrent_types.graphql +++ b/graphql/sources/torrent_types.graphql @@ -4,7 +4,7 @@ type Torrent { bytesCompleted: Int! torrentFilePath: String! bytesMissing: Int! - priority: TorrentPriority! @resolver + priority: TorrentPriority! files: [TorrentFile!]! @resolver excludedFiles: [TorrentFile!]! @resolver peers: [TorrentPeer!]! @resolver @@ -14,6 +14,7 @@ type TorrentFile { filename: String! size: Int! bytesCompleted: Int! + priority: TorrentPriority! @resolver } type TorrentPeer { diff --git a/pkg/kvsingle/single.go b/pkg/kvsingle/single.go index 6212e4e..6c4b061 100644 --- a/pkg/kvsingle/single.go +++ b/pkg/kvsingle/single.go @@ -22,3 +22,7 @@ func (s *Value[K, V]) Get(ctx context.Context) (V, error) { func (s *Value[K, V]) Set(ctx context.Context, value V) error { return s.db.Set(ctx, s.Key, value) } + +func (s *Value[K, V]) Edit(ctx context.Context, edit kv.Edit[V]) error { + return s.db.Edit(ctx, s.Key, edit) +} diff --git a/src/delivery/graphql/generated.go b/src/delivery/graphql/generated.go index 11cd443..9723c03 100644 --- a/src/delivery/graphql/generated.go +++ b/src/delivery/graphql/generated.go @@ -51,6 +51,7 @@ type ResolverRoot interface { TorrentDaemonMutation() TorrentDaemonMutationResolver TorrentDaemonQuery() TorrentDaemonQueryResolver TorrentFS() TorrentFSResolver + TorrentFile() TorrentFileResolver } type DirectiveRoot struct { @@ -146,6 +147,7 @@ type ComplexityRoot struct { TorrentFile struct { BytesCompleted func(childComplexity int) int Filename func(childComplexity int) int + Priority func(childComplexity int) int Size func(childComplexity int) int } @@ -195,7 +197,6 @@ type SubscriptionResolver interface { type TorrentResolver interface { Name(ctx context.Context, obj *model.Torrent) (string, error) - Priority(ctx context.Context, obj *model.Torrent) (types.PiecePriority, error) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) ExcludedFiles(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error) @@ -211,6 +212,9 @@ type TorrentDaemonQueryResolver interface { type TorrentFSResolver interface { Entries(ctx context.Context, obj *model.TorrentFs) ([]model.FsEntry, error) } +type TorrentFileResolver interface { + Priority(ctx context.Context, obj *model.TorrentFile) (types.PiecePriority, error) +} type executableSchema struct { schema *ast.Schema @@ -546,6 +550,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.TorrentFile.Filename(childComplexity), true + case "TorrentFile.priority": + if e.complexity.TorrentFile.Priority == nil { + break + } + + return e.complexity.TorrentFile.Priority(childComplexity), true + case "TorrentFile.size": if e.complexity.TorrentFile.Size == nil { break @@ -859,7 +870,7 @@ input TorrentFilter @oneOf { bytesCompleted: Int! torrentFilePath: String! bytesMissing: Int! - priority: TorrentPriority! @resolver + priority: TorrentPriority! files: [TorrentFile!]! @resolver excludedFiles: [TorrentFile!]! @resolver peers: [TorrentPeer!]! @resolver @@ -869,6 +880,7 @@ type TorrentFile { filename: String! size: Int! bytesCompleted: Int! + priority: TorrentPriority! @resolver } type TorrentPeer { @@ -884,7 +896,6 @@ enum TorrentPriority { NORMAL HIGH READAHEAD - NEXT NOW } `, BuiltIn: false}, @@ -2676,28 +2687,8 @@ func (ec *executionContext) _Torrent_priority(ctx context.Context, field graphql } }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - directive0 := func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return ec.resolvers.Torrent().Priority(rctx, obj) - } - directive1 := func(ctx context.Context) (interface{}, error) { - if ec.directives.Resolver == nil { - return nil, errors.New("directive resolver is not implemented") - } - return ec.directives.Resolver(ctx, obj, directive0) - } - - tmp, err := directive1(rctx) - if err != nil { - return nil, graphql.ErrorOnPath(ctx, err) - } - if tmp == nil { - return nil, nil - } - if data, ok := tmp.(types.PiecePriority); ok { - return data, nil - } - return nil, fmt.Errorf(`unexpected type %T from directive, should be github.com/anacrolix/torrent/types.PiecePriority`, tmp) + ctx = rctx // use context from middleware stack in children + return obj.Priority, nil }) if err != nil { ec.Error(ctx, err) @@ -2718,8 +2709,8 @@ func (ec *executionContext) fieldContext_Torrent_priority(_ context.Context, fie fc = &graphql.FieldContext{ Object: "Torrent", Field: field, - IsMethod: true, - IsResolver: true, + IsMethod: false, + IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type TorrentPriority does not have child fields") }, @@ -2792,6 +2783,8 @@ func (ec *executionContext) fieldContext_Torrent_files(_ context.Context, field return ec.fieldContext_TorrentFile_size(ctx, field) case "bytesCompleted": return ec.fieldContext_TorrentFile_bytesCompleted(ctx, field) + case "priority": + return ec.fieldContext_TorrentFile_priority(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type TorrentFile", field.Name) }, @@ -2864,6 +2857,8 @@ func (ec *executionContext) fieldContext_Torrent_excludedFiles(_ context.Context return ec.fieldContext_TorrentFile_size(ctx, field) case "bytesCompleted": return ec.fieldContext_TorrentFile_bytesCompleted(ctx, field) + case "priority": + return ec.fieldContext_TorrentFile_priority(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type TorrentFile", field.Name) }, @@ -3577,6 +3572,70 @@ func (ec *executionContext) fieldContext_TorrentFile_bytesCompleted(_ context.Co return fc, nil } +func (ec *executionContext) _TorrentFile_priority(ctx context.Context, field graphql.CollectedField, obj *model.TorrentFile) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_TorrentFile_priority(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + directive0 := func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.TorrentFile().Priority(rctx, obj) + } + directive1 := func(ctx context.Context) (interface{}, error) { + if ec.directives.Resolver == nil { + return nil, errors.New("directive resolver is not implemented") + } + return ec.directives.Resolver(ctx, obj, directive0) + } + + tmp, err := directive1(rctx) + if err != nil { + return nil, graphql.ErrorOnPath(ctx, err) + } + if tmp == nil { + return nil, nil + } + if data, ok := tmp.(types.PiecePriority); ok { + return data, nil + } + return nil, fmt.Errorf(`unexpected type %T from directive, should be github.com/anacrolix/torrent/types.PiecePriority`, tmp) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(types.PiecePriority) + fc.Result = res + return ec.marshalNTorrentPriority2githubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_TorrentFile_priority(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "TorrentFile", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type TorrentPriority does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _TorrentFileEntry_name(ctx context.Context, field graphql.CollectedField, obj *model.TorrentFileEntry) (ret graphql.Marshaler) { fc, err := ec.fieldContext_TorrentFileEntry_name(ctx, field) if err != nil { @@ -7422,41 +7481,10 @@ func (ec *executionContext) _Torrent(ctx context.Context, sel ast.SelectionSet, atomic.AddUint32(&out.Invalids, 1) } case "priority": - field := field - - innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._Torrent_priority(ctx, field, obj) - if res == graphql.Null { - atomic.AddUint32(&fs.Invalids, 1) - } - return res + out.Values[i] = ec._Torrent_priority(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&out.Invalids, 1) } - - if field.Deferrable != nil { - dfs, ok := deferred[field.Deferrable.Label] - di := 0 - if ok { - dfs.AddField(field) - di = len(dfs.Values) - 1 - } else { - dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) - deferred[field.Deferrable.Label] = dfs - } - dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { - return innerFunc(ctx, dfs) - }) - - // don't run the out.Concurrently() call below - out.Values[i] = graphql.Null - continue - } - - out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) case "files": field := field @@ -7894,18 +7922,54 @@ func (ec *executionContext) _TorrentFile(ctx context.Context, sel ast.SelectionS case "filename": out.Values[i] = ec._TorrentFile_filename(ctx, field, obj) if out.Values[i] == graphql.Null { - out.Invalids++ + atomic.AddUint32(&out.Invalids, 1) } case "size": out.Values[i] = ec._TorrentFile_size(ctx, field, obj) if out.Values[i] == graphql.Null { - out.Invalids++ + atomic.AddUint32(&out.Invalids, 1) } case "bytesCompleted": out.Values[i] = ec._TorrentFile_bytesCompleted(ctx, field, obj) if out.Values[i] == graphql.Null { - out.Invalids++ + atomic.AddUint32(&out.Invalids, 1) } + case "priority": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._TorrentFile_priority(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -8772,7 +8836,6 @@ var ( "NORMAL": types.PiecePriorityNormal, "HIGH": types.PiecePriorityHigh, "READAHEAD": types.PiecePriorityReadahead, - "NEXT": types.PiecePriorityNext, "NOW": types.PiecePriorityNow, } marshalNTorrentPriority2githubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority = map[types.PiecePriority]string{ @@ -8780,7 +8843,6 @@ var ( types.PiecePriorityNormal: "NORMAL", types.PiecePriorityHigh: "HIGH", types.PiecePriorityReadahead: "READAHEAD", - types.PiecePriorityNext: "NEXT", types.PiecePriorityNow: "NOW", } ) @@ -9341,7 +9403,6 @@ var ( "NORMAL": types.PiecePriorityNormal, "HIGH": types.PiecePriorityHigh, "READAHEAD": types.PiecePriorityReadahead, - "NEXT": types.PiecePriorityNext, "NOW": types.PiecePriorityNow, } marshalOTorrentPriority2ᚕgithubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriorityᚄ = map[types.PiecePriority]string{ @@ -9349,7 +9410,6 @@ var ( types.PiecePriorityNormal: "NORMAL", types.PiecePriorityHigh: "HIGH", types.PiecePriorityReadahead: "READAHEAD", - types.PiecePriorityNext: "NEXT", types.PiecePriorityNow: "NOW", } ) @@ -9377,7 +9437,6 @@ var ( "NORMAL": types.PiecePriorityNormal, "HIGH": types.PiecePriorityHigh, "READAHEAD": types.PiecePriorityReadahead, - "NEXT": types.PiecePriorityNext, "NOW": types.PiecePriorityNow, } marshalOTorrentPriority2ᚖgithubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority = map[types.PiecePriority]string{ @@ -9385,7 +9444,6 @@ var ( types.PiecePriorityNormal: "NORMAL", types.PiecePriorityHigh: "HIGH", types.PiecePriorityReadahead: "READAHEAD", - types.PiecePriorityNext: "NEXT", types.PiecePriorityNow: "NOW", } ) diff --git a/src/delivery/graphql/model/mappers.go b/src/delivery/graphql/model/mappers.go index 2ebfba5..217e171 100644 --- a/src/delivery/graphql/model/mappers.go +++ b/src/delivery/graphql/model/mappers.go @@ -29,11 +29,17 @@ func MapPeerSource(source atorrent.PeerSource) string { } func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) { + prio, err := t.Priority(ctx) + if err != nil { + return nil, err + } + return &Torrent{ Infohash: t.InfoHash(), Name: t.Name(), BytesCompleted: t.BytesCompleted(), BytesMissing: t.BytesMissing(), + Priority: prio, T: t, }, nil } diff --git a/src/delivery/graphql/model/models_gen.go b/src/delivery/graphql/model/models_gen.go index 3efd284..cb5a4e6 100644 --- a/src/delivery/graphql/model/models_gen.go +++ b/src/delivery/graphql/model/models_gen.go @@ -217,10 +217,11 @@ func (this TorrentFs) GetEntries() []FsEntry { func (TorrentFs) IsFsEntry() {} type TorrentFile struct { - Filename string `json:"filename"` - Size int64 `json:"size"` - BytesCompleted int64 `json:"bytesCompleted"` - F *torrent1.File `json:"-"` + Filename string `json:"filename"` + Size int64 `json:"size"` + BytesCompleted int64 `json:"bytesCompleted"` + Priority types.PiecePriority `json:"priority"` + F *torrent.FileController `json:"-"` } type TorrentFileEntry struct { diff --git a/src/delivery/graphql/resolver/torrent_mutation.resolvers.go b/src/delivery/graphql/resolver/torrent_mutation.resolvers.go index 0f4bd8f..bf79feb 100644 --- a/src/delivery/graphql/resolver/torrent_mutation.resolvers.go +++ b/src/delivery/graphql/resolver/torrent_mutation.resolvers.go @@ -53,7 +53,19 @@ func (r *torrentDaemonMutationResolver) SetTorrentPriority(ctx context.Context, return false, nil } - err = t.SetPriority(ctx, file, priority) + if file == nil { + err = t.SetPriority(ctx, priority) + if err != nil { + return false, err + } + return true, nil + } + + f, err := t.GetFile(ctx, *file) + if err != nil { + return false, err + } + err = f.SetPriority(ctx, priority) if err != nil { return false, err } diff --git a/src/delivery/graphql/resolver/torrent_types.resolvers.go b/src/delivery/graphql/resolver/torrent_types.resolvers.go index 90f0dff..106c7ed 100644 --- a/src/delivery/graphql/resolver/torrent_types.resolvers.go +++ b/src/delivery/graphql/resolver/torrent_types.resolvers.go @@ -17,11 +17,6 @@ func (r *torrentResolver) Name(ctx context.Context, obj *model.Torrent) (string, return obj.T.Name(), nil } -// Priority is the resolver for the priority field. -func (r *torrentResolver) Priority(ctx context.Context, obj *model.Torrent) (types.PiecePriority, error) { - return obj.T.Priority(ctx, nil) -} - // Files is the resolver for the files field. func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) { out := []*model.TorrentFile{} @@ -31,8 +26,8 @@ func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*mod } for _, f := range files { out = append(out, &model.TorrentFile{ - Filename: f.DisplayPath(), - Size: f.Length(), + Filename: f.Path(), + Size: f.Size(), BytesCompleted: f.BytesCompleted(), F: f, }) @@ -76,7 +71,20 @@ func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*mod return peers, nil } +// Priority is the resolver for the priority field. +func (r *torrentFileResolver) Priority(ctx context.Context, obj *model.TorrentFile) (types.PiecePriority, error) { + props, err := obj.F.Properties(ctx) + if err != nil { + return 0, err + } + return props.Priority, nil +} + // Torrent returns graph.TorrentResolver implementation. func (r *Resolver) Torrent() graph.TorrentResolver { return &torrentResolver{r} } +// TorrentFile returns graph.TorrentFileResolver implementation. +func (r *Resolver) TorrentFile() graph.TorrentFileResolver { return &torrentFileResolver{r} } + type torrentResolver struct{ *Resolver } +type torrentFileResolver struct{ *Resolver } diff --git a/src/delivery/router.go b/src/delivery/router.go index cfb526b..59f8e7e 100644 --- a/src/delivery/router.go +++ b/src/delivery/router.go @@ -2,10 +2,8 @@ package delivery import ( "context" - "log/slog" "net/http" - "git.kmsign.ru/royalcat/tstor/pkg/rlog" graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql" "git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver" "git.kmsign.ru/royalcat/tstor/src/sources/torrent" @@ -36,14 +34,13 @@ func GraphQLHandler(service *torrent.Daemon, vfs vfs.Filesystem) http.Handler { ), ) - log := rlog.Component("graphql") - - graphqlHandler.AroundResponses(func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response { - resp := next(ctx) - responseJson, _ := resp.Data.MarshalJSON() - log.Info(ctx, "response", slog.String("body", string(responseJson))) - return resp - }) + // log := rlog.Component("graphql") + // graphqlHandler.AroundResponses(func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response { + // resp := next(ctx) + // responseJson, _ := resp.Data.MarshalJSON() + // log.Info(ctx, "response", slog.String("body", string(responseJson))) + // return resp + // }) graphqlHandler.AddTransport(&transport.POST{}) graphqlHandler.AddTransport(&transport.Websocket{}) diff --git a/src/export/nfs/kvhandler.go b/src/export/nfs/kvhandler.go index 729a996..678e5b5 100644 --- a/src/export/nfs/kvhandler.go +++ b/src/export/nfs/kvhandler.go @@ -49,7 +49,8 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh // NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings. func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) { - opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers")) + opts := kvbadger.DefaultOptions[handle](path.Join(config.CachePath, "handlers")) + opts.Codec = kv.CodecBinary[handle, *handle]{} opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler") activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts) diff --git a/src/log/badger.go b/src/log/badger.go index 52f59c0..51fd2d8 100644 --- a/src/log/badger.go +++ b/src/log/badger.go @@ -11,7 +11,7 @@ import ( func BadgerLogger(name ...string) badger.Logger { return &badgerLogger{ - L: rlog.Component(append([]string{"badger"}, name...)...).Nested(2), + L: rlog.Component(append(name, "badger")...).Nested(2), } } @@ -41,6 +41,8 @@ func (l *badgerLogger) Infof(m string, f ...interface{}) { } func (l *badgerLogger) Debugf(m string, f ...interface{}) { - ctx := context.Background() - l.L.Debug(ctx, fmtBadgerLog(m, f...)) + return + // too much logging + // ctx := context.Background() + // l.L.Debug(ctx, fmtBadgerLog(m, f...)) } diff --git a/src/sources/torrent/controller.go b/src/sources/torrent/controller.go index 0bf1b13..d2d45fa 100644 --- a/src/sources/torrent/controller.go +++ b/src/sources/torrent/controller.go @@ -3,9 +3,9 @@ package torrent import ( "context" "log/slog" - "slices" "strings" + "git.kmsign.ru/royalcat/tstor/pkg/kvsingle" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/types" @@ -75,7 +75,10 @@ func (s *Controller) Length() int64 { return s.t.Length() } -func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) { +func (s *Controller) Files(ctx context.Context) ([]*FileController, error) { + ctx, span := tracer.Start(ctx, "Files") + defer span.End() + fps := map[string]FileProperties{} err := s.fileProperties.Range(ctx, func(k string, v FileProperties) error { fps[k] = v @@ -91,27 +94,21 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) { case <-s.t.GotInfo(): } - files := s.t.Files() - files = slices.DeleteFunc(files, func(file *torrent.File) bool { - if file == nil { - return true + files := make([]*FileController, 0) + for _, v := range s.t.Files() { + if strings.Contains(v.Path(), "/.pad/") { + continue } - p := file.Path() - if strings.Contains(p, "/.pad/") { - return true - } - if props, ok := fps[p]; ok && props.Excluded { - return true - } - - return false - }) + props := kvsingle.New(s.fileProperties, v.Path()) + ctl := NewFileController(v, props) + files = append(files, ctl) + } return files, nil } -func (s *Controller) GetFile(ctx context.Context, file string) (*torrent.File, error) { +func (s *Controller) GetFile(ctx context.Context, file string) (*FileController, error) { files, err := s.Files(ctx) if err != nil { return nil, err @@ -181,21 +178,8 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error { return nil } -func (c *Controller) SetPriority(ctx context.Context, filePath *string, priority types.PiecePriority) error { - log := c.log.With(slog.Int("priority", int(priority))) - - if filePath != nil { - file, err := c.GetFile(ctx, *filePath) - if err != nil { - return err - } - if file == nil { - log.Error(ctx, "file not found") - return nil - } - - return c.setFilePriority(ctx, file, priority) - } +func (c *Controller) SetPriority(ctx context.Context, priority types.PiecePriority) error { + // log := c.log.With(slog.Int("priority", int(priority))) for _, f := range c.t.Files() { err := c.setFilePriority(ctx, f, priority) @@ -209,49 +193,39 @@ func (c *Controller) SetPriority(ctx context.Context, filePath *string, priority const defaultPriority = types.PiecePriorityNone -func (c *Controller) Priority(ctx context.Context, filePath *string) (types.PiecePriority, error) { - if filePath == nil { - prio := defaultPriority - err := c.fileProperties.Range(ctx, func(filePath string, v FileProperties) error { - if filePath == "" { - return nil - } - - if v.Priority > prio { - prio = v.Priority - } - return nil - }) - if err == kv.ErrKeyNotFound { - err = nil - } - - return prio, err - } - - props, err := c.fileProperties.Get(ctx, *filePath) +func (c *Controller) Priority(ctx context.Context) (types.PiecePriority, error) { + prio := defaultPriority + files, err := c.Files(ctx) if err != nil { - if err == kv.ErrKeyNotFound { - return defaultPriority, nil - } return 0, err } + for _, v := range files { + props, err := v.Properties(ctx) + if err != nil { + return 0, err + } + if props.Priority > prio { + prio = props.Priority + } + } - return props.Priority, nil + return prio, nil } func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error { err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) { v.Priority = priority return v, nil }) - if err != nil { - if err == kv.ErrKeyNotFound { - err := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority}) - if err != nil { - return err - } - } + if err == kv.ErrKeyNotFound { + seterr := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority}) + if seterr != nil { + return seterr + } + err = nil + } + + if err != nil { return err } file.SetPriority(priority) @@ -259,7 +233,9 @@ func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, pr } func (c *Controller) initializeTorrentPriories(ctx context.Context) error { - log := c.log.WithComponent("initializeTorrentPriories") + ctx, span := tracer.Start(ctx, "initializeTorrentPriories") + defer span.End() + log := c.log files, err := c.Files(ctx) if err != nil { @@ -267,19 +243,14 @@ func (c *Controller) initializeTorrentPriories(ctx context.Context) error { } for _, file := range files { - if file == nil { + props, err := file.Properties(ctx) + if err != nil { + log.Error(ctx, "failed to get file properties", rlog.Error(err)) continue } + log = log.With(slog.Int("priority", int(props.Priority))) - props, err := c.fileProperties.Get(ctx, file.Path()) - if err != nil { - if err == kv.ErrKeyNotFound { - continue - } - log.Error(ctx, "failed to get file priority", rlog.Error(err)) - } - - file.SetPriority(props.Priority) + file.file.SetPriority(props.Priority) } log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name())) diff --git a/src/sources/torrent/daemon.go b/src/sources/torrent/daemon.go index d7eeb89..7594b3c 100644 --- a/src/sources/torrent/daemon.go +++ b/src/sources/torrent/daemon.go @@ -32,7 +32,9 @@ import ( "github.com/royalcat/kv" ) -var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent") +var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent", + trace.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")), +) type DirAquire struct { Name string @@ -131,7 +133,7 @@ func (s *Daemon) Close(ctx context.Context) error { } func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) { - ctx, span := tracer.Start(ctx, "LoadTorrent") + ctx, span := tracer.Start(ctx, "loadTorrent") defer span.End() log := s.log @@ -247,7 +249,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error { defer span.End() log := s.log - loaderPaths := make(chan string) + loaderPaths := make(chan string, loadWorkers*5) wg := sync.WaitGroup{} defer func() { diff --git a/src/sources/torrent/file_controller.go b/src/sources/torrent/file_controller.go new file mode 100644 index 0000000..1a54702 --- /dev/null +++ b/src/sources/torrent/file_controller.go @@ -0,0 +1,88 @@ +package torrent + +import ( + "context" + + "git.kmsign.ru/royalcat/tstor/pkg/kvsingle" + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/types" + "github.com/royalcat/kv" +) + +type FileController struct { + file *torrent.File + properties *kvsingle.Value[string, FileProperties] +} + +func NewFileController(f *torrent.File, properties *kvsingle.Value[string, FileProperties]) *FileController { + return &FileController{ + file: f, + properties: properties, + } +} + +func (s *FileController) Properties(ctx context.Context) (FileProperties, error) { + p, err := s.properties.Get(ctx) + if err == kv.ErrKeyNotFound { + return FileProperties{ + Excluded: false, + Priority: defaultPriority, + }, nil + } + + if err != nil { + return FileProperties{}, err + } + return p, nil +} + +func (s *FileController) SetPriority(ctx context.Context, priority types.PiecePriority) error { + err := s.properties.Edit(ctx, func(ctx context.Context, v FileProperties) (FileProperties, error) { + v.Priority = priority + return v, nil + }) + if err == kv.ErrKeyNotFound { + seterr := s.properties.Set(ctx, FileProperties{ + Priority: priority, + }) + if seterr != nil { + return err + } + err = nil + } + if err != nil { + return err + } + + s.file.SetPriority(priority) + + return nil +} + +func (s *FileController) FileInfo() metainfo.FileInfo { + return s.file.FileInfo() +} + +func (s *FileController) Excluded(ctx context.Context) (bool, error) { + p, err := s.properties.Get(ctx) + if err == kv.ErrKeyNotFound { + return false, nil + } + if err != nil { + return false, err + } + return p.Excluded, nil +} + +func (s *FileController) Path() string { + return s.file.Path() +} + +func (s *FileController) Size() int64 { + return s.file.Length() +} + +func (s *FileController) BytesCompleted() int64 { + return s.file.BytesCompleted() +} diff --git a/src/sources/torrent/fs.go b/src/sources/torrent/fs.go index e77cf8d..bbdb74d 100644 --- a/src/sources/torrent/fs.go +++ b/src/sources/torrent/fs.go @@ -121,8 +121,16 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) { fs.filesCache = make(map[string]vfs.File) for _, file := range files { + props, err := file.Properties(ctx) + if err != nil { + return nil, err + } + if props.Excluded { + continue + } + p := vfs.AbsPath(file.Path()) - tf, err := openTorrentFile(ctx, path.Base(p), file, &fs.lastTorrentReadTimeout) + tf, err := openTorrentFile(ctx, path.Base(p), file.file, &fs.lastTorrentReadTimeout) if err != nil { return nil, err } diff --git a/src/sources/torrent/piece_completion.go b/src/sources/torrent/piece_completion.go index 01a3d4d..3e82edc 100644 --- a/src/sources/torrent/piece_completion.go +++ b/src/sources/torrent/piece_completion.go @@ -1,14 +1,15 @@ package torrent import ( + "context" "encoding/binary" "fmt" - "log/slog" dlog "git.kmsign.ru/royalcat/tstor/src/log" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" - "github.com/dgraph-io/badger/v4" + "github.com/royalcat/kv" + "github.com/royalcat/kv/kvbadger" ) type PieceCompletionState byte @@ -18,121 +19,119 @@ const ( PieceComplete PieceCompletionState = 1<<8 - 1 ) +var _ kv.Binary = (*PieceCompletionState)(nil) + +// MarshalBinary implements kv.Binary. +func (p PieceCompletionState) MarshalBinary() (data []byte, err error) { + return []byte{byte(p)}, nil +} + +// UnmarshalBinary implements kv.Binary. +func (p *PieceCompletionState) UnmarshalBinary(data []byte) error { + if len(data) != 1 { + return fmt.Errorf("bad length") + } + + switch PieceCompletionState(data[0]) { + case PieceComplete: + *p = PieceComplete + case PieceNotComplete: + *p = PieceNotComplete + default: + *p = PieceNotComplete + } + + return nil +} + func pieceCompletionState(i bool) PieceCompletionState { if i { return PieceComplete - } else { - return PieceNotComplete } + return PieceNotComplete +} + +type pieceKey metainfo.PieceKey + +const pieceKeySize = metainfo.HashSize + 4 + +var _ kv.Binary = (*pieceKey)(nil) + +// const delimeter rune = 0x1F + +// MarshalBinary implements kv.Binary. +func (pk pieceKey) MarshalBinary() (data []byte, err error) { + key := make([]byte, 0, pieceKeySize) + key = append(key, pk.InfoHash.Bytes()...) + key = binary.BigEndian.AppendUint32(key, uint32(pk.Index)) + return key, nil +} + +// UnmarshalBinary implements kv.Binary. +func (p *pieceKey) UnmarshalBinary(data []byte) error { + if len(data) < pieceKeySize { + return fmt.Errorf("data too short") + } + p.InfoHash = metainfo.Hash(data[:metainfo.HashSize]) + p.Index = int(binary.BigEndian.Uint32(data[metainfo.HashSize:])) + return nil } type badgerPieceCompletion struct { - db *badger.DB + db kv.Store[pieceKey, PieceCompletionState] } var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil) func newPieceCompletion(dir string) (storage.PieceCompletion, error) { - opts := badger. - DefaultOptions(dir). - WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion")) - db, err := badger.Open(opts) + opts := kvbadger.DefaultOptions[PieceCompletionState](dir) + opts.Codec = kv.CodecBinary[PieceCompletionState, *PieceCompletionState]{} + opts.BadgerOptions = opts.BadgerOptions.WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion")) + + db, err := kvbadger.NewBagerKVBinaryKey[pieceKey, PieceCompletionState](opts) if err != nil { return nil, err } - return &badgerPieceCompletion{db}, nil + + return &badgerPieceCompletion{ + db: db, + }, nil } -const delimeter rune = 0x1F +func (c *badgerPieceCompletion) Get(pk metainfo.PieceKey) (completion storage.Completion, err error) { + ctx := context.Background() -func pkToBytes(pk metainfo.PieceKey) []byte { - key := make([]byte, 0, len(pk.InfoHash.Bytes())+1+4) - key = append(key, pk.InfoHash.Bytes()...) - key = append(key, byte(delimeter)) - key = binary.BigEndian.AppendUint32(key, uint32(pk.Index)) - return key -} - -func (k *badgerPieceCompletion) Get(pk metainfo.PieceKey) (storage.Completion, error) { - completion := storage.Completion{ - Complete: false, - Ok: false, + state, err := c.db.Get(ctx, pieceKey(pk)) + if err != nil { + if err == kv.ErrKeyNotFound { + return completion, nil + } + return completion, err } - err := k.db.View(func(tx *badger.Txn) error { - item, err := tx.Get(pkToBytes(pk)) - if err != nil { - if err == badger.ErrKeyNotFound { - completion.Complete = false - completion.Ok = false - return nil - } - return fmt.Errorf("getting value: %w", err) - } + if state == PieceComplete { + return storage.Completion{ + Complete: true, + Ok: true, + }, nil + } - valCopy, err := item.ValueCopy(nil) - if err != nil { - return fmt.Errorf("copying value: %w", err) - } - compl := PieceCompletionState(valCopy[0]) - - switch compl { - case PieceComplete: - completion.Ok = true - completion.Complete = true - case PieceNotComplete: - completion.Ok = true - completion.Complete = false - } - - return nil - }) - return completion, err + return storage.Completion{ + Complete: false, + Ok: true, + }, nil } func (me badgerPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { + ctx := context.Background() + if c, err := me.Get(pk); err == nil && c.Ok && c.Complete == b { return nil } - return me.db.Update(func(txn *badger.Txn) error { - return txn.Set(pkToBytes(pk), []byte{byte(pieceCompletionState(b))}) - }) -} - -func (k *badgerPieceCompletion) Delete(key string) error { - return k.db.Update( - func(txn *badger.Txn) error { - return txn.Delete([]byte(key)) - }) + return me.db.Set(ctx, pieceKey(pk), pieceCompletionState(b)) } func (me *badgerPieceCompletion) Close() error { - return me.db.Close() + return me.db.Close(context.Background()) } - -type badgerSlog struct { - slog *slog.Logger -} - -// Debugf implements badger.Logger. -func (log badgerSlog) Debugf(f string, a ...interface{}) { - log.slog.Debug(f, a...) -} - -// Errorf implements badger.Logger. -func (log badgerSlog) Errorf(f string, a ...interface{}) { - log.slog.Error(f, a...) -} - -// Infof implements badger.Logger. -func (log badgerSlog) Infof(f string, a ...interface{}) { - log.slog.Info(f, a...) -} - -// Warningf implements badger.Logger. -func (log badgerSlog) Warningf(f string, a ...interface{}) { - log.slog.Warn(f, a...) -} - -var _ badger.Logger = (*badgerSlog)(nil) diff --git a/src/telemetry/setup.go b/src/telemetry/setup.go index e09091e..796f74d 100644 --- a/src/telemetry/setup.go +++ b/src/telemetry/setup.go @@ -147,9 +147,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) { // replace this with the address of pyroscope server ServerAddress: "https://pyroscope.kmsign.ru", // you can disable logging by setting this to nil - Logger: &pyroscopeLogger{ - log: client.log.WithComponent("pyroscope"), - }, + Logger: newPyroscopeLogger(client.log), ProfileTypes: []pyroscope.ProfileType{ // these profile types are enabled by default: pyroscope.ProfileCPU, @@ -172,6 +170,12 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) { return client, nil } +func newPyroscopeLogger(log *rlog.Logger) *pyroscopeLogger { + return &pyroscopeLogger{ + log: log.WithComponent("pyroscope").Nested(1), + } +} + type pyroscopeLogger struct { log *rlog.Logger } diff --git a/src/tkv/new.go b/src/tkv/new.go index ded1cef..13fe01a 100644 --- a/src/tkv/new.go +++ b/src/tkv/new.go @@ -3,22 +3,20 @@ package tkv import ( "path" - "git.kmsign.ru/royalcat/tstor/pkg/kvtrace" tlog "git.kmsign.ru/royalcat/tstor/src/log" "github.com/royalcat/kv" "github.com/royalcat/kv/kvbadger" - "go.opentelemetry.io/otel/attribute" ) func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) { - opts := kvbadger.DefaultOptions(path.Join(dbdir, name)) + opts := kvbadger.DefaultOptions[V](path.Join(dbdir, name)) opts.BadgerOptions.Logger = tlog.BadgerLogger(name, "badger") store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts) if err != nil { return nil, err } - store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger")) + // store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger")) return store, err } diff --git a/ui/lib/api/schema.graphql b/ui/lib/api/schema.graphql index 7ba6433..4c384cf 100644 --- a/ui/lib/api/schema.graphql +++ b/ui/lib/api/schema.graphql @@ -94,7 +94,7 @@ type Torrent { bytesCompleted: Int! torrentFilePath: String! bytesMissing: Int! - priority: TorrentPriority! @resolver + priority: TorrentPriority! files: [TorrentFile!]! @resolver excludedFiles: [TorrentFile!]! @resolver peers: [TorrentPeer!]! @resolver @@ -116,6 +116,7 @@ type TorrentFile { filename: String! size: Int! bytesCompleted: Int! + priority: TorrentPriority! @resolver } type TorrentFileEntry implements File & FsEntry { name: String! @@ -138,7 +139,6 @@ enum TorrentPriority { NORMAL HIGH READAHEAD - NEXT NOW } input TorrentPriorityFilter @oneOf {