file controller
All checks were successful
docker / build-docker (linux/arm64) (push) Successful in 2m10s
docker / build-docker (linux/amd64) (push) Successful in 2m12s

This commit is contained in:
royalcat 2024-07-09 00:19:04 +03:00
parent 199a82ff0c
commit 0371af3344
21 changed files with 440 additions and 280 deletions

View file

@ -28,7 +28,7 @@ models:
TorrentFile:
extraFields:
F:
type: "*github.com/anacrolix/torrent.File"
type: "*git.kmsign.ru/royalcat/tstor/src/sources/torrent.FileController"
TorrentPeer:
extraFields:
F:

4
go.mod
View file

@ -36,8 +36,8 @@ require (
github.com/ravilushqa/otelgqlgen v0.15.0
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f
github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85
github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85
github.com/rs/zerolog v1.32.0
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-zerolog v1.0.0

8
go.sum
View file

@ -545,10 +545,10 @@ github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be h1:Ui+Imq1Vk26rfpkL
github.com/royalcat/ctxio v0.0.0-20240602060200-590d464c39be/go.mod h1:NFNp3OsEMUPYj5LZUFDiyDt+2E6gR/g8JLd0k+y8XWI=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff h1:KlZaOEZYhCzyNYIp0LcE7MNR2Ar0PJS3eJU6A5mMTpk=
github.com/royalcat/ctxprogress v0.0.0-20240614113930-3cc5bb935bff/go.mod h1:RcUpbosy/m3bJ3JsVO18MXEbrKRHOHkmYBXigDGekaA=
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f h1:bG8Pp/YXkpC2eFI7psTiTAL7QTBqHQcP/lEpiqmBXn4=
github.com/royalcat/kv v0.0.0-20240617101007-c9c746b3916f/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f h1:wz3pvg7YJdibZXQRV6B5pVPeDK8bgnuJVnBf7OFtCWI=
github.com/royalcat/kv/kvbadger v0.0.0-20240617101007-c9c746b3916f/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85 h1:AAuCp03M23u4qrK3dT1afFgf+diEijvSFnHb93Lv3PY=
github.com/royalcat/kv v0.0.0-20240707205211-fedd4883af85/go.mod h1:UB/VwpTut8c3IXLJFvYWFxAAZymk9eBuJRMJmpSpwYU=
github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85 h1:OXRYz+eDPAlQjE1UCSIoBzVHjQ3Ayx7fGSM/Zlo3bhI=
github.com/royalcat/kv/kvbadger v0.0.0-20240707205211-fedd4883af85/go.mod h1:JxgA1VGwbqu+WqdmjmjT0v6KeWoWlN6Y5lesjmphExM=
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8=
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=

View file

@ -4,7 +4,7 @@ type Torrent {
bytesCompleted: Int!
torrentFilePath: String!
bytesMissing: Int!
priority: TorrentPriority! @resolver
priority: TorrentPriority!
files: [TorrentFile!]! @resolver
excludedFiles: [TorrentFile!]! @resolver
peers: [TorrentPeer!]! @resolver
@ -14,6 +14,7 @@ type TorrentFile {
filename: String!
size: Int!
bytesCompleted: Int!
priority: TorrentPriority! @resolver
}
type TorrentPeer {

View file

@ -22,3 +22,7 @@ func (s *Value[K, V]) Get(ctx context.Context) (V, error) {
func (s *Value[K, V]) Set(ctx context.Context, value V) error {
return s.db.Set(ctx, s.Key, value)
}
func (s *Value[K, V]) Edit(ctx context.Context, edit kv.Edit[V]) error {
return s.db.Edit(ctx, s.Key, edit)
}

View file

@ -51,6 +51,7 @@ type ResolverRoot interface {
TorrentDaemonMutation() TorrentDaemonMutationResolver
TorrentDaemonQuery() TorrentDaemonQueryResolver
TorrentFS() TorrentFSResolver
TorrentFile() TorrentFileResolver
}
type DirectiveRoot struct {
@ -146,6 +147,7 @@ type ComplexityRoot struct {
TorrentFile struct {
BytesCompleted func(childComplexity int) int
Filename func(childComplexity int) int
Priority func(childComplexity int) int
Size func(childComplexity int) int
}
@ -195,7 +197,6 @@ type SubscriptionResolver interface {
type TorrentResolver interface {
Name(ctx context.Context, obj *model.Torrent) (string, error)
Priority(ctx context.Context, obj *model.Torrent) (types.PiecePriority, error)
Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error)
ExcludedFiles(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error)
Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error)
@ -211,6 +212,9 @@ type TorrentDaemonQueryResolver interface {
type TorrentFSResolver interface {
Entries(ctx context.Context, obj *model.TorrentFs) ([]model.FsEntry, error)
}
type TorrentFileResolver interface {
Priority(ctx context.Context, obj *model.TorrentFile) (types.PiecePriority, error)
}
type executableSchema struct {
schema *ast.Schema
@ -546,6 +550,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.TorrentFile.Filename(childComplexity), true
case "TorrentFile.priority":
if e.complexity.TorrentFile.Priority == nil {
break
}
return e.complexity.TorrentFile.Priority(childComplexity), true
case "TorrentFile.size":
if e.complexity.TorrentFile.Size == nil {
break
@ -859,7 +870,7 @@ input TorrentFilter @oneOf {
bytesCompleted: Int!
torrentFilePath: String!
bytesMissing: Int!
priority: TorrentPriority! @resolver
priority: TorrentPriority!
files: [TorrentFile!]! @resolver
excludedFiles: [TorrentFile!]! @resolver
peers: [TorrentPeer!]! @resolver
@ -869,6 +880,7 @@ type TorrentFile {
filename: String!
size: Int!
bytesCompleted: Int!
priority: TorrentPriority! @resolver
}
type TorrentPeer {
@ -884,7 +896,6 @@ enum TorrentPriority {
NORMAL
HIGH
READAHEAD
NEXT
NOW
}
`, BuiltIn: false},
@ -2676,28 +2687,8 @@ func (ec *executionContext) _Torrent_priority(ctx context.Context, field graphql
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
directive0 := func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Torrent().Priority(rctx, obj)
}
directive1 := func(ctx context.Context) (interface{}, error) {
if ec.directives.Resolver == nil {
return nil, errors.New("directive resolver is not implemented")
}
return ec.directives.Resolver(ctx, obj, directive0)
}
tmp, err := directive1(rctx)
if err != nil {
return nil, graphql.ErrorOnPath(ctx, err)
}
if tmp == nil {
return nil, nil
}
if data, ok := tmp.(types.PiecePriority); ok {
return data, nil
}
return nil, fmt.Errorf(`unexpected type %T from directive, should be github.com/anacrolix/torrent/types.PiecePriority`, tmp)
return obj.Priority, nil
})
if err != nil {
ec.Error(ctx, err)
@ -2718,8 +2709,8 @@ func (ec *executionContext) fieldContext_Torrent_priority(_ context.Context, fie
fc = &graphql.FieldContext{
Object: "Torrent",
Field: field,
IsMethod: true,
IsResolver: true,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type TorrentPriority does not have child fields")
},
@ -2792,6 +2783,8 @@ func (ec *executionContext) fieldContext_Torrent_files(_ context.Context, field
return ec.fieldContext_TorrentFile_size(ctx, field)
case "bytesCompleted":
return ec.fieldContext_TorrentFile_bytesCompleted(ctx, field)
case "priority":
return ec.fieldContext_TorrentFile_priority(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type TorrentFile", field.Name)
},
@ -2864,6 +2857,8 @@ func (ec *executionContext) fieldContext_Torrent_excludedFiles(_ context.Context
return ec.fieldContext_TorrentFile_size(ctx, field)
case "bytesCompleted":
return ec.fieldContext_TorrentFile_bytesCompleted(ctx, field)
case "priority":
return ec.fieldContext_TorrentFile_priority(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type TorrentFile", field.Name)
},
@ -3577,6 +3572,70 @@ func (ec *executionContext) fieldContext_TorrentFile_bytesCompleted(_ context.Co
return fc, nil
}
func (ec *executionContext) _TorrentFile_priority(ctx context.Context, field graphql.CollectedField, obj *model.TorrentFile) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_TorrentFile_priority(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
directive0 := func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.TorrentFile().Priority(rctx, obj)
}
directive1 := func(ctx context.Context) (interface{}, error) {
if ec.directives.Resolver == nil {
return nil, errors.New("directive resolver is not implemented")
}
return ec.directives.Resolver(ctx, obj, directive0)
}
tmp, err := directive1(rctx)
if err != nil {
return nil, graphql.ErrorOnPath(ctx, err)
}
if tmp == nil {
return nil, nil
}
if data, ok := tmp.(types.PiecePriority); ok {
return data, nil
}
return nil, fmt.Errorf(`unexpected type %T from directive, should be github.com/anacrolix/torrent/types.PiecePriority`, tmp)
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(types.PiecePriority)
fc.Result = res
return ec.marshalNTorrentPriority2githubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_TorrentFile_priority(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "TorrentFile",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type TorrentPriority does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _TorrentFileEntry_name(ctx context.Context, field graphql.CollectedField, obj *model.TorrentFileEntry) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_TorrentFileEntry_name(ctx, field)
if err != nil {
@ -7422,41 +7481,10 @@ func (ec *executionContext) _Torrent(ctx context.Context, sel ast.SelectionSet,
atomic.AddUint32(&out.Invalids, 1)
}
case "priority":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
out.Values[i] = ec._Torrent_priority(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
}()
res = ec._Torrent_priority(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
return res
}
if field.Deferrable != nil {
dfs, ok := deferred[field.Deferrable.Label]
di := 0
if ok {
dfs.AddField(field)
di = len(dfs.Values) - 1
} else {
dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
deferred[field.Deferrable.Label] = dfs
}
dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
return innerFunc(ctx, dfs)
})
// don't run the out.Concurrently() call below
out.Values[i] = graphql.Null
continue
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
case "files":
field := field
@ -7894,18 +7922,54 @@ func (ec *executionContext) _TorrentFile(ctx context.Context, sel ast.SelectionS
case "filename":
out.Values[i] = ec._TorrentFile_filename(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
atomic.AddUint32(&out.Invalids, 1)
}
case "size":
out.Values[i] = ec._TorrentFile_size(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
atomic.AddUint32(&out.Invalids, 1)
}
case "bytesCompleted":
out.Values[i] = ec._TorrentFile_bytesCompleted(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
atomic.AddUint32(&out.Invalids, 1)
}
case "priority":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._TorrentFile_priority(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
return res
}
if field.Deferrable != nil {
dfs, ok := deferred[field.Deferrable.Label]
di := 0
if ok {
dfs.AddField(field)
di = len(dfs.Values) - 1
} else {
dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
deferred[field.Deferrable.Label] = dfs
}
dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
return innerFunc(ctx, dfs)
})
// don't run the out.Concurrently() call below
out.Values[i] = graphql.Null
continue
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
default:
panic("unknown field " + strconv.Quote(field.Name))
}
@ -8772,7 +8836,6 @@ var (
"NORMAL": types.PiecePriorityNormal,
"HIGH": types.PiecePriorityHigh,
"READAHEAD": types.PiecePriorityReadahead,
"NEXT": types.PiecePriorityNext,
"NOW": types.PiecePriorityNow,
}
marshalNTorrentPriority2githubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority = map[types.PiecePriority]string{
@ -8780,7 +8843,6 @@ var (
types.PiecePriorityNormal: "NORMAL",
types.PiecePriorityHigh: "HIGH",
types.PiecePriorityReadahead: "READAHEAD",
types.PiecePriorityNext: "NEXT",
types.PiecePriorityNow: "NOW",
}
)
@ -9341,7 +9403,6 @@ var (
"NORMAL": types.PiecePriorityNormal,
"HIGH": types.PiecePriorityHigh,
"READAHEAD": types.PiecePriorityReadahead,
"NEXT": types.PiecePriorityNext,
"NOW": types.PiecePriorityNow,
}
marshalOTorrentPriority2ᚕgithubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriorityᚄ = map[types.PiecePriority]string{
@ -9349,7 +9410,6 @@ var (
types.PiecePriorityNormal: "NORMAL",
types.PiecePriorityHigh: "HIGH",
types.PiecePriorityReadahead: "READAHEAD",
types.PiecePriorityNext: "NEXT",
types.PiecePriorityNow: "NOW",
}
)
@ -9377,7 +9437,6 @@ var (
"NORMAL": types.PiecePriorityNormal,
"HIGH": types.PiecePriorityHigh,
"READAHEAD": types.PiecePriorityReadahead,
"NEXT": types.PiecePriorityNext,
"NOW": types.PiecePriorityNow,
}
marshalOTorrentPriority2ᚖgithubᚗcomᚋanacrolixᚋtorrentᚋtypesᚐPiecePriority = map[types.PiecePriority]string{
@ -9385,7 +9444,6 @@ var (
types.PiecePriorityNormal: "NORMAL",
types.PiecePriorityHigh: "HIGH",
types.PiecePriorityReadahead: "READAHEAD",
types.PiecePriorityNext: "NEXT",
types.PiecePriorityNow: "NOW",
}
)

View file

@ -29,11 +29,17 @@ func MapPeerSource(source atorrent.PeerSource) string {
}
func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
prio, err := t.Priority(ctx)
if err != nil {
return nil, err
}
return &Torrent{
Infohash: t.InfoHash(),
Name: t.Name(),
BytesCompleted: t.BytesCompleted(),
BytesMissing: t.BytesMissing(),
Priority: prio,
T: t,
}, nil
}

View file

@ -220,7 +220,8 @@ type TorrentFile struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
BytesCompleted int64 `json:"bytesCompleted"`
F *torrent1.File `json:"-"`
Priority types.PiecePriority `json:"priority"`
F *torrent.FileController `json:"-"`
}
type TorrentFileEntry struct {

View file

@ -53,7 +53,19 @@ func (r *torrentDaemonMutationResolver) SetTorrentPriority(ctx context.Context,
return false, nil
}
err = t.SetPriority(ctx, file, priority)
if file == nil {
err = t.SetPriority(ctx, priority)
if err != nil {
return false, err
}
return true, nil
}
f, err := t.GetFile(ctx, *file)
if err != nil {
return false, err
}
err = f.SetPriority(ctx, priority)
if err != nil {
return false, err
}

View file

@ -17,11 +17,6 @@ func (r *torrentResolver) Name(ctx context.Context, obj *model.Torrent) (string,
return obj.T.Name(), nil
}
// Priority is the resolver for the priority field.
func (r *torrentResolver) Priority(ctx context.Context, obj *model.Torrent) (types.PiecePriority, error) {
return obj.T.Priority(ctx, nil)
}
// Files is the resolver for the files field.
func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
out := []*model.TorrentFile{}
@ -31,8 +26,8 @@ func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*mod
}
for _, f := range files {
out = append(out, &model.TorrentFile{
Filename: f.DisplayPath(),
Size: f.Length(),
Filename: f.Path(),
Size: f.Size(),
BytesCompleted: f.BytesCompleted(),
F: f,
})
@ -76,7 +71,20 @@ func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*mod
return peers, nil
}
// Priority is the resolver for the priority field.
func (r *torrentFileResolver) Priority(ctx context.Context, obj *model.TorrentFile) (types.PiecePriority, error) {
props, err := obj.F.Properties(ctx)
if err != nil {
return 0, err
}
return props.Priority, nil
}
// Torrent returns graph.TorrentResolver implementation.
func (r *Resolver) Torrent() graph.TorrentResolver { return &torrentResolver{r} }
// TorrentFile returns graph.TorrentFileResolver implementation.
func (r *Resolver) TorrentFile() graph.TorrentFileResolver { return &torrentFileResolver{r} }
type torrentResolver struct{ *Resolver }
type torrentFileResolver struct{ *Resolver }

View file

@ -2,10 +2,8 @@ package delivery
import (
"context"
"log/slog"
"net/http"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
@ -36,14 +34,13 @@ func GraphQLHandler(service *torrent.Daemon, vfs vfs.Filesystem) http.Handler {
),
)
log := rlog.Component("graphql")
graphqlHandler.AroundResponses(func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response {
resp := next(ctx)
responseJson, _ := resp.Data.MarshalJSON()
log.Info(ctx, "response", slog.String("body", string(responseJson)))
return resp
})
// log := rlog.Component("graphql")
// graphqlHandler.AroundResponses(func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response {
// resp := next(ctx)
// responseJson, _ := resp.Data.MarshalJSON()
// log.Info(ctx, "response", slog.String("body", string(responseJson)))
// return resp
// })
graphqlHandler.AddTransport(&transport.POST{})
graphqlHandler.AddTransport(&transport.Websocket{})

View file

@ -49,7 +49,8 @@ var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvh
// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem, config config.NFS) (nfs.Handler, error) {
opts := kvbadger.DefaultOptions(path.Join(config.CachePath, "handlers"))
opts := kvbadger.DefaultOptions[handle](path.Join(config.CachePath, "handlers"))
opts.Codec = kv.CodecBinary[handle, *handle]{}
opts.BadgerOptions.Logger = log.BadgerLogger("nfs", "kvhandler")
activeHandles, err := kvbadger.NewBagerKVBinaryKey[uuid.UUID, handle](opts)

View file

@ -11,7 +11,7 @@ import (
func BadgerLogger(name ...string) badger.Logger {
return &badgerLogger{
L: rlog.Component(append([]string{"badger"}, name...)...).Nested(2),
L: rlog.Component(append(name, "badger")...).Nested(2),
}
}
@ -41,6 +41,8 @@ func (l *badgerLogger) Infof(m string, f ...interface{}) {
}
func (l *badgerLogger) Debugf(m string, f ...interface{}) {
ctx := context.Background()
l.L.Debug(ctx, fmtBadgerLog(m, f...))
return
// too much logging
// ctx := context.Background()
// l.L.Debug(ctx, fmtBadgerLog(m, f...))
}

View file

@ -3,9 +3,9 @@ package torrent
import (
"context"
"log/slog"
"slices"
"strings"
"git.kmsign.ru/royalcat/tstor/pkg/kvsingle"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types"
@ -75,7 +75,10 @@ func (s *Controller) Length() int64 {
return s.t.Length()
}
func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
func (s *Controller) Files(ctx context.Context) ([]*FileController, error) {
ctx, span := tracer.Start(ctx, "Files")
defer span.End()
fps := map[string]FileProperties{}
err := s.fileProperties.Range(ctx, func(k string, v FileProperties) error {
fps[k] = v
@ -91,27 +94,21 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
case <-s.t.GotInfo():
}
files := s.t.Files()
files = slices.DeleteFunc(files, func(file *torrent.File) bool {
if file == nil {
return true
files := make([]*FileController, 0)
for _, v := range s.t.Files() {
if strings.Contains(v.Path(), "/.pad/") {
continue
}
p := file.Path()
if strings.Contains(p, "/.pad/") {
return true
props := kvsingle.New(s.fileProperties, v.Path())
ctl := NewFileController(v, props)
files = append(files, ctl)
}
if props, ok := fps[p]; ok && props.Excluded {
return true
}
return false
})
return files, nil
}
func (s *Controller) GetFile(ctx context.Context, file string) (*torrent.File, error) {
func (s *Controller) GetFile(ctx context.Context, file string) (*FileController, error) {
files, err := s.Files(ctx)
if err != nil {
return nil, err
@ -181,21 +178,8 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error {
return nil
}
func (c *Controller) SetPriority(ctx context.Context, filePath *string, priority types.PiecePriority) error {
log := c.log.With(slog.Int("priority", int(priority)))
if filePath != nil {
file, err := c.GetFile(ctx, *filePath)
if err != nil {
return err
}
if file == nil {
log.Error(ctx, "file not found")
return nil
}
return c.setFilePriority(ctx, file, priority)
}
func (c *Controller) SetPriority(ctx context.Context, priority types.PiecePriority) error {
// log := c.log.With(slog.Int("priority", int(priority)))
for _, f := range c.t.Files() {
err := c.setFilePriority(ctx, f, priority)
@ -209,49 +193,39 @@ func (c *Controller) SetPriority(ctx context.Context, filePath *string, priority
const defaultPriority = types.PiecePriorityNone
func (c *Controller) Priority(ctx context.Context, filePath *string) (types.PiecePriority, error) {
if filePath == nil {
func (c *Controller) Priority(ctx context.Context) (types.PiecePriority, error) {
prio := defaultPriority
err := c.fileProperties.Range(ctx, func(filePath string, v FileProperties) error {
if filePath == "" {
return nil
}
if v.Priority > prio {
prio = v.Priority
}
return nil
})
if err == kv.ErrKeyNotFound {
err = nil
}
return prio, err
}
props, err := c.fileProperties.Get(ctx, *filePath)
files, err := c.Files(ctx)
if err != nil {
if err == kv.ErrKeyNotFound {
return defaultPriority, nil
}
return 0, err
}
for _, v := range files {
props, err := v.Properties(ctx)
if err != nil {
return 0, err
}
if props.Priority > prio {
prio = props.Priority
}
}
return props.Priority, nil
return prio, nil
}
func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Priority = priority
return v, nil
})
if err != nil {
if err == kv.ErrKeyNotFound {
err := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority})
if err != nil {
return err
seterr := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority})
if seterr != nil {
return seterr
}
err = nil
}
if err != nil {
return err
}
file.SetPriority(priority)
@ -259,7 +233,9 @@ func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, pr
}
func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
log := c.log.WithComponent("initializeTorrentPriories")
ctx, span := tracer.Start(ctx, "initializeTorrentPriories")
defer span.End()
log := c.log
files, err := c.Files(ctx)
if err != nil {
@ -267,19 +243,14 @@ func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
}
for _, file := range files {
if file == nil {
continue
}
props, err := c.fileProperties.Get(ctx, file.Path())
props, err := file.Properties(ctx)
if err != nil {
if err == kv.ErrKeyNotFound {
log.Error(ctx, "failed to get file properties", rlog.Error(err))
continue
}
log.Error(ctx, "failed to get file priority", rlog.Error(err))
}
log = log.With(slog.Int("priority", int(props.Priority)))
file.SetPriority(props.Priority)
file.file.SetPriority(props.Priority)
}
log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name()))

View file

@ -32,7 +32,9 @@ import (
"github.com/royalcat/kv"
)
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent")
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/sources/torrent",
trace.WithInstrumentationAttributes(attribute.String("component", "torrent-daemon")),
)
type DirAquire struct {
Name string
@ -131,7 +133,7 @@ func (s *Daemon) Close(ctx context.Context) error {
}
func (s *Daemon) loadTorrent(ctx context.Context, f vfs.File) (*Controller, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent")
ctx, span := tracer.Start(ctx, "loadTorrent")
defer span.End()
log := s.log
@ -247,7 +249,7 @@ func (s *Daemon) loadTorrentFiles(ctx context.Context) error {
defer span.End()
log := s.log
loaderPaths := make(chan string)
loaderPaths := make(chan string, loadWorkers*5)
wg := sync.WaitGroup{}
defer func() {

View file

@ -0,0 +1,88 @@
package torrent
import (
"context"
"git.kmsign.ru/royalcat/tstor/pkg/kvsingle"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types"
"github.com/royalcat/kv"
)
type FileController struct {
file *torrent.File
properties *kvsingle.Value[string, FileProperties]
}
func NewFileController(f *torrent.File, properties *kvsingle.Value[string, FileProperties]) *FileController {
return &FileController{
file: f,
properties: properties,
}
}
func (s *FileController) Properties(ctx context.Context) (FileProperties, error) {
p, err := s.properties.Get(ctx)
if err == kv.ErrKeyNotFound {
return FileProperties{
Excluded: false,
Priority: defaultPriority,
}, nil
}
if err != nil {
return FileProperties{}, err
}
return p, nil
}
func (s *FileController) SetPriority(ctx context.Context, priority types.PiecePriority) error {
err := s.properties.Edit(ctx, func(ctx context.Context, v FileProperties) (FileProperties, error) {
v.Priority = priority
return v, nil
})
if err == kv.ErrKeyNotFound {
seterr := s.properties.Set(ctx, FileProperties{
Priority: priority,
})
if seterr != nil {
return err
}
err = nil
}
if err != nil {
return err
}
s.file.SetPriority(priority)
return nil
}
func (s *FileController) FileInfo() metainfo.FileInfo {
return s.file.FileInfo()
}
func (s *FileController) Excluded(ctx context.Context) (bool, error) {
p, err := s.properties.Get(ctx)
if err == kv.ErrKeyNotFound {
return false, nil
}
if err != nil {
return false, err
}
return p.Excluded, nil
}
func (s *FileController) Path() string {
return s.file.Path()
}
func (s *FileController) Size() int64 {
return s.file.Length()
}
func (s *FileController) BytesCompleted() int64 {
return s.file.BytesCompleted()
}

View file

@ -121,8 +121,16 @@ func (fs *TorrentFS) files(ctx context.Context) (map[string]vfs.File, error) {
fs.filesCache = make(map[string]vfs.File)
for _, file := range files {
props, err := file.Properties(ctx)
if err != nil {
return nil, err
}
if props.Excluded {
continue
}
p := vfs.AbsPath(file.Path())
tf, err := openTorrentFile(ctx, path.Base(p), file, &fs.lastTorrentReadTimeout)
tf, err := openTorrentFile(ctx, path.Base(p), file.file, &fs.lastTorrentReadTimeout)
if err != nil {
return nil, err
}

View file

@ -1,14 +1,15 @@
package torrent
import (
"context"
"encoding/binary"
"fmt"
"log/slog"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/dgraph-io/badger/v4"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
)
type PieceCompletionState byte
@ -18,121 +19,119 @@ const (
PieceComplete PieceCompletionState = 1<<8 - 1
)
var _ kv.Binary = (*PieceCompletionState)(nil)
// MarshalBinary implements kv.Binary.
func (p PieceCompletionState) MarshalBinary() (data []byte, err error) {
return []byte{byte(p)}, nil
}
// UnmarshalBinary implements kv.Binary.
func (p *PieceCompletionState) UnmarshalBinary(data []byte) error {
if len(data) != 1 {
return fmt.Errorf("bad length")
}
switch PieceCompletionState(data[0]) {
case PieceComplete:
*p = PieceComplete
case PieceNotComplete:
*p = PieceNotComplete
default:
*p = PieceNotComplete
}
return nil
}
func pieceCompletionState(i bool) PieceCompletionState {
if i {
return PieceComplete
} else {
return PieceNotComplete
}
return PieceNotComplete
}
type pieceKey metainfo.PieceKey
const pieceKeySize = metainfo.HashSize + 4
var _ kv.Binary = (*pieceKey)(nil)
// const delimeter rune = 0x1F
// MarshalBinary implements kv.Binary.
func (pk pieceKey) MarshalBinary() (data []byte, err error) {
key := make([]byte, 0, pieceKeySize)
key = append(key, pk.InfoHash.Bytes()...)
key = binary.BigEndian.AppendUint32(key, uint32(pk.Index))
return key, nil
}
// UnmarshalBinary implements kv.Binary.
func (p *pieceKey) UnmarshalBinary(data []byte) error {
if len(data) < pieceKeySize {
return fmt.Errorf("data too short")
}
p.InfoHash = metainfo.Hash(data[:metainfo.HashSize])
p.Index = int(binary.BigEndian.Uint32(data[metainfo.HashSize:]))
return nil
}
type badgerPieceCompletion struct {
db *badger.DB
db kv.Store[pieceKey, PieceCompletionState]
}
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
opts := badger.
DefaultOptions(dir).
WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion"))
db, err := badger.Open(opts)
opts := kvbadger.DefaultOptions[PieceCompletionState](dir)
opts.Codec = kv.CodecBinary[PieceCompletionState, *PieceCompletionState]{}
opts.BadgerOptions = opts.BadgerOptions.WithLogger(dlog.BadgerLogger("torrent-client", "piece-completion"))
db, err := kvbadger.NewBagerKVBinaryKey[pieceKey, PieceCompletionState](opts)
if err != nil {
return nil, err
}
return &badgerPieceCompletion{db}, nil
return &badgerPieceCompletion{
db: db,
}, nil
}
const delimeter rune = 0x1F
func (c *badgerPieceCompletion) Get(pk metainfo.PieceKey) (completion storage.Completion, err error) {
ctx := context.Background()
func pkToBytes(pk metainfo.PieceKey) []byte {
key := make([]byte, 0, len(pk.InfoHash.Bytes())+1+4)
key = append(key, pk.InfoHash.Bytes()...)
key = append(key, byte(delimeter))
key = binary.BigEndian.AppendUint32(key, uint32(pk.Index))
return key
}
func (k *badgerPieceCompletion) Get(pk metainfo.PieceKey) (storage.Completion, error) {
completion := storage.Completion{
Complete: false,
Ok: false,
}
err := k.db.View(func(tx *badger.Txn) error {
item, err := tx.Get(pkToBytes(pk))
state, err := c.db.Get(ctx, pieceKey(pk))
if err != nil {
if err == badger.ErrKeyNotFound {
completion.Complete = false
completion.Ok = false
return nil
if err == kv.ErrKeyNotFound {
return completion, nil
}
return fmt.Errorf("getting value: %w", err)
}
valCopy, err := item.ValueCopy(nil)
if err != nil {
return fmt.Errorf("copying value: %w", err)
}
compl := PieceCompletionState(valCopy[0])
switch compl {
case PieceComplete:
completion.Ok = true
completion.Complete = true
case PieceNotComplete:
completion.Ok = true
completion.Complete = false
}
return nil
})
return completion, err
}
if state == PieceComplete {
return storage.Completion{
Complete: true,
Ok: true,
}, nil
}
return storage.Completion{
Complete: false,
Ok: true,
}, nil
}
func (me badgerPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
ctx := context.Background()
if c, err := me.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}
return me.db.Update(func(txn *badger.Txn) error {
return txn.Set(pkToBytes(pk), []byte{byte(pieceCompletionState(b))})
})
}
func (k *badgerPieceCompletion) Delete(key string) error {
return k.db.Update(
func(txn *badger.Txn) error {
return txn.Delete([]byte(key))
})
return me.db.Set(ctx, pieceKey(pk), pieceCompletionState(b))
}
func (me *badgerPieceCompletion) Close() error {
return me.db.Close()
return me.db.Close(context.Background())
}
type badgerSlog struct {
slog *slog.Logger
}
// Debugf implements badger.Logger.
func (log badgerSlog) Debugf(f string, a ...interface{}) {
log.slog.Debug(f, a...)
}
// Errorf implements badger.Logger.
func (log badgerSlog) Errorf(f string, a ...interface{}) {
log.slog.Error(f, a...)
}
// Infof implements badger.Logger.
func (log badgerSlog) Infof(f string, a ...interface{}) {
log.slog.Info(f, a...)
}
// Warningf implements badger.Logger.
func (log badgerSlog) Warningf(f string, a ...interface{}) {
log.slog.Warn(f, a...)
}
var _ badger.Logger = (*badgerSlog)(nil)

View file

@ -147,9 +147,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
// replace this with the address of pyroscope server
ServerAddress: "https://pyroscope.kmsign.ru",
// you can disable logging by setting this to nil
Logger: &pyroscopeLogger{
log: client.log.WithComponent("pyroscope"),
},
Logger: newPyroscopeLogger(client.log),
ProfileTypes: []pyroscope.ProfileType{
// these profile types are enabled by default:
pyroscope.ProfileCPU,
@ -172,6 +170,12 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
return client, nil
}
func newPyroscopeLogger(log *rlog.Logger) *pyroscopeLogger {
return &pyroscopeLogger{
log: log.WithComponent("pyroscope").Nested(1),
}
}
type pyroscopeLogger struct {
log *rlog.Logger
}

View file

@ -3,22 +3,20 @@ package tkv
import (
"path"
"git.kmsign.ru/royalcat/tstor/pkg/kvtrace"
tlog "git.kmsign.ru/royalcat/tstor/src/log"
"github.com/royalcat/kv"
"github.com/royalcat/kv/kvbadger"
"go.opentelemetry.io/otel/attribute"
)
func NewKV[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
opts := kvbadger.DefaultOptions(path.Join(dbdir, name))
opts := kvbadger.DefaultOptions[V](path.Join(dbdir, name))
opts.BadgerOptions.Logger = tlog.BadgerLogger(name, "badger")
store, err = kvbadger.NewBadgerKVBytesKey[K, V](opts)
if err != nil {
return nil, err
}
store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger"))
// store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger"))
return store, err
}

View file

@ -94,7 +94,7 @@ type Torrent {
bytesCompleted: Int!
torrentFilePath: String!
bytesMissing: Int!
priority: TorrentPriority! @resolver
priority: TorrentPriority!
files: [TorrentFile!]! @resolver
excludedFiles: [TorrentFile!]! @resolver
peers: [TorrentPeer!]! @resolver
@ -116,6 +116,7 @@ type TorrentFile {
filename: String!
size: Int!
bytesCompleted: Int!
priority: TorrentPriority! @resolver
}
type TorrentFileEntry implements File & FsEntry {
name: String!
@ -138,7 +139,6 @@ enum TorrentPriority {
NORMAL
HIGH
READAHEAD
NEXT
NOW
}
input TorrentPriorityFilter @oneOf {