wip
This commit is contained in:
parent
2cefb9db98
commit
b97dcc8d8f
52 changed files with 7570 additions and 555 deletions
src
delivery
export
host
controller
filestorage
service
storage
store
vfs
http
log
proto
5741
src/delivery/graphql/generated.go
Normal file
5741
src/delivery/graphql/generated.go
Normal file
File diff suppressed because it is too large
Load diff
21
src/delivery/graphql/model/filter.go
Normal file
21
src/delivery/graphql/model/filter.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package model
|
||||
|
||||
import "slices"
|
||||
|
||||
func (f *IntFilter) IsValid(v int64) bool {
|
||||
if f.Eq != nil {
|
||||
return v == *f.Eq
|
||||
} else if f.Gt != nil {
|
||||
return v > *f.Gt
|
||||
} else if f.Gte != nil {
|
||||
return v >= *f.Gte
|
||||
} else if f.Lt != nil {
|
||||
return v < *f.Lt
|
||||
} else if f.Lte != nil {
|
||||
return v <= *f.Lte
|
||||
} else if f.In != nil {
|
||||
return slices.Contains(f.In, v)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
24
src/delivery/graphql/model/mappers.go
Normal file
24
src/delivery/graphql/model/mappers.go
Normal file
|
@ -0,0 +1,24 @@
|
|||
package model
|
||||
|
||||
import "github.com/anacrolix/torrent"
|
||||
|
||||
func MapPeerSource(source torrent.PeerSource) string {
|
||||
switch source {
|
||||
case torrent.PeerSourceDirect:
|
||||
return "Direct"
|
||||
case torrent.PeerSourceUtHolepunch:
|
||||
return "Ut Holepunch"
|
||||
case torrent.PeerSourceDhtAnnouncePeer:
|
||||
return "DHT Announce"
|
||||
case torrent.PeerSourceDhtGetPeers:
|
||||
return "DHT"
|
||||
case torrent.PeerSourceIncoming:
|
||||
return "Incoming"
|
||||
case torrent.PeerSourceTracker:
|
||||
return "Tracker"
|
||||
case torrent.PeerSourcePex:
|
||||
return "PEX"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
93
src/delivery/graphql/model/models_gen.go
Normal file
93
src/delivery/graphql/model/models_gen.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
// Code generated by github.com/99designs/gqlgen, DO NOT EDIT.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
||||
"github.com/anacrolix/torrent"
|
||||
)
|
||||
|
||||
type BooleanFilter struct {
|
||||
Eq *bool `json:"eq,omitempty"`
|
||||
}
|
||||
|
||||
type DateTimeFilter struct {
|
||||
Eq *time.Time `json:"eq,omitempty"`
|
||||
Gt *time.Time `json:"gt,omitempty"`
|
||||
Lt *time.Time `json:"lt,omitempty"`
|
||||
Gte *time.Time `json:"gte,omitempty"`
|
||||
Lte *time.Time `json:"lte,omitempty"`
|
||||
}
|
||||
|
||||
type IntFilter struct {
|
||||
Eq *int64 `json:"eq,omitempty"`
|
||||
Gt *int64 `json:"gt,omitempty"`
|
||||
Lt *int64 `json:"lt,omitempty"`
|
||||
Gte *int64 `json:"gte,omitempty"`
|
||||
Lte *int64 `json:"lte,omitempty"`
|
||||
In []int64 `json:"in,omitempty"`
|
||||
}
|
||||
|
||||
type Mutation struct {
|
||||
}
|
||||
|
||||
type Pagination struct {
|
||||
Offset int64 `json:"offset"`
|
||||
Limit int64 `json:"limit"`
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
}
|
||||
|
||||
type Schema struct {
|
||||
Query *Query `json:"query,omitempty"`
|
||||
Mutation *Mutation `json:"mutation,omitempty"`
|
||||
}
|
||||
|
||||
type StringFilter struct {
|
||||
Eq *string `json:"eq,omitempty"`
|
||||
Substr *string `json:"substr,omitempty"`
|
||||
In []string `json:"in,omitempty"`
|
||||
}
|
||||
|
||||
type Torrent struct {
|
||||
Name string `json:"name"`
|
||||
Infohash string `json:"infohash"`
|
||||
BytesCompleted int64 `json:"bytesCompleted"`
|
||||
TorrentFilePath string `json:"torrentFilePath"`
|
||||
BytesMissing int64 `json:"bytesMissing"`
|
||||
Files []*TorrentFile `json:"files"`
|
||||
ExcludedFiles []*TorrentFile `json:"excludedFiles"`
|
||||
Peers []*TorrentPeer `json:"peers"`
|
||||
T *controller.Torrent `json:"-"`
|
||||
}
|
||||
|
||||
type TorrentFile struct {
|
||||
Filename string `json:"filename"`
|
||||
Size int64 `json:"size"`
|
||||
BytesCompleted int64 `json:"bytesCompleted"`
|
||||
F *torrent.File `json:"-"`
|
||||
}
|
||||
|
||||
type TorrentFilter struct {
|
||||
Everything *bool `json:"everything,omitempty"`
|
||||
Infohash *string `json:"infohash,omitempty"`
|
||||
}
|
||||
|
||||
type TorrentPeer struct {
|
||||
IP string `json:"ip"`
|
||||
DownloadRate float64 `json:"downloadRate"`
|
||||
Discovery string `json:"discovery"`
|
||||
Port int64 `json:"port"`
|
||||
ClientName string `json:"clientName"`
|
||||
F *torrent.PeerConn `json:"-"`
|
||||
}
|
||||
|
||||
type TorrentsFilter struct {
|
||||
Name *StringFilter `json:"name,omitempty"`
|
||||
BytesCompleted *IntFilter `json:"bytesCompleted,omitempty"`
|
||||
BytesMissing *IntFilter `json:"bytesMissing,omitempty"`
|
||||
PeersCount *IntFilter `json:"peersCount,omitempty"`
|
||||
}
|
28
src/delivery/graphql/oneof.go
Normal file
28
src/delivery/graphql/oneof.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/99designs/gqlgen/graphql"
|
||||
)
|
||||
|
||||
func OneOf(ctx context.Context, obj interface{}, next graphql.Resolver) (res interface{}, err error) {
|
||||
wasValue := false
|
||||
m, ok := obj.(map[string]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("OneOf error, unknow object type: %T", obj)
|
||||
}
|
||||
|
||||
for k, v := range m {
|
||||
if v != nil {
|
||||
if !wasValue {
|
||||
wasValue = true
|
||||
} else {
|
||||
return nil, fmt.Errorf("OneOf with multiple fields: %s", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return next(ctx)
|
||||
}
|
64
src/delivery/graphql/resolver/mutation.resolvers.go
Normal file
64
src/delivery/graphql/resolver/mutation.resolvers.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package resolver
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.43
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
)
|
||||
|
||||
// ValidateTorrents is the resolver for the validateTorrents field.
|
||||
func (r *mutationResolver) ValidateTorrents(ctx context.Context, filter model.TorrentFilter) (bool, error) {
|
||||
if filter.Infohash != nil {
|
||||
t, err := r.Resolver.Service.GetTorrent(*filter.Infohash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if t == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
t.ValidateTorrent()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if filter.Everything != nil && *filter.Everything {
|
||||
torrents, err := r.Resolver.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, v := range torrents {
|
||||
if err := v.ValidateTorrent(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CleanupTorrents is the resolver for the cleanupTorrents field.
|
||||
func (r *mutationResolver) CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (int64, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if files != nil && *files {
|
||||
r, err := r.Service.Storage.CleanupFiles(ctx, torrents, dryRun)
|
||||
return int64(r), err
|
||||
} else {
|
||||
r, err := r.Service.Storage.CleanupDirs(ctx, torrents, dryRun)
|
||||
return int64(r), err
|
||||
}
|
||||
}
|
||||
|
||||
// Mutation returns graph.MutationResolver implementation.
|
||||
func (r *Resolver) Mutation() graph.MutationResolver { return &mutationResolver{r} }
|
||||
|
||||
type mutationResolver struct{ *Resolver }
|
75
src/delivery/graphql/resolver/query.resolvers.go
Normal file
75
src/delivery/graphql/resolver/query.resolvers.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
package resolver
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.43
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
)
|
||||
|
||||
// Torrents is the resolver for the torrents field.
|
||||
func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilter, pagination *model.Pagination) ([]*model.Torrent, error) {
|
||||
torrents, err := r.Service.ListTorrents(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterFuncs := []func(torrent *model.Torrent) bool{}
|
||||
|
||||
if filter != nil {
|
||||
if filter.BytesCompleted != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesCompleted.IsValid(torrent.BytesCompleted)
|
||||
})
|
||||
}
|
||||
if filter.BytesMissing != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.BytesMissing.IsValid(torrent.BytesMissing)
|
||||
})
|
||||
}
|
||||
if filter.PeersCount != nil {
|
||||
filterFuncs = append(filterFuncs, func(torrent *model.Torrent) bool {
|
||||
return filter.PeersCount.IsValid(
|
||||
int64(len(torrent.T.Torrent().PeerConns())),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
filterFunc := func(torrent *model.Torrent) bool {
|
||||
for _, f := range filterFuncs {
|
||||
if !f(torrent) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
tr := []*model.Torrent{}
|
||||
for _, t := range torrents {
|
||||
d := &model.Torrent{
|
||||
Infohash: t.InfoHash(),
|
||||
Name: t.Name(),
|
||||
BytesCompleted: t.BytesCompleted(),
|
||||
BytesMissing: t.BytesMissing(),
|
||||
T: t,
|
||||
}
|
||||
|
||||
if !filterFunc(d) {
|
||||
continue
|
||||
}
|
||||
tr = append(tr, d)
|
||||
}
|
||||
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// Query returns graph.QueryResolver implementation.
|
||||
func (r *Resolver) Query() graph.QueryResolver { return &queryResolver{r} }
|
||||
|
||||
type queryResolver struct{ *Resolver }
|
11
src/delivery/graphql/resolver/resolver.go
Normal file
11
src/delivery/graphql/resolver/resolver.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package resolver
|
||||
|
||||
import "git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
|
||||
// This file will not be regenerated automatically.
|
||||
//
|
||||
// It serves as dependency injection for your app, add any dependencies you require here.
|
||||
|
||||
type Resolver struct {
|
||||
Service *service.Service
|
||||
}
|
73
src/delivery/graphql/resolver/torrent.resolvers.go
Normal file
73
src/delivery/graphql/resolver/torrent.resolvers.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package resolver
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.43
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
|
||||
)
|
||||
|
||||
// Name is the resolver for the name field.
|
||||
func (r *torrentResolver) Name(ctx context.Context, obj *model.Torrent) (string, error) {
|
||||
return obj.T.Name(), nil
|
||||
}
|
||||
|
||||
// Files is the resolver for the files field.
|
||||
func (r *torrentResolver) Files(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
|
||||
out := []*model.TorrentFile{}
|
||||
files, err := obj.T.Files()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, f := range files {
|
||||
out = append(out, &model.TorrentFile{
|
||||
Filename: f.DisplayPath(),
|
||||
Size: f.Length(),
|
||||
BytesCompleted: f.BytesCompleted(),
|
||||
F: f,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ExcludedFiles is the resolver for the excludedFiles field.
|
||||
func (r *torrentResolver) ExcludedFiles(ctx context.Context, obj *model.Torrent) ([]*model.TorrentFile, error) {
|
||||
out := []*model.TorrentFile{}
|
||||
files, err := obj.T.ExcludedFiles()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, f := range files {
|
||||
out = append(out, &model.TorrentFile{
|
||||
Filename: f.DisplayPath(),
|
||||
Size: f.Length(),
|
||||
F: f,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Peers is the resolver for the peers field.
|
||||
func (r *torrentResolver) Peers(ctx context.Context, obj *model.Torrent) ([]*model.TorrentPeer, error) {
|
||||
peers := []*model.TorrentPeer{}
|
||||
for _, peer := range obj.T.Torrent().PeerConns() {
|
||||
peers = append(peers, &model.TorrentPeer{
|
||||
IP: peer.RemoteAddr.String(),
|
||||
DownloadRate: peer.DownloadRate(),
|
||||
Discovery: model.MapPeerSource(peer.Discovery),
|
||||
Port: int64(peer.PeerListenPort),
|
||||
ClientName: peer.PeerClientName.Load().(string),
|
||||
F: peer,
|
||||
})
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
// Torrent returns graph.TorrentResolver implementation.
|
||||
func (r *Resolver) Torrent() graph.TorrentResolver { return &torrentResolver{r} }
|
||||
|
||||
type torrentResolver struct{ *Resolver }
|
35
src/delivery/router.go
Normal file
35
src/delivery/router.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package delivery
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
"github.com/99designs/gqlgen/graphql/handler"
|
||||
"github.com/99designs/gqlgen/graphql/handler/extension"
|
||||
"github.com/99designs/gqlgen/graphql/handler/lru"
|
||||
"github.com/99designs/gqlgen/graphql/handler/transport"
|
||||
)
|
||||
|
||||
func GraphQLHandler(service *service.Service) http.Handler {
|
||||
graphqlHandler := handler.NewDefaultServer(
|
||||
graph.NewExecutableSchema(
|
||||
graph.Config{
|
||||
Resolvers: &resolver.Resolver{Service: service},
|
||||
Directives: graph.DirectiveRoot{
|
||||
OneOf: graph.OneOf,
|
||||
},
|
||||
},
|
||||
),
|
||||
)
|
||||
graphqlHandler.AddTransport(&transport.POST{})
|
||||
graphqlHandler.AddTransport(&transport.Websocket{})
|
||||
graphqlHandler.AddTransport(&transport.SSE{})
|
||||
graphqlHandler.AddTransport(&transport.UrlEncodedForm{})
|
||||
graphqlHandler.SetQueryCache(lru.New(1000))
|
||||
graphqlHandler.Use(extension.Introspection{})
|
||||
graphqlHandler.Use(extension.AutomaticPersistedQuery{Cache: lru.New(100)})
|
||||
|
||||
return graphqlHandler
|
||||
}
|
|
@ -3,14 +3,13 @@
|
|||
package fuse
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/billziss-gh/cgofuse/fuse"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
|
@ -18,16 +17,20 @@ type Handler struct {
|
|||
path string
|
||||
|
||||
host *fuse.FileSystemHost
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func NewHandler(fuseAllowOther bool, path string) *Handler {
|
||||
return &Handler{
|
||||
fuseAllowOther: fuseAllowOther,
|
||||
path: path,
|
||||
log: slog.With("component", "fuse-handler").With("path", path),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Handler) Mount(vfs vfs.Filesystem) error {
|
||||
log := s.log.With("function", "Mount")
|
||||
|
||||
folder := s.path
|
||||
// On windows, the folder must don't exist
|
||||
if runtime.GOOS == "windows" {
|
||||
|
@ -52,18 +55,20 @@ func (s *Handler) Mount(vfs vfs.Filesystem) error {
|
|||
|
||||
ok := host.Mount(s.path, config)
|
||||
if !ok {
|
||||
log.Error().Str("path", s.path).Msg("error trying to mount filesystem")
|
||||
log.Error("error trying to mount filesystem")
|
||||
}
|
||||
}()
|
||||
|
||||
s.host = host
|
||||
|
||||
log.Info().Str("path", s.path).Msg("starting FUSE mount")
|
||||
log.Info("starting FUSE mount", "path", s.path)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Handler) Unmount() {
|
||||
log := s.log.With("function", "Unmount")
|
||||
|
||||
if s.host == nil {
|
||||
return
|
||||
}
|
||||
|
@ -71,6 +76,6 @@ func (s *Handler) Unmount() {
|
|||
ok := s.host.Unmount()
|
||||
if !ok {
|
||||
//TODO try to force unmount if possible
|
||||
log.Error().Str("path", s.path).Msg("unmount failed")
|
||||
log.Error("unmount failed")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,26 +5,24 @@ package fuse
|
|||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/billziss-gh/cgofuse/fuse"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type fuseFS struct {
|
||||
fuse.FileSystemBase
|
||||
fh *fileHandler
|
||||
|
||||
log zerolog.Logger
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func newFuseFS(fs vfs.Filesystem) fuse.FileSystemInterface {
|
||||
l := log.Logger.With().Str("component", "fuse").Logger()
|
||||
l := slog.With("component", "fuse")
|
||||
return &fuseFS{
|
||||
fh: &fileHandler{fs: fs},
|
||||
log: l,
|
||||
|
@ -32,14 +30,16 @@ func newFuseFS(fs vfs.Filesystem) fuse.FileSystemInterface {
|
|||
}
|
||||
|
||||
func (fs *fuseFS) Open(path string, flags int) (errc int, fh uint64) {
|
||||
log := fs.log.With("function", "Open", "path", path, "flags", flags)
|
||||
|
||||
fh, err := fs.fh.OpenHolder(path)
|
||||
if os.IsNotExist(err) {
|
||||
fs.log.Debug().Str("path", path).Msg("file does not exists")
|
||||
log.Debug("file does not exists")
|
||||
return -fuse.ENOENT, fhNone
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("error opening file")
|
||||
log.Error("error opening file", "err", err)
|
||||
return -fuse.EIO, fhNone
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@ func (fs *fuseFS) Opendir(path string) (errc int, fh uint64) {
|
|||
}
|
||||
|
||||
func (fs *fuseFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) {
|
||||
log := fs.log.With("function", "Getattr", "path", path, "filehandler", fh)
|
||||
if path == "/" {
|
||||
stat.Mode = fuse.S_IFDIR | 0555
|
||||
return 0
|
||||
|
@ -64,12 +65,12 @@ func (fs *fuseFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int)
|
|||
|
||||
file, err := fs.fh.GetFile(path, fh)
|
||||
if os.IsNotExist(err) {
|
||||
fs.log.Debug().Str("path", path).Msg("file does not exists")
|
||||
log.Debug("file does not exists", "error", err)
|
||||
return -fuse.ENOENT
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder when reading file attributes")
|
||||
log.Error("error getting holder when reading file attributes", "error", err)
|
||||
return -fuse.EIO
|
||||
}
|
||||
|
||||
|
@ -84,14 +85,15 @@ func (fs *fuseFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int)
|
|||
}
|
||||
|
||||
func (fs *fuseFS) Read(path string, dest []byte, off int64, fh uint64) int {
|
||||
log := fs.log.With("function", "Read", "path", path, "offset", off, "filehandler", fh)
|
||||
file, err := fs.fh.GetFile(path, fh)
|
||||
if os.IsNotExist(err) {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("file not found on READ operation")
|
||||
log.Error("file not found on READ operation", "path", path, "error", err)
|
||||
return -fuse.ENOENT
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder reading data from file")
|
||||
fs.log.Error("error getting holder reading data from file", "path", path, "error", err)
|
||||
return -fuse.EIO
|
||||
}
|
||||
|
||||
|
@ -104,7 +106,7 @@ func (fs *fuseFS) Read(path string, dest []byte, off int64, fh uint64) int {
|
|||
|
||||
n, err := file.ReadAt(buf, off)
|
||||
if err != nil && err != io.EOF {
|
||||
log.Error().Err(err).Str("path", path).Msg("error reading data")
|
||||
log.Error("error reading data")
|
||||
return -fuse.EIO
|
||||
}
|
||||
|
||||
|
@ -113,8 +115,9 @@ func (fs *fuseFS) Read(path string, dest []byte, off int64, fh uint64) int {
|
|||
}
|
||||
|
||||
func (fs *fuseFS) Release(path string, fh uint64) int {
|
||||
log := fs.log.With("function", "Release", "path", path, "filehandler", fh)
|
||||
if err := fs.fh.Remove(fh); err != nil {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("error getting holder when releasing file")
|
||||
log.Error("error getting holder when releasing file", "path", path, "error", err)
|
||||
return -fuse.EIO
|
||||
}
|
||||
|
||||
|
@ -129,19 +132,20 @@ func (fs *fuseFS) Readdir(path string,
|
|||
fill func(name string, stat *fuse.Stat_t, ofst int64) bool,
|
||||
ofst int64,
|
||||
fh uint64) (errc int) {
|
||||
log := fs.log.With("function", "Readdir", "path", path, "offset", ofst, "filehandler", fh)
|
||||
fill(".", nil, 0)
|
||||
fill("..", nil, 0)
|
||||
|
||||
//TODO improve this function to make use of fh index if possible
|
||||
paths, err := fs.fh.ListDir(path)
|
||||
if err != nil {
|
||||
fs.log.Error().Err(err).Str("path", path).Msg("error reading directory")
|
||||
log.Error("error reading directory", "error", err)
|
||||
return -fuse.ENOSYS
|
||||
}
|
||||
|
||||
for _, p := range paths {
|
||||
if !fill(p, nil, 0) {
|
||||
fs.log.Error().Str("path", path).Msg("error adding directory")
|
||||
log.Error("error adding directory")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
package nfs
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"git.kmsign.ru/royalcat/tstor/src/log"
|
||||
zlog "github.com/rs/zerolog/log"
|
||||
nfs "github.com/willscott/go-nfs"
|
||||
nfshelper "github.com/willscott/go-nfs/helpers"
|
||||
)
|
||||
|
||||
func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
|
||||
nfslog := zlog.Logger.With().Str("component", "nfs").Logger()
|
||||
nfslog := slog.With("component", "nfs")
|
||||
nfs.SetLogger(log.NewNFSLog(nfslog))
|
||||
nfs.Log.SetLevel(nfs.InfoLevel)
|
||||
|
||||
|
|
|
@ -3,16 +3,16 @@ package nfs
|
|||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/go-git/go-billy/v5"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type billyFsWrapper struct {
|
||||
fs vfs.Filesystem
|
||||
log zerolog.Logger
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
var _ billy.Filesystem = (*billyFsWrapper)(nil)
|
||||
|
@ -56,7 +56,7 @@ func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
|
|||
return &billyFile{
|
||||
name: filename,
|
||||
file: file,
|
||||
log: fs.log.With().Str("filename", filename).Logger(),
|
||||
log: fs.log.With("filename", filename),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode)
|
|||
return &billyFile{
|
||||
name: filename,
|
||||
file: file,
|
||||
log: fs.log.With().Str("filename", filename).Int("flag", flag).Str("perm", perm.String()).Logger(),
|
||||
log: fs.log.With("filename", filename, "flag", flag, "perm", perm.String()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -102,8 +102,8 @@ func (*billyFsWrapper) Readlink(link string) (string, error) {
|
|||
}
|
||||
|
||||
// Remove implements billy.Filesystem.
|
||||
func (*billyFsWrapper) Remove(filename string) error {
|
||||
return billy.ErrNotSupported
|
||||
func (s *billyFsWrapper) Remove(filename string) error {
|
||||
return s.fs.Unlink(filename)
|
||||
}
|
||||
|
||||
// Rename implements billy.Filesystem.
|
||||
|
@ -138,7 +138,7 @@ func (fs *billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error
|
|||
type billyFile struct {
|
||||
name string
|
||||
file vfs.File
|
||||
log zerolog.Logger
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
var _ billy.File = (*billyFile)(nil)
|
||||
|
@ -188,13 +188,13 @@ func (*billyFile) Unlock() error {
|
|||
return nil // TODO
|
||||
}
|
||||
|
||||
func billyErr(err error, log zerolog.Logger) error {
|
||||
func billyErr(err error, log *slog.Logger) error {
|
||||
if errors.Is(err, vfs.ErrNotImplemented) {
|
||||
return billy.ErrNotSupported
|
||||
}
|
||||
if errors.Is(err, vfs.ErrNotExist) {
|
||||
if err, ok := asErr[*fs.PathError](err); ok {
|
||||
log.Error().Err(err.Err).Str("op", err.Op).Str("path", err.Path).Msg("file not found")
|
||||
log.Error("file not found", "op", err.Op, "path", err.Path, "error", err.Err)
|
||||
}
|
||||
return fs.ErrNotExist
|
||||
}
|
||||
|
|
|
@ -1,22 +1,22 @@
|
|||
package webdav
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
func newHandler(fs vfs.Filesystem) *webdav.Handler {
|
||||
l := log.Logger.With().Str("component", "webDAV").Logger()
|
||||
log := slog.With("component", "webDAV")
|
||||
return &webdav.Handler{
|
||||
Prefix: "/",
|
||||
FileSystem: newFS(fs),
|
||||
LockSystem: webdav.NewMemLS(),
|
||||
Logger: func(req *http.Request, err error) {
|
||||
if err != nil {
|
||||
l.Error().Err(err).Str("path", req.RequestURI).Msg("webDAV error")
|
||||
log.Error("webDAV error", "path", req.RequestURI, "error", err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ package webdav
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/net/webdav"
|
||||
)
|
||||
|
||||
|
@ -33,21 +33,21 @@ func NewWebDAVServer(fs vfs.Filesystem, port int, user, pass string) error {
|
|||
Handler: serveMux,
|
||||
}
|
||||
|
||||
log.Info().Str("host", httpServer.Addr).Msg("starting webDAV server")
|
||||
slog.With("host", httpServer.Addr).Info("starting webDAV server")
|
||||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
func NewDirServer(dir string, port int, user, pass string) error {
|
||||
|
||||
l := log.Logger.With().Str("component", "webDAV").Logger()
|
||||
log := slog.With("component", "webDAV")
|
||||
srv := &webdav.Handler{
|
||||
Prefix: "/",
|
||||
FileSystem: webdav.Dir(dir),
|
||||
LockSystem: webdav.NewMemLS(),
|
||||
Logger: func(req *http.Request, err error) {
|
||||
if err != nil {
|
||||
l.Error().Err(err).Str("path", req.RequestURI).Msg("webDAV error")
|
||||
log.Error("webDAV error", "path", req.RequestURI)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func NewDirServer(dir string, port int, user, pass string) error {
|
|||
Handler: serveMux,
|
||||
}
|
||||
|
||||
log.Info().Str("host", httpServer.Addr).Msg("starting webDAV server")
|
||||
log.Info("starting webDAV server", "host", httpServer.Addr)
|
||||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
|
107
src/host/controller/torrent.go
Normal file
107
src/host/controller/torrent.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/store"
|
||||
"github.com/anacrolix/torrent"
|
||||
)
|
||||
|
||||
type Torrent struct {
|
||||
torrentFilePath string
|
||||
t *torrent.Torrent
|
||||
rep *store.ExlcudedFiles
|
||||
}
|
||||
|
||||
func NewTorrent(t *torrent.Torrent, rep *store.ExlcudedFiles) *Torrent {
|
||||
return &Torrent{t: t, rep: rep}
|
||||
}
|
||||
|
||||
func (s *Torrent) TorrentFilePath() string {
|
||||
return s.torrentFilePath
|
||||
}
|
||||
|
||||
func (s *Torrent) Torrent() *torrent.Torrent {
|
||||
return s.t
|
||||
}
|
||||
|
||||
func (s *Torrent) Name() string {
|
||||
<-s.t.GotInfo()
|
||||
return s.t.Name()
|
||||
}
|
||||
|
||||
func (s *Torrent) InfoHash() string {
|
||||
<-s.t.GotInfo()
|
||||
return s.t.InfoHash().HexString()
|
||||
}
|
||||
|
||||
func (s *Torrent) BytesCompleted() int64 {
|
||||
<-s.t.GotInfo()
|
||||
return s.t.BytesCompleted()
|
||||
}
|
||||
|
||||
func (s *Torrent) BytesMissing() int64 {
|
||||
<-s.t.GotInfo()
|
||||
return s.t.BytesMissing()
|
||||
}
|
||||
|
||||
func (s *Torrent) Files() ([]*torrent.File, error) {
|
||||
excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
<-s.t.GotInfo()
|
||||
files := s.t.Files()
|
||||
files = slices.DeleteFunc(files, func(file *torrent.File) bool {
|
||||
p := file.Path()
|
||||
|
||||
if strings.Contains(p, "/.pad/") {
|
||||
return false
|
||||
}
|
||||
|
||||
if !slices.Contains(excludedFiles, p) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (s *Torrent) ExcludedFiles() ([]*torrent.File, error) {
|
||||
excludedFiles, err := s.rep.ExcludedFiles(s.t.InfoHash())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
<-s.t.GotInfo()
|
||||
files := s.t.Files()
|
||||
files = slices.DeleteFunc(files, func(file *torrent.File) bool {
|
||||
p := file.Path()
|
||||
|
||||
if strings.Contains(p, "/.pad/") {
|
||||
return false
|
||||
}
|
||||
|
||||
if slices.Contains(excludedFiles, p) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (s *Torrent) ExcludeFile(f *torrent.File) error {
|
||||
return s.rep.ExcludeFile(f)
|
||||
}
|
||||
|
||||
func (s *Torrent) ValidateTorrent() error {
|
||||
<-s.t.GotInfo()
|
||||
s.t.VerifyData()
|
||||
return nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package filestorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -6,15 +6,16 @@ import (
|
|||
"path/filepath"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/store"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
)
|
||||
|
||||
func SetupStorage(cfg config.TorrentClient) (storage.ClientImplCloser, storage.PieceCompletion, error) {
|
||||
func Setup(cfg config.TorrentClient) (*FileStorage, storage.PieceCompletion, error) {
|
||||
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
|
||||
if err := os.MkdirAll(pcp, 0744); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
|
||||
}
|
||||
pc, err := NewBadgerPieceCompletion(pcp)
|
||||
pc, err := store.NewBadgerPieceCompletion(pcp)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
|
||||
}
|
186
src/host/filestorage/storage_files.go
Normal file
186
src/host/filestorage/storage_files.go
Normal file
|
@ -0,0 +1,186 @@
|
|||
package filestorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
)
|
||||
|
||||
type FileStorageDeleter interface {
|
||||
storage.ClientImplCloser
|
||||
DeleteFile(file *torrent.File) error
|
||||
}
|
||||
|
||||
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
|
||||
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *FileStorage {
|
||||
return &FileStorage{
|
||||
baseDir: baseDir,
|
||||
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
|
||||
ClientBaseDir: baseDir,
|
||||
PieceCompletion: pc,
|
||||
TorrentDirMaker: torrentDir,
|
||||
FilePathMaker: filePath,
|
||||
}),
|
||||
pieceCompletion: pc,
|
||||
log: slog.With("component", "torrent-client"),
|
||||
}
|
||||
}
|
||||
|
||||
// File-based storage for torrents, that isn't yet bound to a particular torrent.
|
||||
type FileStorage struct {
|
||||
baseDir string
|
||||
storage.ClientImplCloser
|
||||
pieceCompletion storage.PieceCompletion
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func (me *FileStorage) Close() error {
|
||||
return me.pieceCompletion.Close()
|
||||
}
|
||||
|
||||
func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
|
||||
return filepath.Join(baseDir, info.Name)
|
||||
}
|
||||
|
||||
func filePath(opts storage.FilePathMakerOpts) string {
|
||||
return filepath.Join(opts.File.Path...)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
|
||||
return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{
|
||||
Info: info,
|
||||
File: fileInfo,
|
||||
}))
|
||||
}
|
||||
|
||||
func (fs *FileStorage) DeleteFile(file *torrent.File) error {
|
||||
info := file.Torrent().Info()
|
||||
infoHash := file.Torrent().InfoHash()
|
||||
torrentDir := torrentDir(fs.baseDir, info, infoHash)
|
||||
fileInfo := file.FileInfo()
|
||||
relFilePath := filePath(storage.FilePathMakerOpts{
|
||||
Info: info,
|
||||
File: &fileInfo,
|
||||
})
|
||||
filePath := path.Join(torrentDir, relFilePath)
|
||||
for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
|
||||
pk := metainfo.PieceKey{InfoHash: infoHash, Index: i}
|
||||
err := fs.pieceCompletion.Set(pk, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return os.Remove(filePath)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
|
||||
log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)
|
||||
|
||||
expectedEntries := []string{}
|
||||
for _, e := range expected {
|
||||
expectedEntries = append(expectedEntries, e.Torrent().Name())
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(fs.baseDir)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
toDelete := []string{}
|
||||
for _, v := range entries {
|
||||
if !slices.Contains(expectedEntries, v.Name()) {
|
||||
toDelete = append(toDelete, v.Name())
|
||||
}
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
|
||||
log.Info("deleting trash data", "dirsCount", len(toDelete))
|
||||
if !dryRun {
|
||||
for i, name := range toDelete {
|
||||
p := path.Join(fs.baseDir, name)
|
||||
log.Warn("deleting trash data", "path", p)
|
||||
err := os.RemoveAll(p)
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return len(toDelete), nil
|
||||
}
|
||||
|
||||
func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
|
||||
log := fs.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
|
||||
|
||||
expectedEntries := []string{}
|
||||
{
|
||||
for _, e := range expected {
|
||||
files, err := e.Files()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
expectedEntries = append(expectedEntries, fs.filePath(e.Torrent().Info(), e.Torrent().InfoHash(), ptr(f.FileInfo())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entries := []string{}
|
||||
err := filepath.Walk(fs.baseDir,
|
||||
func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
entries = append(entries, path)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
toDelete := []string{}
|
||||
for _, v := range entries {
|
||||
if !slices.Contains(expectedEntries, v) {
|
||||
toDelete = append(toDelete, v)
|
||||
}
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return len(toDelete), ctx.Err()
|
||||
}
|
||||
|
||||
log.Info("deleting trash data", "filesCount", len(toDelete))
|
||||
if !dryRun {
|
||||
for i, p := range toDelete {
|
||||
fs.log.Warn("deleting trash data", "path", p)
|
||||
err := os.Remove(p)
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return len(toDelete), nil
|
||||
}
|
||||
|
||||
func ptr[D any](v D) *D {
|
||||
return &v
|
||||
}
|
|
@ -4,99 +4,153 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/storage"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/filestorage"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/store"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/types"
|
||||
"github.com/anacrolix/torrent/types/infohash"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
c *torrent.Client
|
||||
rep storage.ExlcudedFiles
|
||||
c *torrent.Client
|
||||
excludedFiles *store.ExlcudedFiles
|
||||
infoBytes *store.InfoBytes
|
||||
|
||||
torrentLoaded chan struct{}
|
||||
|
||||
// stats *Stats
|
||||
DefaultPriority types.PiecePriority
|
||||
Storage *filestorage.FileStorage
|
||||
SourceDir string
|
||||
|
||||
log *slog.Logger
|
||||
addTimeout, readTimeout int
|
||||
}
|
||||
|
||||
func NewService(c *torrent.Client, rep storage.ExlcudedFiles, addTimeout, readTimeout int) *Service {
|
||||
l := slog.With("component", "torrent-service")
|
||||
return &Service{
|
||||
log: l,
|
||||
func NewService(sourceDir string, c *torrent.Client, storage *filestorage.FileStorage, excludedFiles *store.ExlcudedFiles, infoBytes *store.InfoBytes, addTimeout, readTimeout int) *Service {
|
||||
s := &Service{
|
||||
log: slog.With("component", "torrent-service"),
|
||||
c: c,
|
||||
DefaultPriority: types.PiecePriorityNone,
|
||||
rep: rep,
|
||||
excludedFiles: excludedFiles,
|
||||
infoBytes: infoBytes,
|
||||
Storage: storage,
|
||||
SourceDir: sourceDir,
|
||||
torrentLoaded: make(chan struct{}),
|
||||
// stats: newStats(), // TODO persistent
|
||||
addTimeout: addTimeout,
|
||||
readTimeout: readTimeout,
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := s.loadTorrentFiles(context.Background())
|
||||
if err != nil {
|
||||
s.log.Error("initial torrent load failed", "error", err)
|
||||
}
|
||||
close(s.torrentLoaded)
|
||||
}()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
|
||||
|
||||
func (s *Service) NewTorrent(f vfs.File) (*torrent.Torrent, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout))
|
||||
defer cancel()
|
||||
func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
|
||||
defer f.Close()
|
||||
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("call stat failed: %w", err)
|
||||
}
|
||||
|
||||
mi, err := metainfo.Load(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
|
||||
}
|
||||
|
||||
t, ok := s.c.Torrent(mi.HashInfoBytes())
|
||||
if !ok {
|
||||
t, err = s.c.AddTorrent(mi)
|
||||
spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("parse spec from metadata: %w", err)
|
||||
}
|
||||
infoBytes := spec.InfoBytes
|
||||
|
||||
if !isValidInfoHashBytes(infoBytes) {
|
||||
infoBytes = nil
|
||||
}
|
||||
|
||||
if len(infoBytes) == 0 {
|
||||
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
|
||||
if err != nil && err != store.ErrNotFound {
|
||||
return nil, fmt.Errorf("get info bytes from database: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var info metainfo.Info
|
||||
err = bencode.Unmarshal(infoBytes, &info)
|
||||
if err != nil {
|
||||
infoBytes = nil
|
||||
} else {
|
||||
for _, t := range s.c.Torrents() {
|
||||
if t.Name() == info.BestName() {
|
||||
return nil, fmt.Errorf("torrent with name '%s' already exists", t.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{
|
||||
InfoHash: spec.InfoHash,
|
||||
Storage: s.Storage,
|
||||
InfoBytes: infoBytes,
|
||||
ChunkSize: spec.ChunkSize,
|
||||
})
|
||||
t.AllowDataDownload()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("creating torrent fs timed out")
|
||||
return nil, fmt.Errorf("creating torrent timed out")
|
||||
case <-t.GotInfo():
|
||||
err := s.infoBytes.Set(t.InfoHash(), t.Metainfo())
|
||||
if err != nil {
|
||||
s.log.Error("error setting info bytes for torrent %s: %s", t.Name(), err.Error())
|
||||
}
|
||||
for _, f := range t.Files() {
|
||||
f.SetPriority(s.DefaultPriority)
|
||||
}
|
||||
|
||||
}
|
||||
for _, f := range t.Files() {
|
||||
f.SetPriority(s.DefaultPriority)
|
||||
}
|
||||
t.AllowDataDownload()
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func isValidInfoHashBytes(d []byte) bool {
|
||||
var info metainfo.Info
|
||||
err := bencode.Unmarshal(d, &info)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.addTimeout))
|
||||
defer cancel()
|
||||
defer f.Close()
|
||||
|
||||
mi, err := metainfo.Load(f)
|
||||
t, err := s.AddTorrent(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t, ok := s.c.Torrent(mi.HashInfoBytes())
|
||||
if !ok {
|
||||
t, err = s.c.AddTorrent(mi)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("creating torrent fs timed out")
|
||||
case <-t.GotInfo():
|
||||
}
|
||||
for _, f := range t.Files() {
|
||||
f.SetPriority(s.DefaultPriority)
|
||||
}
|
||||
t.AllowDataDownload()
|
||||
}
|
||||
|
||||
return vfs.NewTorrentFs(t, s.rep, s.readTimeout), nil
|
||||
return vfs.NewTorrentFs(controller.NewTorrent(t, s.excludedFiles), s.readTimeout), nil
|
||||
}
|
||||
|
||||
func (s *Service) Stats() (*Stats, error) {
|
||||
|
@ -106,3 +160,52 @@ func (s *Service) Stats() (*Stats, error) {
|
|||
func (s *Service) GetStats() torrent.ConnStats {
|
||||
return s.c.ConnStats()
|
||||
}
|
||||
|
||||
func (s *Service) loadTorrentFiles(ctx context.Context) error {
|
||||
return filepath.Walk(s.SourceDir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("fs walk error: %w", err)
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if strings.HasSuffix(path, ".torrent") {
|
||||
file := vfs.NewLazyOsFile(path)
|
||||
defer file.Close()
|
||||
|
||||
_, err = s.AddTorrent(ctx, file)
|
||||
if err != nil {
|
||||
s.log.Error("failed adding torrent", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) ListTorrents(ctx context.Context) ([]*controller.Torrent, error) {
|
||||
<-s.torrentLoaded
|
||||
|
||||
out := []*controller.Torrent{}
|
||||
for _, v := range s.c.Torrents() {
|
||||
out = append(out, controller.NewTorrent(v, s.excludedFiles))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) {
|
||||
<-s.torrentLoaded
|
||||
|
||||
t, ok := s.c.Torrent(infohash.FromHexString(infohashHex))
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return controller.NewTorrent(t, s.excludedFiles), nil
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/dht/v2"
|
||||
"github.com/anacrolix/dht/v2/bep44"
|
||||
tlog "github.com/anacrolix/log"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
dlog "git.kmsign.ru/royalcat/tstor/src/log"
|
||||
)
|
||||
|
||||
func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
|
||||
// TODO download and upload limits
|
||||
torrentCfg := torrent.NewDefaultClientConfig()
|
||||
torrentCfg.PeerID = string(id[:])
|
||||
torrentCfg.DefaultStorage = st
|
||||
|
||||
l := log.Logger.With().Str("component", "torrent-client").Logger()
|
||||
|
||||
tl := tlog.NewLogger()
|
||||
tl.SetHandlers(&dlog.Torrent{L: l})
|
||||
torrentCfg.Logger = tl
|
||||
|
||||
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
||||
cfg.Store = fis
|
||||
cfg.Exp = 2 * time.Hour
|
||||
cfg.NoSecurity = false
|
||||
}
|
||||
|
||||
return torrent.NewClient(torrentCfg)
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type FileStorageDeleter interface {
|
||||
storage.ClientImplCloser
|
||||
DeleteFile(file *torrent.File) error
|
||||
Cleanup(expected []*torrent.Torrent) error
|
||||
}
|
||||
|
||||
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
|
||||
func NewFileStorage(baseDir string, pc storage.PieceCompletion) FileStorageDeleter {
|
||||
|
||||
return &FileStorage{
|
||||
baseDir: baseDir,
|
||||
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
|
||||
ClientBaseDir: baseDir,
|
||||
PieceCompletion: pc,
|
||||
TorrentDirMaker: torrentDir,
|
||||
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
|
||||
return filePath(opts.File)
|
||||
},
|
||||
}),
|
||||
pieceCompletion: pc,
|
||||
log: log.Logger.With().Str("component", "torrent-client").Logger(),
|
||||
}
|
||||
}
|
||||
|
||||
// File-based storage for torrents, that isn't yet bound to a particular torrent.
|
||||
type FileStorage struct {
|
||||
baseDir string
|
||||
storage.ClientImplCloser
|
||||
pieceCompletion storage.PieceCompletion
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func (me *FileStorage) Close() error {
|
||||
return me.pieceCompletion.Close()
|
||||
}
|
||||
|
||||
func torrentDir(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
|
||||
return filepath.Join(baseDir, info.Name)
|
||||
}
|
||||
|
||||
func filePath(file *metainfo.FileInfo) string {
|
||||
return filepath.Join(file.Path...)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) DeleteFile(file *torrent.File) error {
|
||||
info := file.Torrent().Info()
|
||||
infoHash := file.Torrent().InfoHash()
|
||||
torrentDir := torrentDir(fs.baseDir, info, infoHash)
|
||||
fileInfo := file.FileInfo()
|
||||
relFilePath := filePath(&fileInfo)
|
||||
filePath := path.Join(torrentDir, relFilePath)
|
||||
for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ {
|
||||
pk := metainfo.PieceKey{InfoHash: infoHash, Index: i}
|
||||
err := fs.pieceCompletion.Set(pk, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return os.Remove(filePath)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) Cleanup(expected []*torrent.Torrent) error {
|
||||
expectedEntries := []string{}
|
||||
for _, e := range expected {
|
||||
expectedEntries = append(expectedEntries, e.Name())
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(fs.baseDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
toDelete := []string{}
|
||||
for _, v := range entries {
|
||||
if !slices.Contains(expectedEntries, v.Name()) {
|
||||
toDelete = append(toDelete, v.Name())
|
||||
}
|
||||
}
|
||||
|
||||
fs.log.Info().Int("count", len(toDelete)).Msg("start deleting trash data")
|
||||
for _, name := range toDelete {
|
||||
p := path.Join(fs.baseDir, name)
|
||||
fs.log.Info().Str("path", p).Msg("deleting trash data")
|
||||
err := os.RemoveAll(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
47
src/host/store/client.go
Normal file
47
src/host/store/client.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"github.com/anacrolix/dht/v2/bep44"
|
||||
tlog "github.com/anacrolix/log"
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
dlog "git.kmsign.ru/royalcat/tstor/src/log"
|
||||
)
|
||||
|
||||
// MOVE
|
||||
func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
|
||||
l := slog.With("component", "torrent-client")
|
||||
|
||||
// TODO download and upload limits
|
||||
torrentCfg := torrent.NewDefaultClientConfig()
|
||||
torrentCfg.PeerID = string(id[:])
|
||||
torrentCfg.DefaultStorage = st
|
||||
// torrentCfg.AlwaysWantConns = true
|
||||
// torrentCfg.DisableAggressiveUpload = true
|
||||
// torrentCfg.Seed = true
|
||||
// torrentCfg.DownloadRateLimiter = rate.NewLimiter(rate.Inf, 0)
|
||||
// torrentCfg
|
||||
|
||||
tl := tlog.NewLogger()
|
||||
tl.SetHandlers(&dlog.Torrent{L: l})
|
||||
torrentCfg.Logger = tl
|
||||
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.NewPeer, func(p *torrent.Peer) {
|
||||
l.Debug("new peer", "ip", p.RemoteAddr.String())
|
||||
})
|
||||
|
||||
torrentCfg.Callbacks.NewPeer = append(torrentCfg.Callbacks.PeerClosed, func(p *torrent.Peer) {
|
||||
l.Debug("peer closed", "ip", p.RemoteAddr.String())
|
||||
})
|
||||
|
||||
// torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
|
||||
// cfg.Store = fis
|
||||
// cfg.Exp = 2 * time.Hour
|
||||
// cfg.NoSecurity = false
|
||||
// }
|
||||
|
||||
return torrent.NewClient(torrentCfg)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
@ -7,18 +7,12 @@ import (
|
|||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
atstorage "github.com/anacrolix/torrent/storage"
|
||||
"github.com/philippgille/gokv"
|
||||
"github.com/philippgille/gokv/badgerdb"
|
||||
"github.com/philippgille/gokv/encoding"
|
||||
)
|
||||
|
||||
type ExlcudedFiles interface {
|
||||
ExcludeFile(file *torrent.File) error
|
||||
ExcludedFiles(hash metainfo.Hash) ([]string, error)
|
||||
}
|
||||
|
||||
func NewExcludedFiles(metaDir string, storage atstorage.ClientImplCloser) (ExlcudedFiles, error) {
|
||||
func NewExcludedFiles(metaDir string, storage TorrentFileDeleter) (*ExlcudedFiles, error) {
|
||||
excludedFilesStore, err := badgerdb.NewStore(badgerdb.Options{
|
||||
Dir: filepath.Join(metaDir, "excluded-files"),
|
||||
Codec: encoding.JSON,
|
||||
|
@ -28,7 +22,7 @@ func NewExcludedFiles(metaDir string, storage atstorage.ClientImplCloser) (Exlcu
|
|||
return nil, err
|
||||
}
|
||||
|
||||
r := &torrentRepositoryImpl{
|
||||
r := &ExlcudedFiles{
|
||||
excludedFiles: excludedFilesStore,
|
||||
storage: storage,
|
||||
}
|
||||
|
@ -36,15 +30,19 @@ func NewExcludedFiles(metaDir string, storage atstorage.ClientImplCloser) (Exlcu
|
|||
return r, nil
|
||||
}
|
||||
|
||||
type torrentRepositoryImpl struct {
|
||||
type ExlcudedFiles struct {
|
||||
m sync.RWMutex
|
||||
excludedFiles gokv.Store
|
||||
storage atstorage.ClientImplCloser
|
||||
storage TorrentFileDeleter
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
func (r *torrentRepositoryImpl) ExcludeFile(file *torrent.File) error {
|
||||
type TorrentFileDeleter interface {
|
||||
DeleteFile(file *torrent.File) error
|
||||
}
|
||||
|
||||
func (r *ExlcudedFiles) ExcludeFile(file *torrent.File) error {
|
||||
r.m.Lock()
|
||||
defer r.m.Unlock()
|
||||
|
||||
|
@ -59,17 +57,15 @@ func (r *torrentRepositoryImpl) ExcludeFile(file *torrent.File) error {
|
|||
}
|
||||
excludedFiles = unique(append(excludedFiles, file.Path()))
|
||||
|
||||
if storage, ok := r.storage.(FileStorageDeleter); ok {
|
||||
err = storage.DeleteFile(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.storage.DeleteFile(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.excludedFiles.Set(hash.AsString(), excludedFiles)
|
||||
}
|
||||
|
||||
func (r *torrentRepositoryImpl) ExcludedFiles(hash metainfo.Hash) ([]string, error) {
|
||||
func (r *ExlcudedFiles) ExcludedFiles(hash metainfo.Hash) ([]string, error) {
|
||||
r.m.Lock()
|
||||
defer r.m.Unlock()
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package store
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
79
src/host/store/info.go
Normal file
79
src/host/store/info.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
|
||||
dlog "git.kmsign.ru/royalcat/tstor/src/log"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/types/infohash"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
type InfoBytes struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func NewInfoBytes(metaDir string) (*InfoBytes, error) {
|
||||
l := slog.With("component", "badger", "db", "info-bytes")
|
||||
|
||||
opts := badger.
|
||||
DefaultOptions(filepath.Join(metaDir, "infobytes")).
|
||||
WithLogger(&dlog.Badger{L: l})
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &InfoBytes{db}, nil
|
||||
}
|
||||
|
||||
func (k *InfoBytes) GetBytes(ih infohash.T) ([]byte, error) {
|
||||
var data []byte
|
||||
err := k.db.View(func(tx *badger.Txn) error {
|
||||
item, err := tx.Get(ih.Bytes())
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
return fmt.Errorf("error getting value: %w", err)
|
||||
}
|
||||
|
||||
data, err = item.ValueCopy(data)
|
||||
return err
|
||||
})
|
||||
return data, err
|
||||
}
|
||||
|
||||
func (k *InfoBytes) Get(ih infohash.T) (*metainfo.MetaInfo, error) {
|
||||
data, err := k.GetBytes(ih)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return metainfo.Load(bytes.NewReader(data))
|
||||
}
|
||||
|
||||
func (me *InfoBytes) SetBytes(ih infohash.T, bytes []byte) error {
|
||||
return me.db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set(ih.Bytes(), bytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (me *InfoBytes) Set(ih infohash.T, info metainfo.MetaInfo) error {
|
||||
return me.db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set(ih.Bytes(), info.InfoBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (k *InfoBytes) Delete(ih infohash.T) error {
|
||||
return k.db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Delete(ih.Bytes())
|
||||
})
|
||||
}
|
||||
|
||||
func (me *InfoBytes) Close() error {
|
||||
return me.db.Close()
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type PieceCompletionState byte
|
||||
|
@ -34,7 +33,7 @@ type badgerPieceCompletion struct {
|
|||
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
|
||||
|
||||
func NewBadgerPieceCompletion(dir string) (storage.PieceCompletion, error) {
|
||||
l := log.Logger.With().Str("component", "badger").Str("db", "piece-completion").Logger()
|
||||
l := slog.With("component", "badger", "db", "piece-completion")
|
||||
|
||||
opts := badger.
|
||||
DefaultOptions(dir).
|
|
@ -1,14 +1,14 @@
|
|||
package storage
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
dlog "git.kmsign.ru/royalcat/tstor/src/log"
|
||||
"github.com/anacrolix/dht/v2/bep44"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var _ bep44.Store = &FileItemStore{}
|
||||
|
@ -19,7 +19,7 @@ type FileItemStore struct {
|
|||
}
|
||||
|
||||
func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, error) {
|
||||
l := log.Logger.With().Str("component", "item-store").Logger()
|
||||
l := slog.With("component", "item-store")
|
||||
|
||||
opts := badger.DefaultOptions(path).
|
||||
WithLogger(&dlog.Badger{L: l}).
|
|
@ -8,7 +8,6 @@ import (
|
|||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/iio"
|
||||
"github.com/bodgit/sevenzip"
|
||||
|
@ -17,34 +16,79 @@ import (
|
|||
|
||||
var ArchiveFactories = map[string]FsFactory{
|
||||
".zip": func(f File) (Filesystem, error) {
|
||||
return NewArchive(f, f.Size(), ZipLoader), nil
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(stat.Name(), f, stat.Size(), ZipLoader), nil
|
||||
},
|
||||
".rar": func(f File) (Filesystem, error) {
|
||||
return NewArchive(f, f.Size(), RarLoader), nil
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(stat.Name(), f, stat.Size(), RarLoader), nil
|
||||
},
|
||||
".7z": func(f File) (Filesystem, error) {
|
||||
return NewArchive(f, f.Size(), SevenZipLoader), nil
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewArchive(stat.Name(), f, stat.Size(), SevenZipLoader), nil
|
||||
},
|
||||
}
|
||||
|
||||
type ArchiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error)
|
||||
type archiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error)
|
||||
|
||||
var _ Filesystem = &archive{}
|
||||
|
||||
type archive struct {
|
||||
name string
|
||||
|
||||
r iio.Reader
|
||||
|
||||
size int64
|
||||
|
||||
files func() (map[string]*archiveFile, error)
|
||||
files func() (map[string]File, error)
|
||||
}
|
||||
|
||||
func NewArchive(r iio.Reader, size int64, loader ArchiveLoader) *archive {
|
||||
func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *archive {
|
||||
return &archive{
|
||||
name: name,
|
||||
r: r,
|
||||
size: size,
|
||||
files: sync.OnceValues(func() (map[string]*archiveFile, error) {
|
||||
return loader(r, size)
|
||||
files: OnceValueWOErr(func() (map[string]File, error) {
|
||||
zipFiles, err := loader(r, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO make optional
|
||||
singleDir := true
|
||||
for k := range zipFiles {
|
||||
if !strings.HasPrefix(k, "/"+name+"/") {
|
||||
singleDir = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
files := make(map[string]File, len(zipFiles))
|
||||
for k, v := range zipFiles {
|
||||
// TODO make optional
|
||||
if strings.Contains(k, "/__MACOSX/") {
|
||||
continue
|
||||
}
|
||||
|
||||
if singleDir {
|
||||
k, _ = strings.CutPrefix(k, "/"+name)
|
||||
}
|
||||
|
||||
files[k] = v
|
||||
}
|
||||
|
||||
// FIXME
|
||||
files["/.forcegallery"] = NewMemoryFile(".forcegallery", []byte{})
|
||||
|
||||
return files, nil
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
@ -80,7 +124,7 @@ func (afs *archive) Stat(filename string) (fs.FileInfo, error) {
|
|||
}
|
||||
|
||||
if file, ok := files[filename]; ok {
|
||||
return newFileInfo(path.Base(filename), file.Size()), nil
|
||||
return file.Stat()
|
||||
}
|
||||
|
||||
for p, _ := range files {
|
||||
|
@ -90,7 +134,6 @@ func (afs *archive) Stat(filename string) (fs.FileInfo, error) {
|
|||
}
|
||||
|
||||
return nil, ErrNotExist
|
||||
|
||||
}
|
||||
|
||||
var _ File = &archiveFile{}
|
||||
|
@ -162,7 +205,7 @@ func (d *archiveFile) ReadAt(p []byte, off int64) (n int, err error) {
|
|||
return d.reader.ReadAt(p, off)
|
||||
}
|
||||
|
||||
var _ ArchiveLoader = ZipLoader
|
||||
var _ archiveLoader = ZipLoader
|
||||
|
||||
func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
|
||||
zr, err := zip.NewReader(reader, size)
|
||||
|
@ -171,14 +214,14 @@ func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
|
|||
}
|
||||
|
||||
out := make(map[string]*archiveFile)
|
||||
for _, f := range zr.File {
|
||||
f := f
|
||||
if f.FileInfo().IsDir() {
|
||||
for i := range zr.File {
|
||||
zipFile := zr.File[i]
|
||||
if zipFile.FileInfo().IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
rf := func() (iio.Reader, error) {
|
||||
zr, err := f.Open()
|
||||
zr, err := zipFile.Open()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -186,16 +229,13 @@ func ZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
|
|||
return iio.NewDiskTeeReader(zr)
|
||||
}
|
||||
|
||||
n := filepath.Join(string(os.PathSeparator), f.Name)
|
||||
af := NewArchiveFile(f.Name, rf, f.FileInfo().Size())
|
||||
|
||||
out[n] = af
|
||||
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, rf, zipFile.FileInfo().Size())
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
var _ ArchiveLoader = SevenZipLoader
|
||||
var _ archiveLoader = SevenZipLoader
|
||||
|
||||
func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
|
||||
r, err := sevenzip.NewReader(reader, size)
|
||||
|
@ -228,7 +268,7 @@ func SevenZipLoader(reader iio.Reader, size int64) (map[string]*archiveFile, err
|
|||
return out, nil
|
||||
}
|
||||
|
||||
var _ ArchiveLoader = RarLoader
|
||||
var _ archiveLoader = RarLoader
|
||||
|
||||
func RarLoader(reader iio.Reader, size int64) (map[string]*archiveFile, error) {
|
||||
r, err := rardecode.NewReader(iio.NewSeekerWrapper(reader, size))
|
||||
|
|
|
@ -18,7 +18,8 @@ func TestZipFilesystem(t *testing.T) {
|
|||
|
||||
zReader, size := createTestZip(require)
|
||||
|
||||
zfs := NewArchive(zReader, size, ZipLoader)
|
||||
// TODO add single dir collapse test
|
||||
zfs := NewArchive("test", zReader, size, ZipLoader)
|
||||
|
||||
files, err := zfs.ReadDir("/path/to/test/file")
|
||||
require.NoError(err)
|
||||
|
|
117
src/host/vfs/log.go
Normal file
117
src/host/vfs/log.go
Normal file
|
@ -0,0 +1,117 @@
|
|||
package vfs
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type LogFS struct {
|
||||
fs Filesystem
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
var _ Filesystem = (*LogFS)(nil)
|
||||
|
||||
func WrapLogFS(fs Filesystem, log *slog.Logger) *LogFS {
|
||||
return &LogFS{
|
||||
fs: fs,
|
||||
log: log.With("component", "fs"),
|
||||
}
|
||||
}
|
||||
|
||||
// Open implements Filesystem.
|
||||
func (fs *LogFS) Open(filename string) (File, error) {
|
||||
file, err := fs.fs.Open(filename)
|
||||
if err != nil {
|
||||
fs.log.With("filename", filename).Error("Failed to open file")
|
||||
}
|
||||
file = WrapLogFile(file, filename, fs.log)
|
||||
return file, err
|
||||
}
|
||||
|
||||
// ReadDir implements Filesystem.
|
||||
func (fs *LogFS) ReadDir(path string) ([]fs.DirEntry, error) {
|
||||
file, err := fs.fs.ReadDir(path)
|
||||
if err != nil {
|
||||
fs.log.Error("Failed to read dir", "path", path, "error", err)
|
||||
}
|
||||
return file, err
|
||||
}
|
||||
|
||||
// Stat implements Filesystem.
|
||||
func (fs *LogFS) Stat(filename string) (fs.FileInfo, error) {
|
||||
file, err := fs.fs.Stat(filename)
|
||||
if err != nil {
|
||||
fs.log.Error("Failed to stat", "filename", filename, "error", err)
|
||||
}
|
||||
return file, err
|
||||
}
|
||||
|
||||
// Unlink implements Filesystem.
|
||||
func (fs *LogFS) Unlink(filename string) error {
|
||||
err := fs.fs.Unlink(filename)
|
||||
if err != nil {
|
||||
fs.log.Error("Failed to stat", "filename", filename, "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type LogFile struct {
|
||||
f File
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
var _ File = (*LogFile)(nil)
|
||||
|
||||
func WrapLogFile(f File, filename string, log *slog.Logger) *LogFile {
|
||||
return &LogFile{
|
||||
f: f,
|
||||
log: log.With("filename", filename),
|
||||
}
|
||||
}
|
||||
|
||||
// Close implements File.
|
||||
func (f *LogFile) Close() error {
|
||||
err := f.f.Close()
|
||||
if err != nil {
|
||||
f.log.Error("Failed to close", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// IsDir implements File.
|
||||
func (f *LogFile) IsDir() bool {
|
||||
return f.f.IsDir()
|
||||
}
|
||||
|
||||
// Read implements File.
|
||||
func (f *LogFile) Read(p []byte) (n int, err error) {
|
||||
n, err = f.f.Read(p)
|
||||
if err != nil {
|
||||
f.log.Error("Failed to read", "error", err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ReadAt implements File.
|
||||
func (f *LogFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
n, err = f.f.ReadAt(p, off)
|
||||
if err != nil {
|
||||
f.log.Error("Failed to read", "offset", off, "error", err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Size implements File.
|
||||
func (f *LogFile) Size() int64 {
|
||||
return f.f.Size()
|
||||
}
|
||||
|
||||
// Stat implements File.
|
||||
func (f *LogFile) Stat() (fs.FileInfo, error) {
|
||||
info, err := f.f.Stat()
|
||||
if err != nil {
|
||||
f.log.Error("Failed to read", "error", err)
|
||||
}
|
||||
return info, err
|
||||
}
|
|
@ -31,17 +31,12 @@ func (fs *OsFS) Open(filename string) (File, error) {
|
|||
return NewDir(filename), nil
|
||||
}
|
||||
|
||||
osfile, err := os.Open(path.Join(fs.hostDir, filename))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewOsFile(osfile), nil
|
||||
return NewLazyOsFile(path.Join(fs.hostDir, filename)), nil
|
||||
}
|
||||
|
||||
// ReadDir implements Filesystem.
|
||||
func (o *OsFS) ReadDir(dir string) ([]fs.DirEntry, error) {
|
||||
dir = path.Join(o.hostDir, dir)
|
||||
return os.ReadDir(dir)
|
||||
return os.ReadDir(path.Join(o.hostDir, dir))
|
||||
}
|
||||
|
||||
func NewOsFs(osDir string) *OsFS {
|
||||
|
@ -163,6 +158,7 @@ func (f *LazyOsFile) ReadAt(p []byte, off int64) (n int, err error) {
|
|||
|
||||
func (f *LazyOsFile) Stat() (fs.FileInfo, error) {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
if f.info == nil {
|
||||
if f.file == nil {
|
||||
info, err := os.Stat(f.path)
|
||||
|
@ -178,7 +174,6 @@ func (f *LazyOsFile) Stat() (fs.FileInfo, error) {
|
|||
f.info = info
|
||||
}
|
||||
}
|
||||
f.m.Unlock()
|
||||
return f.info, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -113,7 +113,29 @@ func (r *resolver) isNestedFs(f string) bool {
|
|||
return true
|
||||
}
|
||||
}
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *resolver) nestedFs(fsPath string, file File) (Filesystem, error) {
|
||||
for ext, nestFactory := range r.factories {
|
||||
if !strings.HasSuffix(fsPath, ext) {
|
||||
continue
|
||||
}
|
||||
|
||||
if nestedFs, ok := r.fsmap[fsPath]; ok {
|
||||
return nestedFs, nil
|
||||
}
|
||||
|
||||
nestedFs, err := nestFactory(file)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating filesystem from file: %s with error: %w", fsPath, err)
|
||||
}
|
||||
r.fsmap[fsPath] = nestedFs
|
||||
|
||||
return nestedFs, nil
|
||||
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// open requeue raw open, without resolver call
|
||||
|
@ -173,7 +195,7 @@ var ErrNotExist = fs.ErrNotExist
|
|||
|
||||
func getFile[F File](m map[string]F, name string) (File, error) {
|
||||
if name == Separator {
|
||||
return &dir{}, nil
|
||||
return NewDir(name), nil
|
||||
}
|
||||
|
||||
f, ok := m[name]
|
||||
|
@ -183,7 +205,7 @@ func getFile[F File](m map[string]F, name string) (File, error) {
|
|||
|
||||
for p := range m {
|
||||
if strings.HasPrefix(p, name) {
|
||||
return &dir{}, nil
|
||||
return NewDir(name), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/storage"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
||||
"git.kmsign.ru/royalcat/tstor/src/iio"
|
||||
"github.com/anacrolix/missinggo/v2"
|
||||
"github.com/anacrolix/torrent"
|
||||
|
@ -20,81 +20,115 @@ import (
|
|||
var _ Filesystem = &TorrentFs{}
|
||||
|
||||
type TorrentFs struct {
|
||||
mu sync.Mutex
|
||||
t *torrent.Torrent
|
||||
rep storage.ExlcudedFiles
|
||||
mu sync.Mutex
|
||||
c *controller.Torrent
|
||||
|
||||
readTimeout int
|
||||
|
||||
//cache
|
||||
filesCache map[string]*torrentFile
|
||||
filesCache map[string]File
|
||||
|
||||
resolver *resolver
|
||||
}
|
||||
|
||||
func NewTorrentFs(t *torrent.Torrent, rep storage.ExlcudedFiles, readTimeout int) *TorrentFs {
|
||||
func NewTorrentFs(c *controller.Torrent, readTimeout int) *TorrentFs {
|
||||
return &TorrentFs{
|
||||
t: t,
|
||||
rep: rep,
|
||||
c: c,
|
||||
readTimeout: readTimeout,
|
||||
resolver: newResolver(ArchiveFactories),
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *TorrentFs) files() (map[string]*torrentFile, error) {
|
||||
if fs.filesCache == nil {
|
||||
fs.mu.Lock()
|
||||
<-fs.t.GotInfo()
|
||||
files := fs.t.Files()
|
||||
func (fs *TorrentFs) files() (map[string]File, error) {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
excludedFiles, err := fs.rep.ExcludedFiles(fs.t.InfoHash())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if fs.filesCache != nil {
|
||||
return fs.filesCache, nil
|
||||
}
|
||||
|
||||
files, err := fs.c.Files()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fs.filesCache = make(map[string]File)
|
||||
for _, file := range files {
|
||||
p := AbsPath(file.Path())
|
||||
|
||||
fs.filesCache[p] = &torrentFile{
|
||||
name: path.Base(p),
|
||||
timeout: fs.readTimeout,
|
||||
file: file,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO optional
|
||||
if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.c.Name()) {
|
||||
filepath := "/" + fs.c.Name()
|
||||
if file, ok := fs.filesCache[filepath]; ok {
|
||||
nestedFs, err := fs.resolver.nestedFs(filepath, file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if nestedFs == nil {
|
||||
goto DEFAULT_DIR // FIXME
|
||||
}
|
||||
fs.filesCache, err = listFilesRecursive(nestedFs, "/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return fs.filesCache, nil
|
||||
}
|
||||
|
||||
fs.filesCache = make(map[string]*torrentFile)
|
||||
for _, file := range files {
|
||||
p := file.Path()
|
||||
}
|
||||
|
||||
if slices.Contains(excludedFiles, p) {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.Contains(p, "/.pad/") {
|
||||
continue
|
||||
}
|
||||
|
||||
p = AbsPath(file.Path())
|
||||
|
||||
fs.filesCache[p] = &torrentFile{
|
||||
name: path.Base(p),
|
||||
timeout: fs.readTimeout,
|
||||
file: file,
|
||||
}
|
||||
DEFAULT_DIR:
|
||||
rootDir := "/" + fs.c.Name() + "/"
|
||||
singleDir := true
|
||||
for k, _ := range fs.filesCache {
|
||||
if !strings.HasPrefix(k, rootDir) {
|
||||
singleDir = false
|
||||
}
|
||||
|
||||
rootDir := "/" + fs.t.Name() + "/"
|
||||
singleDir := true
|
||||
for k, _ := range fs.filesCache {
|
||||
if !strings.HasPrefix(k, rootDir) {
|
||||
singleDir = false
|
||||
}
|
||||
}
|
||||
if singleDir {
|
||||
for k, f := range fs.filesCache {
|
||||
delete(fs.filesCache, k)
|
||||
k, _ = strings.CutPrefix(k, rootDir)
|
||||
k = AbsPath(k)
|
||||
fs.filesCache[k] = f
|
||||
}
|
||||
if singleDir {
|
||||
for k, f := range fs.filesCache {
|
||||
delete(fs.filesCache, k)
|
||||
k, _ = strings.CutPrefix(k, rootDir)
|
||||
k = AbsPath(k)
|
||||
fs.filesCache[k] = f
|
||||
}
|
||||
}
|
||||
|
||||
fs.mu.Unlock()
|
||||
}
|
||||
|
||||
return fs.filesCache, nil
|
||||
}
|
||||
|
||||
func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) {
|
||||
out := make(map[string]File, 0)
|
||||
entries, err := vfs.ReadDir(start)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
filename := path.Join(start, entry.Name())
|
||||
if entry.IsDir() {
|
||||
rec, err := listFilesRecursive(vfs, filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
maps.Copy(out, rec)
|
||||
} else {
|
||||
file, err := vfs.Open(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[filename] = file
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (fs *TorrentFs) rawOpen(path string) (File, error) {
|
||||
files, err := fs.files()
|
||||
if err != nil {
|
||||
|
@ -113,12 +147,7 @@ func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if file.IsDir() {
|
||||
return newDirInfo(path.Base(filename)), nil
|
||||
} else {
|
||||
return newFileInfo(path.Base(filename), file.Size()), nil
|
||||
}
|
||||
|
||||
return file.Stat()
|
||||
}
|
||||
|
||||
// Stat implements Filesystem.
|
||||
|
@ -184,7 +213,12 @@ func (fs *TorrentFs) Unlink(name string) error {
|
|||
file := files[name]
|
||||
delete(fs.filesCache, name)
|
||||
|
||||
return fs.rep.ExcludeFile(file.file)
|
||||
tfile, ok := file.(*torrentFile)
|
||||
if !ok {
|
||||
return ErrNotImplemented
|
||||
}
|
||||
|
||||
return fs.c.ExcludeFile(tfile.file)
|
||||
}
|
||||
|
||||
type reader interface {
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package vfs
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func trimRelPath(p, t string) string {
|
||||
return strings.Trim(strings.TrimPrefix(p, t), "/")
|
||||
|
@ -23,3 +26,28 @@ func AddTrailSlash(p string) string {
|
|||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// OnceValueWOErr returns a function that invokes f only once and returns the value
|
||||
// returned by f . The returned function may be called concurrently.
|
||||
//
|
||||
// If f panics, the returned function will panic with the same value on every call.
|
||||
func OnceValueWOErr[T any](f func() (T, error)) func() (T, error) {
|
||||
var (
|
||||
mu sync.Mutex
|
||||
isExecuted bool
|
||||
r1 T
|
||||
err error
|
||||
)
|
||||
|
||||
return func() (T, error) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if isExecuted && err == nil {
|
||||
return r1, nil
|
||||
}
|
||||
|
||||
r1, err = f()
|
||||
return r1, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,19 +2,22 @@ package http
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor"
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"git.kmsign.ru/royalcat/tstor/src/delivery"
|
||||
"git.kmsign.ru/royalcat/tstor/src/host/service"
|
||||
"github.com/anacrolix/missinggo/v2/filecache"
|
||||
"github.com/gin-contrib/pprof"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/shurcooL/httpfs/html/vfstemplate"
|
||||
)
|
||||
|
||||
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath string, cfg *config.Config) error {
|
||||
log := slog.With()
|
||||
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.New()
|
||||
r.Use(gin.Recovery())
|
||||
|
@ -37,6 +40,7 @@ func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath str
|
|||
// r.GET("/routes", routesHandler(ss))
|
||||
r.GET("/logs", logsHandler)
|
||||
r.GET("/servers", serversFoldersHandler())
|
||||
r.Any("/graphql", gin.WrapH(delivery.GraphQLHandler(s)))
|
||||
|
||||
api := r.Group("/api")
|
||||
{
|
||||
|
@ -50,7 +54,7 @@ func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath str
|
|||
|
||||
}
|
||||
|
||||
log.Info().Str("host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)).Msg("starting webserver")
|
||||
log.Info("starting webserver", "host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port))
|
||||
|
||||
if err := r.Run(fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port)); err != nil {
|
||||
return fmt.Errorf("error initializing server: %w", err)
|
||||
|
@ -60,7 +64,7 @@ func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath str
|
|||
}
|
||||
|
||||
func Logger() gin.HandlerFunc {
|
||||
l := log.Logger.With().Str("component", "http").Logger()
|
||||
l := slog.With("component", "http")
|
||||
return func(c *gin.Context) {
|
||||
path := c.Request.URL.Path
|
||||
raw := c.Request.URL.RawQuery
|
||||
|
@ -76,11 +80,11 @@ func Logger() gin.HandlerFunc {
|
|||
s := c.Writer.Status()
|
||||
switch {
|
||||
case s >= 400 && s < 500:
|
||||
l.Warn().Str("path", path).Int("status", s).Msg(msg)
|
||||
l.Warn(msg, "path", path, "status", s)
|
||||
case s >= 500:
|
||||
l.Error().Str("path", path).Int("status", s).Msg(msg)
|
||||
l.Error(msg, "path", path, "status", s)
|
||||
default:
|
||||
l.Debug().Str("path", path).Int("status", s).Msg(msg)
|
||||
l.Debug(msg, "path", path, "status", s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,30 +1,35 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var _ badger.Logger = (*Badger)(nil)
|
||||
|
||||
type Badger struct {
|
||||
L zerolog.Logger
|
||||
L *slog.Logger
|
||||
}
|
||||
|
||||
func fmtBadgerLog(m string, f ...any) string {
|
||||
return fmt.Sprintf(strings.ReplaceAll(m, "\n", ""), f...)
|
||||
}
|
||||
|
||||
func (l *Badger) Errorf(m string, f ...interface{}) {
|
||||
l.L.Error().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
|
||||
l.L.Error(fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *Badger) Warningf(m string, f ...interface{}) {
|
||||
l.L.Warn().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
|
||||
l.L.Warn(fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *Badger) Infof(m string, f ...interface{}) {
|
||||
l.L.Info().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
|
||||
l.L.Info(fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
||||
func (l *Badger) Debugf(m string, f ...interface{}) {
|
||||
l.L.Debug().Msgf(strings.ReplaceAll(m, "\n", ""), f...)
|
||||
l.L.Debug(fmtBadgerLog(m, f...))
|
||||
}
|
||||
|
|
|
@ -1,50 +1,41 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.kmsign.ru/royalcat/tstor/src/config"
|
||||
"github.com/mattn/go-colorable"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
"github.com/lmittmann/tint"
|
||||
)
|
||||
|
||||
const FileName = "tstor.log"
|
||||
|
||||
func Load(config *config.Log) {
|
||||
var writers []io.Writer
|
||||
|
||||
// fix console colors on windows
|
||||
cso := colorable.NewColorableStdout()
|
||||
|
||||
writers = append(writers, zerolog.ConsoleWriter{Out: cso})
|
||||
writers = append(writers, newRollingFile(config))
|
||||
mw := io.MultiWriter(writers...)
|
||||
|
||||
log.Logger = log.Output(mw)
|
||||
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
|
||||
|
||||
l := zerolog.InfoLevel
|
||||
level := slog.LevelInfo
|
||||
if config.Debug {
|
||||
l = zerolog.DebugLevel
|
||||
level = slog.LevelDebug
|
||||
}
|
||||
|
||||
zerolog.SetGlobalLevel(l)
|
||||
slog.SetDefault(slog.New(
|
||||
tint.NewHandler(os.Stdout, &tint.Options{
|
||||
Level: level,
|
||||
TimeFormat: time.Kitchen,
|
||||
// NoColor: !isatty.IsTerminal(os.Stdout.Fd()),
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
func newRollingFile(config *config.Log) io.Writer {
|
||||
if err := os.MkdirAll(config.Path, 0744); err != nil {
|
||||
log.Error().Err(err).Str("path", config.Path).Msg("can't create log directory")
|
||||
return nil
|
||||
}
|
||||
// func newRollingFile(config *config.Log) io.Writer {
|
||||
// if err := os.MkdirAll(config.Path, 0744); err != nil {
|
||||
// log.Error().Err(err).Str("path", config.Path).Msg("can't create log directory")
|
||||
// return nil
|
||||
// }
|
||||
|
||||
return &lumberjack.Logger{
|
||||
Filename: filepath.Join(config.Path, FileName),
|
||||
MaxBackups: config.MaxBackups, // files
|
||||
MaxSize: config.MaxSize, // megabytes
|
||||
MaxAge: config.MaxAge, // days
|
||||
}
|
||||
}
|
||||
// return &lumberjack.Logger{
|
||||
// Filename: filepath.Join(config.Path, FileName),
|
||||
// MaxBackups: config.MaxBackups, // files
|
||||
// MaxSize: config.MaxSize, // megabytes
|
||||
// MaxAge: config.MaxAge, // days
|
||||
// }
|
||||
// }
|
||||
|
|
169
src/log/nfs.go
169
src/log/nfs.go
|
@ -2,172 +2,179 @@ package log
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"log/slog"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
nfs "github.com/willscott/go-nfs"
|
||||
)
|
||||
|
||||
var _ nfs.Logger = (*NFSLog)(nil)
|
||||
|
||||
type NFSLog struct {
|
||||
r zerolog.Logger
|
||||
l zerolog.Logger
|
||||
// r *slog.Logger
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewNFSLog(r zerolog.Logger) nfs.Logger {
|
||||
func NewNFSLog(r *slog.Logger) nfs.Logger {
|
||||
return &NFSLog{
|
||||
r: r,
|
||||
l: r.Level(zerolog.DebugLevel),
|
||||
// r: r,
|
||||
// l: r.Level(zerolog.DebugLevel),
|
||||
l: r,
|
||||
}
|
||||
}
|
||||
|
||||
// Debug implements nfs.Logger.
|
||||
func (l *NFSLog) Debug(args ...interface{}) {
|
||||
l.l.Debug().Msg(fmt.Sprint(args...))
|
||||
l.l.Debug(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Debugf implements nfs.Logger.
|
||||
func (l *NFSLog) Debugf(format string, args ...interface{}) {
|
||||
l.l.Debug().Msgf(format, args...)
|
||||
l.l.Debug(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// Error implements nfs.Logger.
|
||||
func (l *NFSLog) Error(args ...interface{}) {
|
||||
l.l.Error().Msg(fmt.Sprint(args...))
|
||||
l.l.Error(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Errorf implements nfs.Logger.
|
||||
func (l *NFSLog) Errorf(format string, args ...interface{}) {
|
||||
l.l.Error().Msgf(format, args...)
|
||||
l.l.Error(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// Fatal implements nfs.Logger.
|
||||
func (l *NFSLog) Fatal(args ...interface{}) {
|
||||
l.l.Fatal().Msg(fmt.Sprint(args...))
|
||||
l.l.Error(fmt.Sprint(args...))
|
||||
log.Fatal(args...)
|
||||
}
|
||||
|
||||
// Fatalf implements nfs.Logger.
|
||||
func (l *NFSLog) Fatalf(format string, args ...interface{}) {
|
||||
l.l.Fatal().Msgf(format, args...)
|
||||
l.l.Error(fmt.Sprintf(format, args...))
|
||||
log.Fatalf(format, args...)
|
||||
}
|
||||
|
||||
// Info implements nfs.Logger.
|
||||
func (l *NFSLog) Info(args ...interface{}) {
|
||||
l.l.Info().Msg(fmt.Sprint(args...))
|
||||
l.l.Info(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Infof implements nfs.Logger.
|
||||
func (l *NFSLog) Infof(format string, args ...interface{}) {
|
||||
l.l.Info().Msgf(format, args...)
|
||||
l.l.Info(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// Panic implements nfs.Logger.
|
||||
func (l *NFSLog) Panic(args ...interface{}) {
|
||||
l.l.Panic().Msg(fmt.Sprint(args...))
|
||||
l.l.Error(fmt.Sprint(args...))
|
||||
panic(args)
|
||||
}
|
||||
|
||||
// Panicf implements nfs.Logger.
|
||||
func (l *NFSLog) Panicf(format string, args ...interface{}) {
|
||||
l.l.Panic().Msgf(format, args...)
|
||||
l.l.Error(fmt.Sprintf(format, args...))
|
||||
panic(args)
|
||||
}
|
||||
|
||||
// Print implements nfs.Logger.
|
||||
func (l *NFSLog) Print(args ...interface{}) {
|
||||
l.l.Print(args...)
|
||||
l.l.Info(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Printf implements nfs.Logger.
|
||||
func (l *NFSLog) Printf(format string, args ...interface{}) {
|
||||
l.l.Printf(format, args...)
|
||||
l.l.Info(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// Trace implements nfs.Logger.
|
||||
func (l *NFSLog) Trace(args ...interface{}) {
|
||||
l.l.Trace().Msg(fmt.Sprint(args...))
|
||||
l.l.Debug(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Tracef implements nfs.Logger.
|
||||
func (l *NFSLog) Tracef(format string, args ...interface{}) {
|
||||
l.l.Trace().Msgf(format, args...)
|
||||
l.l.Debug(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// Warn implements nfs.Logger.
|
||||
func (l *NFSLog) Warn(args ...interface{}) {
|
||||
l.l.Warn().Msg(fmt.Sprint(args...))
|
||||
l.l.Warn(fmt.Sprint(args...))
|
||||
}
|
||||
|
||||
// Warnf implements nfs.Logger.
|
||||
func (l *NFSLog) Warnf(format string, args ...interface{}) {
|
||||
l.l.Warn().Msgf(format, args...)
|
||||
l.l.Warn(fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
// GetLevel implements nfs.Logger.
|
||||
func (l *NFSLog) GetLevel() nfs.LogLevel {
|
||||
zl := l.l.GetLevel()
|
||||
switch zl {
|
||||
case zerolog.PanicLevel, zerolog.Disabled:
|
||||
return nfs.PanicLevel
|
||||
case zerolog.FatalLevel:
|
||||
return nfs.FatalLevel
|
||||
case zerolog.ErrorLevel:
|
||||
return nfs.ErrorLevel
|
||||
case zerolog.WarnLevel:
|
||||
return nfs.WarnLevel
|
||||
case zerolog.InfoLevel:
|
||||
return nfs.InfoLevel
|
||||
case zerolog.DebugLevel:
|
||||
return nfs.DebugLevel
|
||||
case zerolog.TraceLevel:
|
||||
return nfs.TraceLevel
|
||||
}
|
||||
return nfs.DebugLevel
|
||||
// zl := l.l.Handler()
|
||||
// switch zl {
|
||||
// case zerolog.PanicLevel, zerolog.Disabled:
|
||||
// return nfs.PanicLevel
|
||||
// case zerolog.FatalLevel:
|
||||
// return nfs.FatalLevel
|
||||
// case zerolog.ErrorLevel:
|
||||
// return nfs.ErrorLevel
|
||||
// case zerolog.WarnLevel:
|
||||
// return nfs.WarnLevel
|
||||
// case zerolog.InfoLevel:
|
||||
// return nfs.InfoLevel
|
||||
// case zerolog.DebugLevel:
|
||||
// return nfs.DebugLevel
|
||||
// case zerolog.TraceLevel:
|
||||
// return nfs.TraceLevel
|
||||
// }
|
||||
return nfs.TraceLevel
|
||||
}
|
||||
|
||||
// ParseLevel implements nfs.Logger.
|
||||
func (l *NFSLog) ParseLevel(level string) (nfs.LogLevel, error) {
|
||||
switch level {
|
||||
case "panic":
|
||||
return nfs.PanicLevel, nil
|
||||
case "fatal":
|
||||
return nfs.FatalLevel, nil
|
||||
case "error":
|
||||
return nfs.ErrorLevel, nil
|
||||
case "warn":
|
||||
return nfs.WarnLevel, nil
|
||||
case "info":
|
||||
return nfs.InfoLevel, nil
|
||||
case "debug":
|
||||
return nfs.DebugLevel, nil
|
||||
case "trace":
|
||||
return nfs.TraceLevel, nil
|
||||
}
|
||||
var ll nfs.LogLevel
|
||||
return ll, fmt.Errorf("invalid log level %q", level)
|
||||
// switch level {
|
||||
// case "panic":
|
||||
// return nfs.PanicLevel, nil
|
||||
// case "fatal":
|
||||
// return nfs.FatalLevel, nil
|
||||
// case "error":
|
||||
// return nfs.ErrorLevel, nil
|
||||
// case "warn":
|
||||
// return nfs.WarnLevel, nil
|
||||
// case "info":
|
||||
// return nfs.InfoLevel, nil
|
||||
// case "debug":
|
||||
// return nfs.DebugLevel, nil
|
||||
// case "trace":
|
||||
// return nfs.TraceLevel, nil
|
||||
// }
|
||||
// var ll nfs.LogLevel
|
||||
// return ll, fmt.Errorf("invalid log level %q", level)
|
||||
return nfs.TraceLevel, fmt.Errorf("level change not supported")
|
||||
}
|
||||
|
||||
// SetLevel implements nfs.Logger.
|
||||
func (l *NFSLog) SetLevel(level nfs.LogLevel) {
|
||||
switch level {
|
||||
case nfs.PanicLevel:
|
||||
l.l = l.r.Level(zerolog.PanicLevel)
|
||||
return
|
||||
case nfs.FatalLevel:
|
||||
l.l = l.r.Level(zerolog.FatalLevel)
|
||||
return
|
||||
case nfs.ErrorLevel:
|
||||
l.l = l.r.Level(zerolog.ErrorLevel)
|
||||
return
|
||||
case nfs.WarnLevel:
|
||||
l.l = l.r.Level(zerolog.WarnLevel)
|
||||
return
|
||||
case nfs.InfoLevel:
|
||||
l.l = l.r.Level(zerolog.InfoLevel)
|
||||
return
|
||||
case nfs.DebugLevel:
|
||||
l.l = l.r.Level(zerolog.DebugLevel)
|
||||
return
|
||||
case nfs.TraceLevel:
|
||||
l.l = l.r.Level(zerolog.TraceLevel)
|
||||
return
|
||||
}
|
||||
// switch level {
|
||||
// case nfs.PanicLevel:
|
||||
// l.l = l.r.Level(zerolog.PanicLevel)
|
||||
// return
|
||||
// case nfs.FatalLevel:
|
||||
// l.l = l.r.Level(zerolog.FatalLevel)
|
||||
// return
|
||||
// case nfs.ErrorLevel:
|
||||
// l.l = l.r.Level(zerolog.ErrorLevel)
|
||||
// return
|
||||
// case nfs.WarnLevel:
|
||||
// l.l = l.r.Level(zerolog.WarnLevel)
|
||||
// return
|
||||
// case nfs.InfoLevel:
|
||||
// l.l = l.r.Level(zerolog.InfoLevel)
|
||||
// return
|
||||
// case nfs.DebugLevel:
|
||||
// l.l = l.r.Level(zerolog.DebugLevel)
|
||||
// return
|
||||
// case nfs.TraceLevel:
|
||||
// l.l = l.r.Level(zerolog.TraceLevel)
|
||||
// return
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -1,32 +1,34 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/anacrolix/log"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var _ log.Handler = &Torrent{}
|
||||
|
||||
type Torrent struct {
|
||||
L zerolog.Logger
|
||||
L *slog.Logger
|
||||
}
|
||||
|
||||
func (l *Torrent) Handle(r log.Record) {
|
||||
e := l.L.Info()
|
||||
lvl := slog.LevelInfo
|
||||
switch r.Level {
|
||||
case log.Debug:
|
||||
e = l.L.Debug()
|
||||
lvl = slog.LevelInfo
|
||||
case log.Info:
|
||||
e = l.L.Debug().Str("error-type", "info")
|
||||
lvl = slog.LevelInfo
|
||||
case log.Warning:
|
||||
e = l.L.Warn()
|
||||
lvl = slog.LevelWarn
|
||||
case log.Error:
|
||||
e = l.L.Warn().Str("error-type", "error")
|
||||
lvl = slog.LevelError
|
||||
case log.Critical:
|
||||
e = l.L.Warn().Str("error-type", "critical")
|
||||
lvl = slog.LevelError
|
||||
}
|
||||
|
||||
// TODO set log values somehow
|
||||
|
||||
e.Msgf(r.Text())
|
||||
l.L.Log(context.Background(), lvl, r.Msg.String())
|
||||
}
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
package proto
|
||||
|
||||
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go_opt=Mtstor.proto=git.kmsign.ru/royalcat/tstor/src/proto --go-grpc_out=. --go-grpc_opt=paths=source_relative --go-grpc_opt=Mtstor.proto=git.kmsign.ru/royalcat/tstor/src/proto --proto_path=../../proto tstor.proto
|
Loading…
Add table
Add a link
Reference in a new issue