torrent priority and piece state fix
This commit is contained in:
parent
13ce2aa07f
commit
199a82ff0c
33 changed files with 2227 additions and 959 deletions
src
File diff suppressed because it is too large
Load diff
|
@ -3,6 +3,8 @@ package model
|
|||
import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/anacrolix/torrent/types"
|
||||
)
|
||||
|
||||
type Filter[T any] interface {
|
||||
|
@ -52,3 +54,23 @@ func (f *BooleanFilter) Include(v bool) bool {
|
|||
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *TorrentPriorityFilter) Include(v types.PiecePriority) bool {
|
||||
if f == nil {
|
||||
return true
|
||||
} else if f.Eq != nil {
|
||||
return v == *f.Eq
|
||||
} else if f.Gt != nil {
|
||||
return v > *f.Gt
|
||||
} else if f.Gte != nil {
|
||||
return v >= *f.Gte
|
||||
} else if f.Lt != nil {
|
||||
return v < *f.Lt
|
||||
} else if f.Lte != nil {
|
||||
return v <= *f.Lte
|
||||
} else if f.In != nil {
|
||||
return slices.Contains(f.In, v)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -29,24 +29,11 @@ func MapPeerSource(source atorrent.PeerSource) string {
|
|||
}
|
||||
|
||||
func MapTorrent(ctx context.Context, t *torrent.Controller) (*Torrent, error) {
|
||||
downloading := false
|
||||
files, err := t.Files(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.Priority() > atorrent.PiecePriorityNone && file.BytesCompleted() < file.Length() {
|
||||
downloading = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return &Torrent{
|
||||
Infohash: t.InfoHash(),
|
||||
Name: t.Name(),
|
||||
BytesCompleted: t.BytesCompleted(),
|
||||
BytesMissing: t.BytesMissing(),
|
||||
T: t,
|
||||
Downloading: downloading,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"git.kmsign.ru/royalcat/tstor/src/vfs"
|
||||
torrent1 "github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/types"
|
||||
)
|
||||
|
||||
type Dir interface {
|
||||
|
@ -176,13 +177,23 @@ type Torrent struct {
|
|||
BytesCompleted int64 `json:"bytesCompleted"`
|
||||
TorrentFilePath string `json:"torrentFilePath"`
|
||||
BytesMissing int64 `json:"bytesMissing"`
|
||||
Priority types.PiecePriority `json:"priority"`
|
||||
Files []*TorrentFile `json:"files"`
|
||||
ExcludedFiles []*TorrentFile `json:"excludedFiles"`
|
||||
Peers []*TorrentPeer `json:"peers"`
|
||||
Downloading bool `json:"downloading"`
|
||||
T *torrent.Controller `json:"-"`
|
||||
}
|
||||
|
||||
type TorrentDaemonMutation struct {
|
||||
ValidateTorrent bool `json:"validateTorrent"`
|
||||
SetTorrentPriority bool `json:"setTorrentPriority"`
|
||||
Cleanup *CleanupResponse `json:"cleanup"`
|
||||
}
|
||||
|
||||
type TorrentDaemonQuery struct {
|
||||
Torrents []*Torrent `json:"torrents"`
|
||||
}
|
||||
|
||||
type TorrentFs struct {
|
||||
Name string `json:"name"`
|
||||
Torrent *Torrent `json:"torrent"`
|
||||
|
@ -238,6 +249,15 @@ type TorrentPeer struct {
|
|||
F *torrent1.PeerConn `json:"-"`
|
||||
}
|
||||
|
||||
type TorrentPriorityFilter struct {
|
||||
Eq *types.PiecePriority `json:"eq,omitempty"`
|
||||
Gt *types.PiecePriority `json:"gt,omitempty"`
|
||||
Lt *types.PiecePriority `json:"lt,omitempty"`
|
||||
Gte *types.PiecePriority `json:"gte,omitempty"`
|
||||
Lte *types.PiecePriority `json:"lte,omitempty"`
|
||||
In []types.PiecePriority `json:"in,omitempty"`
|
||||
}
|
||||
|
||||
type TorrentProgress struct {
|
||||
Torrent *Torrent `json:"torrent"`
|
||||
Current int64 `json:"current"`
|
||||
|
@ -249,10 +269,10 @@ func (this TorrentProgress) GetCurrent() int64 { return this.Current }
|
|||
func (this TorrentProgress) GetTotal() int64 { return this.Total }
|
||||
|
||||
type TorrentsFilter struct {
|
||||
Infohash *StringFilter `json:"infohash,omitempty"`
|
||||
Name *StringFilter `json:"name,omitempty"`
|
||||
BytesCompleted *IntFilter `json:"bytesCompleted,omitempty"`
|
||||
BytesMissing *IntFilter `json:"bytesMissing,omitempty"`
|
||||
PeersCount *IntFilter `json:"peersCount,omitempty"`
|
||||
Downloading *BooleanFilter `json:"downloading,omitempty"`
|
||||
Infohash *StringFilter `json:"infohash,omitempty"`
|
||||
Name *StringFilter `json:"name,omitempty"`
|
||||
BytesCompleted *IntFilter `json:"bytesCompleted,omitempty"`
|
||||
BytesMissing *IntFilter `json:"bytesMissing,omitempty"`
|
||||
PeersCount *IntFilter `json:"peersCount,omitempty"`
|
||||
Priority *TorrentPriorityFilter `json:"priority,omitempty"`
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package resolver
|
|||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.45
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
|
@ -2,7 +2,7 @@ package resolver
|
|||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.45
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -11,84 +11,14 @@ import (
|
|||
"os"
|
||||
pathlib "path"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/uuid"
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
"github.com/99designs/gqlgen/graphql"
|
||||
aih "github.com/anacrolix/torrent/types/infohash"
|
||||
)
|
||||
|
||||
// ValidateTorrents is the resolver for the validateTorrents field.
|
||||
func (r *mutationResolver) ValidateTorrents(ctx context.Context, filter model.TorrentFilter) (bool, error) {
|
||||
if filter.Infohash != nil {
|
||||
t, err := r.Resolver.Service.GetTorrent(*filter.Infohash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if t == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
t.ValidateTorrent(ctx)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if filter.Everything != nil && *filter.Everything {
|
||||
torrents, err := r.Resolver.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, v := range torrents {
|
||||
if err := v.ValidateTorrent(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CleanupTorrents is the resolver for the cleanupTorrents field.
|
||||
func (r *mutationResolver) CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (*model.CleanupResponse, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if files != nil && *files {
|
||||
r, err := r.Service.Storage.CleanupFiles(ctx, torrents, dryRun)
|
||||
return &model.CleanupResponse{
|
||||
Count: int64(len(r)),
|
||||
List: r,
|
||||
}, err
|
||||
} else {
|
||||
r, err := r.Service.Storage.CleanupDirs(ctx, torrents, dryRun)
|
||||
return &model.CleanupResponse{
|
||||
Count: int64(len(r)),
|
||||
List: r,
|
||||
}, err
|
||||
}
|
||||
}
|
||||
|
||||
// DownloadTorrent is the resolver for the downloadTorrent field.
|
||||
func (r *mutationResolver) DownloadTorrent(ctx context.Context, infohash string, file *string) (*model.DownloadTorrentResponse, error) {
|
||||
f := ""
|
||||
if file != nil {
|
||||
f = *file
|
||||
}
|
||||
|
||||
err := r.Service.Download(ctx, &torrent.DownloadTask{
|
||||
ID: uuid.New(),
|
||||
InfoHash: aih.FromHexString(infohash),
|
||||
File: f,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &model.DownloadTorrentResponse{}, nil
|
||||
// TorrentDaemon is the resolver for the torrentDaemon field.
|
||||
func (r *mutationResolver) TorrentDaemon(ctx context.Context) (*model.TorrentDaemonMutation, error) {
|
||||
return &model.TorrentDaemonMutation{}, nil
|
||||
}
|
||||
|
||||
// UploadFile is the resolver for the uploadFile field.
|
||||
|
|
|
@ -2,89 +2,18 @@ package resolver
|
|||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.45
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
)
|
||||
|
||||
// Torrents is the resolver for the torrents field.
|
||||
func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilter) ([]*model.Torrent, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterFuncs := []func(torrent *model.Torrent) bool{}
|
||||
|
||||
if filter != nil {
|
||||
if filter.BytesCompleted != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesCompleted.Include(torrent.BytesCompleted)
|
||||
})
|
||||
}
|
||||
if filter.BytesMissing != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesMissing.Include(torrent.BytesMissing)
|
||||
})
|
||||
}
|
||||
if filter.PeersCount != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.PeersCount.Include(
|
||||
int64(len(torrent.T.Torrent().PeerConns())),
|
||||
)
|
||||
})
|
||||
}
|
||||
if filter.Infohash != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.Infohash.Include(
|
||||
torrent.Infohash,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
if filter.Downloading != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.Downloading.Include(
|
||||
torrent.Downloading,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
filterFunc := func(torrent *model.Torrent) bool {
|
||||
for _, f := range filterFuncs {
|
||||
if !f(torrent) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
tr := []*model.Torrent{}
|
||||
for _, t := range torrents {
|
||||
d, err := model.MapTorrent(ctx, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !filterFunc(d) {
|
||||
continue
|
||||
}
|
||||
tr = append(tr, d)
|
||||
}
|
||||
|
||||
slices.SortStableFunc(torrents, func(t1, t2 *torrent.Controller) int {
|
||||
return strings.Compare(t1.Name(), t2.Name())
|
||||
})
|
||||
|
||||
return tr, nil
|
||||
// TorrentDaemon is the resolver for the torrentDaemon field.
|
||||
func (r *queryResolver) TorrentDaemon(ctx context.Context) (*model.TorrentDaemonQuery, error) {
|
||||
return &model.TorrentDaemonQuery{}, nil
|
||||
}
|
||||
|
||||
// FsEntry is the resolver for the fsEntry field.
|
||||
|
|
|
@ -2,7 +2,7 @@ package resolver
|
|||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.45
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
90
src/delivery/graphql/resolver/torrent_mutation.resolvers.go
Normal file
90
src/delivery/graphql/resolver/torrent_mutation.resolvers.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package resolver
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"github.com/anacrolix/torrent/types"
|
||||
)
|
||||
|
||||
// ValidateTorrent is the resolver for the validateTorrent field.
|
||||
func (r *torrentDaemonMutationResolver) ValidateTorrent(ctx context.Context, obj *model.TorrentDaemonMutation, filter model.TorrentFilter) (bool, error) {
|
||||
if filter.Infohash != nil {
|
||||
t, err := r.Resolver.Service.GetTorrent(*filter.Infohash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if t == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
t.ValidateTorrent(ctx)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if filter.Everything != nil && *filter.Everything {
|
||||
torrents, err := r.Resolver.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, v := range torrents {
|
||||
if err := v.ValidateTorrent(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// SetTorrentPriority is the resolver for the setTorrentPriority field.
|
||||
func (r *torrentDaemonMutationResolver) SetTorrentPriority(ctx context.Context, obj *model.TorrentDaemonMutation, infohash string, file *string, priority types.PiecePriority) (bool, error) {
|
||||
t, err := r.Resolver.Service.GetTorrent(infohash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if t == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
err = t.SetPriority(ctx, file, priority)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Cleanup is the resolver for the cleanup field.
|
||||
func (r *torrentDaemonMutationResolver) Cleanup(ctx context.Context, obj *model.TorrentDaemonMutation, files *bool, dryRun bool) (*model.CleanupResponse, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if files != nil && *files {
|
||||
r, err := r.Service.Storage.CleanupFiles(ctx, torrents, dryRun)
|
||||
return &model.CleanupResponse{
|
||||
Count: int64(len(r)),
|
||||
List: r,
|
||||
}, err
|
||||
} else {
|
||||
r, err := r.Service.Storage.CleanupDirs(ctx, torrents, dryRun)
|
||||
return &model.CleanupResponse{
|
||||
Count: int64(len(r)),
|
||||
List: r,
|
||||
}, err
|
||||
}
|
||||
}
|
||||
|
||||
// TorrentDaemonMutation returns graph.TorrentDaemonMutationResolver implementation.
|
||||
func (r *Resolver) TorrentDaemonMutation() graph.TorrentDaemonMutationResolver {
|
||||
return &torrentDaemonMutationResolver{r}
|
||||
}
|
||||
|
||||
type torrentDaemonMutationResolver struct{ *Resolver }
|
94
src/delivery/graphql/resolver/torrent_query.resolvers.go
Normal file
94
src/delivery/graphql/resolver/torrent_query.resolvers.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package resolver
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"git.kmsign.ru/royalcat/tstor/src/sources/torrent"
|
||||
)
|
||||
|
||||
// Torrents is the resolver for the torrents field.
|
||||
func (r *torrentDaemonQueryResolver) Torrents(ctx context.Context, obj *model.TorrentDaemonQuery, filter *model.TorrentsFilter) ([]*model.Torrent, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterFuncs := []func(torrent *model.Torrent) bool{}
|
||||
|
||||
if filter != nil {
|
||||
if filter.BytesCompleted != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesCompleted.Include(torrent.BytesCompleted)
|
||||
})
|
||||
}
|
||||
if filter.BytesMissing != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesMissing.Include(torrent.BytesMissing)
|
||||
})
|
||||
}
|
||||
if filter.PeersCount != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.PeersCount.Include(
|
||||
int64(len(torrent.T.Torrent().PeerConns())),
|
||||
)
|
||||
})
|
||||
}
|
||||
if filter.Infohash != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.Infohash.Include(
|
||||
torrent.Infohash,
|
||||
)
|
||||
})
|
||||
}
|
||||
if filter.Priority != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.Priority.Include(
|
||||
torrent.Priority,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
filterFunc := func(torrent *model.Torrent) bool {
|
||||
for _, f := range filterFuncs {
|
||||
if !f(torrent) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
tr := []*model.Torrent{}
|
||||
for _, t := range torrents {
|
||||
d, err := model.MapTorrent(ctx, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !filterFunc(d) {
|
||||
continue
|
||||
}
|
||||
tr = append(tr, d)
|
||||
}
|
||||
|
||||
slices.SortStableFunc(torrents, func(t1, t2 *torrent.Controller) int {
|
||||
return strings.Compare(t1.Name(), t2.Name())
|
||||
})
|
||||
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// TorrentDaemonQuery returns graph.TorrentDaemonQueryResolver implementation.
|
||||
func (r *Resolver) TorrentDaemonQuery() graph.TorrentDaemonQueryResolver {
|
||||
return &torrentDaemonQueryResolver{r}
|
||||
}
|
||||
|
||||
type torrentDaemonQueryResolver struct{ *Resolver }
|
|
@ -2,13 +2,14 @@ package resolver
|
|||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.45
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.49
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
"github.com/anacrolix/torrent/types"
|
||||
)
|
||||
|
||||
// Name is the resolver for the name field.
|
||||
|
@ -16,6 +17,11 @@ func (r *torrentResolver) Name(ctx context.Context, obj *model.Torrent) (string,
|
|||
return obj.T.Name(), nil
|
||||
}
|
||||
|
||||
// Priority is the resolver for the priority field.
|
||||
func (r *torrentResolver) Priority(ctx context.Context, obj *model.Torrent) (types.PiecePriority, error) {
|
||||
return obj.T.Priority(ctx, nil)
|
||||
}
|
||||
|
||||
// Files is the resolver for the files field.
|
||||
func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
|
||||
out := []*model.TorrentFile{}
|
|
@ -18,13 +18,19 @@ import (
|
|||
"github.com/ravilushqa/otelgqlgen"
|
||||
)
|
||||
|
||||
func noopDirective(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) {
|
||||
return next(ctx)
|
||||
}
|
||||
|
||||
func GraphQLHandler(service *torrent.Daemon, vfs vfs.Filesystem) http.Handler {
|
||||
graphqlHandler := handler.NewDefaultServer(
|
||||
graph.NewExecutableSchema(
|
||||
graph.Config{
|
||||
Resolvers: &resolver.Resolver{Service: service, VFS: vfs},
|
||||
Directives: graph.DirectiveRoot{
|
||||
OneOf: graph.OneOf,
|
||||
OneOf: graph.OneOf,
|
||||
Resolver: noopDirective,
|
||||
Stream: noopDirective,
|
||||
},
|
||||
},
|
||||
),
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
|
||||
|
@ -11,14 +11,14 @@ import (
|
|||
|
||||
func BadgerLogger(name ...string) badger.Logger {
|
||||
return &badgerLogger{
|
||||
L: rlog.Component(append(name, "badger")...).Slog(),
|
||||
L: rlog.Component(append([]string{"badger"}, name...)...).Nested(2),
|
||||
}
|
||||
}
|
||||
|
||||
var _ badger.Logger = (*badgerLogger)(nil)
|
||||
|
||||
type badgerLogger struct {
|
||||
L *slog.Logger
|
||||
L *rlog.Logger
|
||||
}
|
||||
|
||||
func fmtBadgerLog(m string, f ...any) string {
|
||||
|
@ -26,17 +26,21 @@ func fmtBadgerLog(m string, f ...any) string {
|
|||
}
|
||||
|
||||
func (l *badgerLogger) Errorf(m string, f ...interface{}) {
|
||||
l.L.Error(fmtBadgerLog(m, f...))
|
||||
ctx := context.Background()
|
||||
l.L.Error(ctx, fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *badgerLogger) Warningf(m string, f ...interface{}) {
|
||||
l.L.Warn(fmtBadgerLog(m, f...))
|
||||
ctx := context.Background()
|
||||
l.L.Warn(ctx, fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *badgerLogger) Infof(m string, f ...interface{}) {
|
||||
l.L.Info(fmtBadgerLog(m, f...))
|
||||
ctx := context.Background()
|
||||
l.L.Info(ctx, fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *badgerLogger) Debugf(m string, f ...interface{}) {
|
||||
l.L.Debug(fmtBadgerLog(m, f...))
|
||||
ctx := context.Background()
|
||||
l.L.Debug(ctx, fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
|
|
@ -111,6 +111,21 @@ func (s *Controller) Files(ctx context.Context) ([]*torrent.File, error) {
|
|||
return files, nil
|
||||
}
|
||||
|
||||
func (s *Controller) GetFile(ctx context.Context, file string) (*torrent.File, error) {
|
||||
files, err := s.Files(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, v := range files {
|
||||
if v.Path() == file {
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func Map[T, U any](ts []T, f func(T) U) []U {
|
||||
us := make([]U, len(ts))
|
||||
for i := range ts {
|
||||
|
@ -166,18 +181,79 @@ func (s *Controller) ValidateTorrent(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) SetFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
|
||||
log := c.log.With(slog.String("file", file.Path()), slog.Int("priority", int(priority)))
|
||||
log.Info(ctx, "set pritority for file")
|
||||
func (c *Controller) SetPriority(ctx context.Context, filePath *string, priority types.PiecePriority) error {
|
||||
log := c.log.With(slog.Int("priority", int(priority)))
|
||||
|
||||
if filePath != nil {
|
||||
file, err := c.GetFile(ctx, *filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if file == nil {
|
||||
log.Error(ctx, "file not found")
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.setFilePriority(ctx, file, priority)
|
||||
}
|
||||
|
||||
for _, f := range c.t.Files() {
|
||||
err := c.setFilePriority(ctx, f, priority)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const defaultPriority = types.PiecePriorityNone
|
||||
|
||||
func (c *Controller) Priority(ctx context.Context, filePath *string) (types.PiecePriority, error) {
|
||||
if filePath == nil {
|
||||
prio := defaultPriority
|
||||
err := c.fileProperties.Range(ctx, func(filePath string, v FileProperties) error {
|
||||
if filePath == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if v.Priority > prio {
|
||||
prio = v.Priority
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == kv.ErrKeyNotFound {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return prio, err
|
||||
}
|
||||
|
||||
props, err := c.fileProperties.Get(ctx, *filePath)
|
||||
if err != nil {
|
||||
if err == kv.ErrKeyNotFound {
|
||||
return defaultPriority, nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return props.Priority, nil
|
||||
}
|
||||
func (c *Controller) setFilePriority(ctx context.Context, file *torrent.File, priority types.PiecePriority) error {
|
||||
err := c.fileProperties.Edit(ctx, file.Path(), func(ctx context.Context, v FileProperties) (FileProperties, error) {
|
||||
v.Priority = priority
|
||||
return v, nil
|
||||
})
|
||||
if err != nil {
|
||||
if err == kv.ErrKeyNotFound {
|
||||
err := c.fileProperties.Set(ctx, file.Path(), FileProperties{Priority: priority})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
file.SetPriority(priority)
|
||||
return nil
|
||||
}
|
||||
|
@ -204,7 +280,6 @@ func (c *Controller) initializeTorrentPriories(ctx context.Context) error {
|
|||
}
|
||||
|
||||
file.SetPriority(props.Priority)
|
||||
|
||||
}
|
||||
|
||||
log.Info(ctx, "torrent initialization complete", slog.String("infohash", c.InfoHash()), slog.String("torrent_name", c.Name()))
|
||||
|
|
|
@ -43,21 +43,26 @@ func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
|
|||
return &badgerPieceCompletion{db}, nil
|
||||
}
|
||||
|
||||
const delimeter rune = 0x1F
|
||||
|
||||
func pkToBytes(pk metainfo.PieceKey) []byte {
|
||||
key := make([]byte, len(pk.InfoHash.Bytes()))
|
||||
copy(key, pk.InfoHash.Bytes())
|
||||
binary.BigEndian.AppendUint32(key, uint32(pk.Index))
|
||||
key := make([]byte, 0, len(pk.InfoHash.Bytes())+1+4)
|
||||
key = append(key, pk.InfoHash.Bytes()...)
|
||||
key = append(key, byte(delimeter))
|
||||
key = binary.BigEndian.AppendUint32(key, uint32(pk.Index))
|
||||
return key
|
||||
}
|
||||
|
||||
func (k *badgerPieceCompletion) Get(pk metainfo.PieceKey) (storage.Completion, error) {
|
||||
completion := storage.Completion{
|
||||
Ok: true,
|
||||
Complete: false,
|
||||
Ok: false,
|
||||
}
|
||||
err := k.db.View(func(tx *badger.Txn) error {
|
||||
item, err := tx.Get(pkToBytes(pk))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
completion.Complete = false
|
||||
completion.Ok = false
|
||||
return nil
|
||||
}
|
||||
|
@ -71,11 +76,12 @@ func (k *badgerPieceCompletion) Get(pk metainfo.PieceKey) (storage.Completion, e
|
|||
}
|
||||
compl := PieceCompletionState(valCopy[0])
|
||||
|
||||
completion.Ok = true
|
||||
switch compl {
|
||||
case PieceComplete:
|
||||
completion.Ok = true
|
||||
completion.Complete = true
|
||||
case PieceNotComplete:
|
||||
completion.Ok = true
|
||||
completion.Complete = false
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,12 @@ func setupStorage(cfg config.TorrentClient) (*fileStorage, storage.PieceCompleti
|
|||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
|
||||
// pc, err := storage.NewBoltPieceCompletion(pcp)
|
||||
// if err != nil {
|
||||
// return nil, nil, err
|
||||
// }
|
||||
|
||||
pc, err := newPieceCompletion(pcp)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -22,34 +21,38 @@ import (
|
|||
// OpenTorrent implements storage.ClientImplCloser.
|
||||
func (me *fileStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
|
||||
ctx := context.Background()
|
||||
log := me.log
|
||||
log := me.log.With(slog.String("infohash", infoHash.HexString()))
|
||||
|
||||
dir := torrentDir(me.baseDir, infoHash)
|
||||
legacyDir := filepath.Join(me.baseDir, info.Name)
|
||||
// dir := torrentDir(me.baseDir, infoHash)
|
||||
// legacyDir := filepath.Join(me.baseDir, info.Name)
|
||||
|
||||
log = log.With(slog.String("legacy_dir", legacyDir), slog.String("dir", dir))
|
||||
if _, err := os.Stat(legacyDir); err == nil {
|
||||
log.Warn(ctx, "legacy torrent dir found, renaming", slog.String("dir", dir))
|
||||
err = os.Rename(legacyDir, dir)
|
||||
if err != nil {
|
||||
return storage.TorrentImpl{}, fmt.Errorf("error renaming legacy torrent dir: %w", err)
|
||||
}
|
||||
// log = log.With(slog.String("legacy_dir", legacyDir), slog.String("dir", dir))
|
||||
// if _, err := os.Stat(legacyDir); err == nil {
|
||||
// log.Warn(ctx, "legacy torrent dir found, renaming", slog.String("dir", dir))
|
||||
// err = os.Rename(legacyDir, dir)
|
||||
// if err != nil {
|
||||
// return storage.TorrentImpl{}, fmt.Errorf("error renaming legacy torrent dir: %w", err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// if _, err := os.Stat(dir); errors.Is(err, fs.ErrNotExist) {
|
||||
// log.Info(ctx, "new torrent, trying copy files from existing")
|
||||
// dups := me.dupIndex.Includes(infoHash, info.Files)
|
||||
|
||||
// for _, dup := range dups {
|
||||
// err := me.copyDup(ctx, infoHash, dup)
|
||||
// if err != nil {
|
||||
// log.Error(ctx, "error copying file", slog.String("file", dup.fileinfo.DisplayPath(info)), rlog.Error(err))
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
impl, err := me.client.OpenTorrent(info, infoHash)
|
||||
if err != nil {
|
||||
log.Error(ctx, "error opening torrent", rlog.Error(err))
|
||||
}
|
||||
|
||||
if _, err := os.Stat(dir); errors.Is(err, fs.ErrNotExist) {
|
||||
log.Info(ctx, "new torrent, trying copy files from existing")
|
||||
dups := me.dupIndex.Includes(infoHash, info.Files)
|
||||
|
||||
for _, dup := range dups {
|
||||
err := me.copyDup(ctx, infoHash, dup)
|
||||
if err != nil {
|
||||
log.Error(ctx, "error copying file", slog.String("file", dup.fileinfo.DisplayPath(info)), rlog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return me.client.OpenTorrent(info, infoHash)
|
||||
return impl, err
|
||||
}
|
||||
|
||||
func (me *fileStorage) copyDup(ctx context.Context, infoHash infohash.T, dup dupInfo) error {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue