torrent cleanup

This commit is contained in:
royalcat 2024-11-15 16:39:56 +03:00
parent b069b3ad1c
commit fc6b838cf5
24 changed files with 1316 additions and 395 deletions

View file

@ -0,0 +1,66 @@
package qbittorrent
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"path"
"git.kmsign.ru/royalcat/tstor/pkg/qbittorrent"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
)
func (d *Daemon) Cleanup(ctx context.Context, dryRun bool) ([]string, error) {
d.log.Info(ctx, "cleanup started")
torrentInfos, err := d.client.qb.Torrent().GetTorrents(ctx, &qbittorrent.TorrentOption{})
if err != nil {
d.log.Error(ctx, "failed to get torrents", rlog.Error(err))
return nil, fmt.Errorf("failed to get torrents: %w", err)
}
torrentToDelete := make([]string, 0, 5)
for _, info := range torrentInfos {
if d.registeredTorrents.Contains(info.Hash) {
continue
}
d.log.Info(ctx, "torrent not found in registry", slog.String("infohash", info.Hash))
torrentToDelete = append(torrentToDelete, info.Hash)
}
d.log.Info(ctx, "marked torrents to delete",
slog.Int("count", len(torrentToDelete)),
slog.Any("infohashes", torrentToDelete),
)
if dryRun {
d.log.Info(ctx, "dry run, skipping deletion")
return torrentToDelete, nil
}
err = d.client.qb.Torrent().DeleteTorrents(ctx, torrentToDelete, true)
if err != nil {
d.log.Error(ctx, "failed to delete torrents", slog.Any("infohashes", torrentToDelete), rlog.Error(err))
return nil, fmt.Errorf("failed to delete torrents: %w", err)
}
d.log.Info(ctx, "torrents deleted from qbittorrent", slog.Int("count", len(torrentToDelete)))
for _, hash := range torrentToDelete {
torrentPath := path.Join(d.dataDir, hash)
_, err := os.Stat(torrentPath)
if errors.Is(err, os.ErrNotExist) {
continue
}
if err != nil {
d.log.Error(ctx, "failed to get torrent path", slog.String("path", torrentPath), rlog.Error(err))
continue
}
d.log.Warn(ctx, "leftover data for torrent detected, cleaning up", slog.String("infohash", hash), slog.String("path", torrentPath))
}
return torrentToDelete, nil
}

View file

@ -20,6 +20,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/iceber/iouring-go"
"github.com/royalcat/ctxio"
"go.opentelemetry.io/otel"
@ -35,6 +36,8 @@ type Daemon struct {
sourceFilesMu sync.Mutex
sourceFiles map[string]string // [sourcePath]infohash
registeredTorrents mapset.Set[string] // infohash list
dataDir string
ur *iouring.IOURing
log *rlog.Logger
@ -124,13 +127,14 @@ func NewDaemon(conf config.QBittorrent) (*Daemon, error) {
}
return &Daemon{
qb: qb,
proc: proc,
dataDir: conf.DataFolder,
ur: ur,
sourceFiles: make(map[string]string),
client: wrapClient(qb),
log: rlog.Component("qbittorrent"),
qb: qb,
proc: proc,
dataDir: conf.DataFolder,
ur: ur,
sourceFiles: make(map[string]string),
registeredTorrents: mapset.NewSet[string](),
client: wrapClient(qb),
log: rlog.Component("qbittorrent"),
}, nil
}
@ -217,10 +221,13 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
FirstLastPiecePrio: "true",
})
if err != nil {
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
return err
}
var props *qbittorrent.TorrentProperties
for {
_, err := d.client.getProperties(ctx, ih.HexString())
props, err = d.client.getProperties(ctx, ih.HexString())
if err == nil {
break
}
@ -230,10 +237,7 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
log.Info(ctx, "added torrent", slog.String("infohash", ih.HexString()))
if err != nil {
d.log.Error(ctx, "error adding torrent", rlog.Error(err))
return err
}
d.registeredTorrents.Add(props.Hash)
return nil
} else {
@ -243,6 +247,8 @@ func (d *Daemon) syncTorrentState(ctx context.Context, file vfs.File, ih metainf
return err
}
d.registeredTorrents.Add(props.Hash)
if props.SavePath != torrentPath {
log.Info(ctx, "moving torrent to correct location", slog.String("oldPath", props.SavePath))
err = d.qb.Torrent().SetLocation(ctx, []string{ih.HexString()}, torrentPath)

View file

@ -390,7 +390,7 @@ func (f *fileInfo) ModTime() time.Time {
// Mode implements fs.FileInfo.
func (f *fileInfo) Mode() fs.FileMode {
return vfs.ROMode
return vfs.ModeFileRO
}
// Name implements fs.FileInfo.

View file

@ -446,7 +446,7 @@ func (tf *torrentFile) Seek(offset int64, whence int) (int64, error) {
// Type implements File.
func (tf *torrentFile) Type() fs.FileMode {
return vfs.ROMode | fs.ModeDir
return vfs.ModeFileRO | fs.ModeDir
}
func (tf *torrentFile) Info() (fs.FileInfo, error) {

View file

@ -43,6 +43,7 @@ type Config struct {
type ResolverRoot interface {
ArchiveFS() ArchiveFSResolver
Mutation() MutationResolver
QBitTorrentDaemonMutation() QBitTorrentDaemonMutationResolver
QBitTorrentDaemonQuery() QBitTorrentDaemonQueryResolver
QTorrent() QTorrentResolver
Query() QueryResolver
@ -79,9 +80,19 @@ type ComplexityRoot struct {
}
Mutation struct {
DedupeStorage func(childComplexity int) int
TorrentDaemon func(childComplexity int) int
UploadFile func(childComplexity int, dir string, file graphql.Upload) int
DedupeStorage func(childComplexity int) int
QbitTorrentDaemon func(childComplexity int) int
TorrentDaemon func(childComplexity int) int
UploadFile func(childComplexity int, dir string, file graphql.Upload) int
}
QBitCleanupResponse struct {
Count func(childComplexity int) int
Hashes func(childComplexity int) int
}
QBitTorrentDaemonMutation struct {
Cleanup func(childComplexity int, dryRun bool) int
}
QBitTorrentDaemonQuery struct {
@ -96,7 +107,7 @@ type ComplexityRoot struct {
Query struct {
FsEntry func(childComplexity int, path string) int
QbittorrentDaemon func(childComplexity int) int
QbitTorrentDaemon func(childComplexity int) int
TorrentDaemon func(childComplexity int) int
}
@ -217,9 +228,13 @@ type ArchiveFSResolver interface {
}
type MutationResolver interface {
TorrentDaemon(ctx context.Context) (*model.TorrentDaemonMutation, error)
QbitTorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonMutation, error)
UploadFile(ctx context.Context, dir string, file graphql.Upload) (bool, error)
DedupeStorage(ctx context.Context) (int64, error)
}
type QBitTorrentDaemonMutationResolver interface {
Cleanup(ctx context.Context, obj *model.QBitTorrentDaemonMutation, dryRun bool) (*model.QBitCleanupResponse, error)
}
type QBitTorrentDaemonQueryResolver interface {
Torrents(ctx context.Context, obj *model.QBitTorrentDaemonQuery) ([]*model.QTorrent, error)
}
@ -228,7 +243,7 @@ type QTorrentResolver interface {
}
type QueryResolver interface {
TorrentDaemon(ctx context.Context) (*model.TorrentDaemonQuery, error)
QbittorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonQuery, error)
QbitTorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonQuery, error)
FsEntry(ctx context.Context, path string) (model.FsEntry, error)
}
type ResolverFSResolver interface {
@ -333,6 +348,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Mutation.DedupeStorage(childComplexity), true
case "Mutation.qbitTorrentDaemon":
if e.complexity.Mutation.QbitTorrentDaemon == nil {
break
}
return e.complexity.Mutation.QbitTorrentDaemon(childComplexity), true
case "Mutation.torrentDaemon":
if e.complexity.Mutation.TorrentDaemon == nil {
break
@ -352,6 +374,32 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Mutation.UploadFile(childComplexity, args["dir"].(string), args["file"].(graphql.Upload)), true
case "QBitCleanupResponse.count":
if e.complexity.QBitCleanupResponse.Count == nil {
break
}
return e.complexity.QBitCleanupResponse.Count(childComplexity), true
case "QBitCleanupResponse.hashes":
if e.complexity.QBitCleanupResponse.Hashes == nil {
break
}
return e.complexity.QBitCleanupResponse.Hashes(childComplexity), true
case "QBitTorrentDaemonMutation.cleanup":
if e.complexity.QBitTorrentDaemonMutation.Cleanup == nil {
break
}
args, err := ec.field_QBitTorrentDaemonMutation_cleanup_args(context.TODO(), rawArgs)
if err != nil {
return 0, false
}
return e.complexity.QBitTorrentDaemonMutation.Cleanup(childComplexity, args["dryRun"].(bool)), true
case "QBitTorrentDaemonQuery.torrents":
if e.complexity.QBitTorrentDaemonQuery.Torrents == nil {
break
@ -392,12 +440,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Query.FsEntry(childComplexity, args["path"].(string)), true
case "Query.qbittorrentDaemon":
if e.complexity.Query.QbittorrentDaemon == nil {
case "Query.qbitTorrentDaemon":
if e.complexity.Query.QbitTorrentDaemon == nil {
break
}
return e.complexity.Query.QbittorrentDaemon(childComplexity), true
return e.complexity.Query.QbitTorrentDaemon(childComplexity), true
case "Query.torrentDaemon":
if e.complexity.Query.TorrentDaemon == nil {
@ -1009,6 +1057,7 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er
var sources = []*ast.Source{
{Name: "../../../graphql/mutation.graphql", Input: `type Mutation {
torrentDaemon: TorrentDaemonMutation @resolver
qbitTorrentDaemon: QBitTorrentDaemonMutation @resolver
uploadFile(dir: String!, file: Upload!): Boolean!
dedupeStorage: Int!
@ -1020,7 +1069,7 @@ type Task {
`, BuiltIn: false},
{Name: "../../../graphql/query.graphql", Input: `type Query {
torrentDaemon: TorrentDaemonQuery @resolver
qbittorrentDaemon: QBitTorrentDaemonQuery @resolver
qbitTorrentDaemon: QBitTorrentDaemonQuery @resolver
fsEntry(path: String!): FsEntry
}
@ -1055,17 +1104,26 @@ interface Progress {
current: Int!
total: Int!
}`, BuiltIn: false},
{Name: "../../../graphql/daemons/qbittorrent_query.graphql", Input: `type QBitTorrentDaemonQuery {
{Name: "../../../graphql/sources/qbittorrent_mutation.graphql", Input: `type QBitTorrentDaemonMutation {
cleanup(dryRun: Boolean!): QBitCleanupResponse! @resolver
}
type QBitCleanupResponse {
count: Int!
hashes: [String!]!
}
`, BuiltIn: false},
{Name: "../../../graphql/sources/qbittorrent_query.graphql", Input: `type QBitTorrentDaemonQuery {
torrents: [QTorrent!]! @resolver
}
`, BuiltIn: false},
{Name: "../../../graphql/daemons/qbittorrent_types.graphql", Input: `type QTorrent {
{Name: "../../../graphql/sources/qbittorrent_types.graphql", Input: `type QTorrent {
name: String!
hash: String!
sourceFiles: [String!]! @resolver
}
`, BuiltIn: false},
{Name: "../../../graphql/daemons/torrent_mutation.graphql", Input: `type TorrentDaemonMutation {
{Name: "../../../graphql/sources/torrent_mutation.graphql", Input: `type TorrentDaemonMutation {
validateTorrent(filter: TorrentFilter!): Boolean! @resolver
setTorrentPriority(
infohash: String!
@ -1084,7 +1142,7 @@ type DownloadTorrentResponse {
task: Task
}
`, BuiltIn: false},
{Name: "../../../graphql/daemons/torrent_query.graphql", Input: `type TorrentDaemonQuery {
{Name: "../../../graphql/sources/torrent_query.graphql", Input: `type TorrentDaemonQuery {
torrents(filter: TorrentsFilter): [Torrent!]! @resolver
clientStats: TorrentClientStats! @resolver
statsHistory(since: DateTime!, infohash: String): [TorrentStats!]! @resolver
@ -1114,7 +1172,7 @@ input TorrentFilter @oneOf {
# pathGlob: String!
}
`, BuiltIn: false},
{Name: "../../../graphql/daemons/torrent_types.graphql", Input: `type Torrent {
{Name: "../../../graphql/sources/torrent_types.graphql", Input: `type Torrent {
name: String! @resolver
infohash: String!
bytesCompleted: Int!
@ -1323,6 +1381,38 @@ func (ec *executionContext) field_Mutation_uploadFile_argsFile(
return zeroVal, nil
}
func (ec *executionContext) field_QBitTorrentDaemonMutation_cleanup_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
arg0, err := ec.field_QBitTorrentDaemonMutation_cleanup_argsDryRun(ctx, rawArgs)
if err != nil {
return nil, err
}
args["dryRun"] = arg0
return args, nil
}
func (ec *executionContext) field_QBitTorrentDaemonMutation_cleanup_argsDryRun(
ctx context.Context,
rawArgs map[string]interface{},
) (bool, error) {
// We won't call the directive if the argument is null.
// Set call_argument_directives_with_null to true to call directives
// even if the argument is null.
_, ok := rawArgs["dryRun"]
if !ok {
var zeroVal bool
return zeroVal, nil
}
ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("dryRun"))
if tmp, ok := rawArgs["dryRun"]; ok {
return ec.unmarshalNBoolean2bool(ctx, tmp)
}
var zeroVal bool
return zeroVal, nil
}
func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
@ -2117,6 +2207,73 @@ func (ec *executionContext) fieldContext_Mutation_torrentDaemon(_ context.Contex
return fc, nil
}
func (ec *executionContext) _Mutation_qbitTorrentDaemon(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Mutation_qbitTorrentDaemon(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
directive0 := func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Mutation().QbitTorrentDaemon(rctx)
}
directive1 := func(ctx context.Context) (interface{}, error) {
if ec.directives.Resolver == nil {
var zeroVal *model.QBitTorrentDaemonMutation
return zeroVal, errors.New("directive resolver is not implemented")
}
return ec.directives.Resolver(ctx, nil, directive0)
}
tmp, err := directive1(rctx)
if err != nil {
return nil, graphql.ErrorOnPath(ctx, err)
}
if tmp == nil {
return nil, nil
}
if data, ok := tmp.(*model.QBitTorrentDaemonMutation); ok {
return data, nil
}
return nil, fmt.Errorf(`unexpected type %T from directive, should be *git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model.QBitTorrentDaemonMutation`, tmp)
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(*model.QBitTorrentDaemonMutation)
fc.Result = res
return ec.marshalOQBitTorrentDaemonMutation2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitTorrentDaemonMutation(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Mutation_qbitTorrentDaemon(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Mutation",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "cleanup":
return ec.fieldContext_QBitTorrentDaemonMutation_cleanup(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type QBitTorrentDaemonMutation", field.Name)
},
}
return fc, nil
}
func (ec *executionContext) _Mutation_uploadFile(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Mutation_uploadFile(ctx, field)
if err != nil {
@ -2216,6 +2373,177 @@ func (ec *executionContext) fieldContext_Mutation_dedupeStorage(_ context.Contex
return fc, nil
}
func (ec *executionContext) _QBitCleanupResponse_count(ctx context.Context, field graphql.CollectedField, obj *model.QBitCleanupResponse) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_QBitCleanupResponse_count(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Count, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(int64)
fc.Result = res
return ec.marshalNInt2int64(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_QBitCleanupResponse_count(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "QBitCleanupResponse",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _QBitCleanupResponse_hashes(ctx context.Context, field graphql.CollectedField, obj *model.QBitCleanupResponse) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_QBitCleanupResponse_hashes(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Hashes, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.([]string)
fc.Result = res
return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_QBitCleanupResponse_hashes(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "QBitCleanupResponse",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _QBitTorrentDaemonMutation_cleanup(ctx context.Context, field graphql.CollectedField, obj *model.QBitTorrentDaemonMutation) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_QBitTorrentDaemonMutation_cleanup(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
directive0 := func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.QBitTorrentDaemonMutation().Cleanup(rctx, obj, fc.Args["dryRun"].(bool))
}
directive1 := func(ctx context.Context) (interface{}, error) {
if ec.directives.Resolver == nil {
var zeroVal *model.QBitCleanupResponse
return zeroVal, errors.New("directive resolver is not implemented")
}
return ec.directives.Resolver(ctx, obj, directive0)
}
tmp, err := directive1(rctx)
if err != nil {
return nil, graphql.ErrorOnPath(ctx, err)
}
if tmp == nil {
return nil, nil
}
if data, ok := tmp.(*model.QBitCleanupResponse); ok {
return data, nil
}
return nil, fmt.Errorf(`unexpected type %T from directive, should be *git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model.QBitCleanupResponse`, tmp)
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(*model.QBitCleanupResponse)
fc.Result = res
return ec.marshalNQBitCleanupResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitCleanupResponse(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_QBitTorrentDaemonMutation_cleanup(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "QBitTorrentDaemonMutation",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "count":
return ec.fieldContext_QBitCleanupResponse_count(ctx, field)
case "hashes":
return ec.fieldContext_QBitCleanupResponse_hashes(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type QBitCleanupResponse", field.Name)
},
}
defer func() {
if r := recover(); r != nil {
err = ec.Recover(ctx, r)
ec.Error(ctx, err)
}
}()
ctx = graphql.WithFieldContext(ctx, fc)
if fc.Args, err = ec.field_QBitTorrentDaemonMutation_cleanup_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
ec.Error(ctx, err)
return fc, err
}
return fc, nil
}
func (ec *executionContext) _QBitTorrentDaemonQuery_torrents(ctx context.Context, field graphql.CollectedField, obj *model.QBitTorrentDaemonQuery) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_QBitTorrentDaemonQuery_torrents(ctx, field)
if err != nil {
@ -2515,8 +2843,8 @@ func (ec *executionContext) fieldContext_Query_torrentDaemon(_ context.Context,
return fc, nil
}
func (ec *executionContext) _Query_qbittorrentDaemon(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Query_qbittorrentDaemon(ctx, field)
func (ec *executionContext) _Query_qbitTorrentDaemon(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Query_qbitTorrentDaemon(ctx, field)
if err != nil {
return graphql.Null
}
@ -2530,7 +2858,7 @@ func (ec *executionContext) _Query_qbittorrentDaemon(ctx context.Context, field
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
directive0 := func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().QbittorrentDaemon(rctx)
return ec.resolvers.Query().QbitTorrentDaemon(rctx)
}
directive1 := func(ctx context.Context) (interface{}, error) {
@ -2565,7 +2893,7 @@ func (ec *executionContext) _Query_qbittorrentDaemon(ctx context.Context, field
return ec.marshalOQBitTorrentDaemonQuery2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitTorrentDaemonQuery(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Query_qbittorrentDaemon(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
func (ec *executionContext) fieldContext_Query_qbitTorrentDaemon(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Query",
Field: field,
@ -2900,8 +3228,8 @@ func (ec *executionContext) fieldContext_Schema_query(_ context.Context, field g
switch field.Name {
case "torrentDaemon":
return ec.fieldContext_Query_torrentDaemon(ctx, field)
case "qbittorrentDaemon":
return ec.fieldContext_Query_qbittorrentDaemon(ctx, field)
case "qbitTorrentDaemon":
return ec.fieldContext_Query_qbitTorrentDaemon(ctx, field)
case "fsEntry":
return ec.fieldContext_Query_fsEntry(ctx, field)
case "__schema":
@ -2942,6 +3270,8 @@ func (ec *executionContext) fieldContext_Schema_mutation(_ context.Context, fiel
switch field.Name {
case "torrentDaemon":
return ec.fieldContext_Mutation_torrentDaemon(ctx, field)
case "qbitTorrentDaemon":
return ec.fieldContext_Mutation_qbitTorrentDaemon(ctx, field)
case "uploadFile":
return ec.fieldContext_Mutation_uploadFile(ctx, field)
case "dedupeStorage":
@ -8972,6 +9302,10 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet)
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_torrentDaemon(ctx, field)
})
case "qbitTorrentDaemon":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_qbitTorrentDaemon(ctx, field)
})
case "uploadFile":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_uploadFile(ctx, field)
@ -9009,6 +9343,120 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet)
return out
}
var qBitCleanupResponseImplementors = []string{"QBitCleanupResponse"}
func (ec *executionContext) _QBitCleanupResponse(ctx context.Context, sel ast.SelectionSet, obj *model.QBitCleanupResponse) graphql.Marshaler {
fields := graphql.CollectFields(ec.OperationContext, sel, qBitCleanupResponseImplementors)
out := graphql.NewFieldSet(fields)
deferred := make(map[string]*graphql.FieldSet)
for i, field := range fields {
switch field.Name {
case "__typename":
out.Values[i] = graphql.MarshalString("QBitCleanupResponse")
case "count":
out.Values[i] = ec._QBitCleanupResponse_count(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
}
case "hashes":
out.Values[i] = ec._QBitCleanupResponse_hashes(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
}
default:
panic("unknown field " + strconv.Quote(field.Name))
}
}
out.Dispatch(ctx)
if out.Invalids > 0 {
return graphql.Null
}
atomic.AddInt32(&ec.deferred, int32(len(deferred)))
for label, dfs := range deferred {
ec.processDeferredGroup(graphql.DeferredGroup{
Label: label,
Path: graphql.GetPath(ctx),
FieldSet: dfs,
Context: ctx,
})
}
return out
}
var qBitTorrentDaemonMutationImplementors = []string{"QBitTorrentDaemonMutation"}
func (ec *executionContext) _QBitTorrentDaemonMutation(ctx context.Context, sel ast.SelectionSet, obj *model.QBitTorrentDaemonMutation) graphql.Marshaler {
fields := graphql.CollectFields(ec.OperationContext, sel, qBitTorrentDaemonMutationImplementors)
out := graphql.NewFieldSet(fields)
deferred := make(map[string]*graphql.FieldSet)
for i, field := range fields {
switch field.Name {
case "__typename":
out.Values[i] = graphql.MarshalString("QBitTorrentDaemonMutation")
case "cleanup":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._QBitTorrentDaemonMutation_cleanup(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
return res
}
if field.Deferrable != nil {
dfs, ok := deferred[field.Deferrable.Label]
di := 0
if ok {
dfs.AddField(field)
di = len(dfs.Values) - 1
} else {
dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
deferred[field.Deferrable.Label] = dfs
}
dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
return innerFunc(ctx, dfs)
})
// don't run the out.Concurrently() call below
out.Values[i] = graphql.Null
continue
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
default:
panic("unknown field " + strconv.Quote(field.Name))
}
}
out.Dispatch(ctx)
if out.Invalids > 0 {
return graphql.Null
}
atomic.AddInt32(&ec.deferred, int32(len(deferred)))
for label, dfs := range deferred {
ec.processDeferredGroup(graphql.DeferredGroup{
Label: label,
Path: graphql.GetPath(ctx),
FieldSet: dfs,
Context: ctx,
})
}
return out
}
var qBitTorrentDaemonQueryImplementors = []string{"QBitTorrentDaemonQuery"}
func (ec *executionContext) _QBitTorrentDaemonQuery(ctx context.Context, sel ast.SelectionSet, obj *model.QBitTorrentDaemonQuery) graphql.Marshaler {
@ -9197,7 +9645,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) })
case "qbittorrentDaemon":
case "qbitTorrentDaemon":
field := field
innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) {
@ -9206,7 +9654,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Query_qbittorrentDaemon(ctx, field)
res = ec._Query_qbitTorrentDaemon(ctx, field)
return res
}
@ -10996,6 +11444,16 @@ func (ec *executionContext) marshalNInt2int64(ctx context.Context, sel ast.Selec
return res
}
func (ec *executionContext) marshalNQBitCleanupResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitCleanupResponse(ctx context.Context, sel ast.SelectionSet, v *model.QBitCleanupResponse) graphql.Marshaler {
if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
return graphql.Null
}
return ec._QBitCleanupResponse(ctx, sel, v)
}
func (ec *executionContext) marshalNQTorrent2ᚕᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQTorrentᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.QTorrent) graphql.Marshaler {
ret := make(graphql.Array, len(v))
var wg sync.WaitGroup
@ -11769,6 +12227,13 @@ func (ec *executionContext) marshalOProgress2gitᚗkmsignᚗruᚋroyalcatᚋtsto
return ec._Progress(ctx, sel, v)
}
func (ec *executionContext) marshalOQBitTorrentDaemonMutation2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitTorrentDaemonMutation(ctx context.Context, sel ast.SelectionSet, v *model.QBitTorrentDaemonMutation) graphql.Marshaler {
if v == nil {
return graphql.Null
}
return ec._QBitTorrentDaemonMutation(ctx, sel, v)
}
func (ec *executionContext) marshalOQBitTorrentDaemonQuery2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐQBitTorrentDaemonQuery(ctx context.Context, sel ast.SelectionSet, v *model.QBitTorrentDaemonQuery) graphql.Marshaler {
if v == nil {
return graphql.Null

View file

@ -96,6 +96,15 @@ type Pagination struct {
Limit int64 `json:"limit"`
}
type QBitCleanupResponse struct {
Count int64 `json:"count"`
Hashes []string `json:"hashes"`
}
type QBitTorrentDaemonMutation struct {
Cleanup *QBitCleanupResponse `json:"cleanup"`
}
type QBitTorrentDaemonQuery struct {
Torrents []*QTorrent `json:"torrents"`
}

View file

@ -21,6 +21,11 @@ func (r *mutationResolver) TorrentDaemon(ctx context.Context) (*model.TorrentDae
return &model.TorrentDaemonMutation{}, nil
}
// QbitTorrentDaemon is the resolver for the qbitTorrentDaemon field.
func (r *mutationResolver) QbitTorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonMutation, error) {
return &model.QBitTorrentDaemonMutation{}, nil
}
// UploadFile is the resolver for the uploadFile field.
func (r *mutationResolver) UploadFile(ctx context.Context, dir string, file graphql.Upload) (bool, error) {
dirInfo, err := r.SourceFS.Stat(dir)

View file

@ -0,0 +1,31 @@
package resolver
// This file will be automatically regenerated based on the schema, any resolver implementations
// will be copied through when generating and any unknown code will be moved to the end.
// Code generated by github.com/99designs/gqlgen version v0.17.55
import (
"context"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
)
// Cleanup is the resolver for the cleanup field.
func (r *qBitTorrentDaemonMutationResolver) Cleanup(ctx context.Context, obj *model.QBitTorrentDaemonMutation, dryRun bool) (*model.QBitCleanupResponse, error) {
hahses, err := r.QBitTorrentDaemon.Cleanup(ctx, dryRun)
if err != nil {
return nil, err
}
return &model.QBitCleanupResponse{
Count: int64(len(hahses)),
Hashes: hahses,
}, nil
}
// QBitTorrentDaemonMutation returns graph.QBitTorrentDaemonMutationResolver implementation.
func (r *Resolver) QBitTorrentDaemonMutation() graph.QBitTorrentDaemonMutationResolver {
return &qBitTorrentDaemonMutationResolver{r}
}
type qBitTorrentDaemonMutationResolver struct{ *Resolver }

View file

@ -16,8 +16,8 @@ func (r *queryResolver) TorrentDaemon(ctx context.Context) (*model.TorrentDaemon
return &model.TorrentDaemonQuery{}, nil
}
// QbittorrentDaemon is the resolver for the qbittorrentDaemon field.
func (r *queryResolver) QbittorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonQuery, error) {
// QbitTorrentDaemon is the resolver for the qbitTorrentDaemon field.
func (r *queryResolver) QbitTorrentDaemon(ctx context.Context) (*model.QBitTorrentDaemonQuery, error) {
return &model.QBitTorrentDaemonQuery{}, nil
}

View file

@ -241,7 +241,7 @@ func (d *archiveFile) Name() string {
// Type implements File.
func (d *archiveFile) Type() fs.FileMode {
return ROMode
return ModeFileRO
}
func (d *archiveFile) Info() (fs.FileInfo, error) {

View file

@ -3,15 +3,19 @@ package vfs
import (
"context"
"errors"
"io"
"sync"
"github.com/dgraph-io/ristretto"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/royalcat/ctxio"
)
// TODO переделать кеш в демон
const blockSize int64 = 1024 * 16 // 16KB
const defaultBlockCount = 32768 // 512MB of total usage
const blockSize = 1024 * 16 // 16KB
const cacheSize = 1024 * 1024 * 1024 * 4 // 4GB of total usage
const defaultBlockCount = cacheSize / blockSize
type archiveFileIndex struct {
archive string
@ -23,15 +27,21 @@ type blockIndex struct {
off int64
}
var blockCache *lru.Cache[blockIndex, []byte]
type block struct {
data [blockSize]byte
len int
}
var blockCache *lru.Cache[blockIndex, block]
func ChangeBufferSize(blockCount int) {
blockCache.Resize(blockCount)
}
func init() {
ristretto.NewCache(&ristretto.Config{})
var err error
blockCache, err = lru.New[blockIndex, []byte](defaultBlockCount)
blockCache, err = lru.New[blockIndex, block](defaultBlockCount)
if err != nil {
panic(err)
}
@ -50,6 +60,7 @@ type randomReaderFromLinear struct {
readerFactory archiveFileReaderFactory
reader ctxio.ReadCloser
readen int64
readerMutex sync.Mutex
size int64
closed bool
}
@ -71,11 +82,12 @@ func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64
}
aligntOff := (off / blockSize) * blockSize
bI := blockIndex{index: a.index, off: aligntOff}
block, ok := blockCache.Get(blockIndex{index: a.index, off: aligntOff})
block, ok := blockCache.Get(bI)
if ok {
n = copy(p, block[off-aligntOff:])
if len(block) < int(blockSize) {
n = copy(p, block.data[off-aligntOff:block.len])
if block.len < int(blockSize) {
err = ctxio.EOF
}
@ -83,48 +95,56 @@ func (a *randomReaderFromLinear) ReadAt(ctx context.Context, p []byte, off int64
}
span.AddEvent("cache miss, reading from file")
if err := a.readTo(ctx, aligntOff+blockSize); err != nil && err != ctxio.EOF {
block, err = a.readBlock(ctx, bI)
if err != nil && err != ctxio.EOF {
return 0, err
}
block, ok = blockCache.Get(blockIndex{index: a.index, off: aligntOff})
if !ok {
// WTF this theoretically shouldn't happen under normal scenarios
return 0, errors.New("block not found or block cache under too much pressure, try to increase the cache size")
}
n = copy(p, block[off-aligntOff:])
if len(block) < int(blockSize) {
err = ctxio.EOF
}
return n, err
return copy(p, block.data[off-aligntOff:block.len]), err
}
func (a *randomReaderFromLinear) readTo(ctx context.Context, targetOffset int64) (err error) {
if a.reader == nil || a.readen > targetOffset {
func (a *randomReaderFromLinear) readBlock(ctx context.Context, bI blockIndex) (block, error) {
a.readerMutex.Lock()
defer a.readerMutex.Unlock()
if b, ok := blockCache.Get(bI); ok { // check again, maybe another goroutine already read this block
return b, nil
}
if a.reader == nil || a.readen > bI.off {
var err error
a.reader, err = a.readerFactory(context.TODO())
if err != nil {
return err
return block{}, err
}
a.readen = 0
}
for off := a.readen; off < targetOffset; off += blockSize {
for off := a.readen; off <= bI.off; off += blockSize {
// TODO sync.Pool ?
buf := make([]byte, blockSize)
n, err := a.reader.Read(ctx, buf)
buf := [blockSize]byte{}
n, err := a.reader.Read(ctx, buf[:])
if err != nil && err != ctxio.EOF {
return err
return block{}, err
}
a.readen += int64(n)
if int64(n) < blockSize {
buf = buf[:n]
if n == 0 {
return block{}, io.EOF
}
blockCache.Add(blockIndex{index: a.index, off: off}, buf)
blockCache.Add(blockIndex{bI.index, off}, block{len: n, data: buf})
if off == bI.off {
return block{len: n, data: buf}, err
}
if n < int(blockSize) && errors.Is(err, ctxio.EOF) {
return block{}, err
}
}
return nil
return block{}, io.EOF
}
// Close implements ctxio.Closer.

View file

@ -60,5 +60,5 @@ func (d *dirFile) Size() int64 {
// Type implements File.
func (d *dirFile) Type() fs.FileMode {
return ROMode | fs.ModeDir
return ModeFileRO | fs.ModeDir
}

View file

@ -44,7 +44,7 @@ type Filesystem interface {
}
// readonly
const ROMode = fs.FileMode(0555)
const ModeFileRO = fs.FileMode(0555)
type fileInfo struct {
name string
@ -93,10 +93,10 @@ func (fi *fileInfo) Size() int64 {
func (fi *fileInfo) Mode() fs.FileMode {
if fi.isDir {
return ROMode | fs.ModeDir
return ModeFileRO | fs.ModeDir
}
return ROMode
return ModeFileRO
}
func (fi *fileInfo) ModTime() time.Time {

View file

@ -37,7 +37,7 @@ func TestDirInfo(t *testing.T) {
require.NotNil(fi.ModTime())
require.NotZero(fi.Type() & fs.ModeDir)
require.NotZero(fi.Mode() & fs.ModeDir)
require.Equal(ROMode|fs.ModeDir, fi.Mode())
require.Equal(ModeFileRO|fs.ModeDir, fi.Mode())
require.Nil(fi.Sys())
}

View file

@ -120,7 +120,7 @@ func (d *MemoryFile) Seek(offset int64, whence int) (int64, error) {
// Type implements File.
func (d *MemoryFile) Type() fs.FileMode {
return ROMode
return ModeFileRO
}
func (d *MemoryFile) Info() (fs.FileInfo, error) {

View file

@ -42,6 +42,13 @@ func AddTrailSlash(p string) string {
return p
}
func RemoveTrailingSlash(p string) string {
if p == Separator {
return ""
}
return strings.TrimSuffix(p, Separator)
}
// OnceValueWOErr returns a function that invokes f only once and returns the value
// returned by f . The returned function may be called concurrently.
//