refactor continue
All checks were successful
docker / build-docker (linux/386) (push) Successful in 1m23s
docker / build-docker (linux/amd64) (push) Successful in 1m25s
docker / build-docker (linux/arm64) (push) Successful in 8m18s
docker / build-docker (linux/arm/v7) (push) Successful in 9m8s
docker / build-docker (linux/arm64/v8) (push) Successful in 8m7s

This commit is contained in:
royalcat 2024-05-20 00:36:22 +03:00
parent 991c15fdef
commit ef6680b854
8 changed files with 51 additions and 73 deletions

View file

@ -1,4 +1,4 @@
package store package torrent
import ( import (
"log/slog" "log/slog"
@ -15,7 +15,7 @@ import (
) )
// MOVE // MOVE
func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) { func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) {
l := slog.With("component", "torrent-client") l := slog.With("component", "torrent-client")
// TODO download and upload limits // TODO download and upload limits

View file

@ -1,4 +1,4 @@
package store package torrent
import ( import (
"bytes" "bytes"
@ -11,14 +11,14 @@ import (
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
) )
var _ bep44.Store = &FileItemStore{} var _ bep44.Store = &fileItemStore{}
type FileItemStore struct { type fileItemStore struct {
ttl time.Duration ttl time.Duration
db *badger.DB db *badger.DB
} }
func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, error) { func newFileItemStore(path string, itemsTTL time.Duration) (*fileItemStore, error) {
l := slog.With("component", "item-store") l := slog.With("component", "item-store")
opts := badger.DefaultOptions(path). opts := badger.DefaultOptions(path).
@ -35,13 +35,13 @@ func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, erro
return nil, err return nil, err
} }
return &FileItemStore{ return &fileItemStore{
db: db, db: db,
ttl: itemsTTL, ttl: itemsTTL,
}, nil }, nil
} }
func (fis *FileItemStore) Put(i *bep44.Item) error { func (fis *fileItemStore) Put(i *bep44.Item) error {
tx := fis.db.NewTransaction(true) tx := fis.db.NewTransaction(true)
defer tx.Discard() defer tx.Discard()
@ -61,7 +61,7 @@ func (fis *FileItemStore) Put(i *bep44.Item) error {
return tx.Commit() return tx.Commit()
} }
func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) { func (fis *fileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
tx := fis.db.NewTransaction(false) tx := fis.db.NewTransaction(false)
defer tx.Discard() defer tx.Discard()
@ -87,11 +87,11 @@ func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) {
return i, nil return i, nil
} }
func (fis *FileItemStore) Del(t bep44.Target) error { func (fis *fileItemStore) Del(t bep44.Target) error {
// ignore this // ignore this
return nil return nil
} }
func (fis *FileItemStore) Close() error { func (fis *fileItemStore) Close() error {
return fis.db.Close() return fis.db.Close()
} }

View file

@ -1,4 +1,4 @@
package store package torrent
import ( import (
"crypto/rand" "crypto/rand"
@ -7,7 +7,7 @@ import (
var emptyBytes [20]byte var emptyBytes [20]byte
func GetOrCreatePeerID(p string) ([20]byte, error) { func getOrCreatePeerID(p string) ([20]byte, error) {
idb, err := os.ReadFile(p) idb, err := os.ReadFile(p)
if err == nil { if err == nil {
var out [20]byte var out [20]byte

View file

@ -1,4 +1,4 @@
package store package torrent
import ( import (
"encoding/binary" "encoding/binary"
@ -32,7 +32,7 @@ type badgerPieceCompletion struct {
var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil) var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil)
func NewBadgerPieceCompletion(dir string) (storage.PieceCompletion, error) { func newPieceCompletion(dir string) (storage.PieceCompletion, error) {
l := slog.With("component", "badger", "db", "piece-completion") l := slog.With("component", "badger", "db", "piece-completion")
opts := badger. opts := badger.

View file

@ -16,7 +16,6 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"git.kmsign.ru/royalcat/tstor/src/host/tkv" "git.kmsign.ru/royalcat/tstor/src/host/tkv"
"git.kmsign.ru/royalcat/tstor/src/host/vfs" "git.kmsign.ru/royalcat/tstor/src/host/vfs"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@ -44,8 +43,8 @@ type Service struct {
client *torrent.Client client *torrent.Client
excludedFiles *filesMappingsStore excludedFiles *filesMappingsStore
infoBytes *infoBytesStore infoBytes *infoBytesStore
Storage *DataStorage Storage *fileStorage
fis *store.FileItemStore fis *fileItemStore
dirsAquire kv.Store[string, DirAquire] dirsAquire kv.Store[string, DirAquire]
loadMutex sync.Mutex loadMutex sync.Mutex
@ -69,7 +68,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service,
return nil, fmt.Errorf("error creating metadata folder: %w", err) return nil, fmt.Errorf("error creating metadata folder: %w", err)
} }
s.fis, err = store.NewFileItemStore(filepath.Join(conf.MetadataFolder, "items"), 2*time.Hour) s.fis, err = newFileItemStore(filepath.Join(conf.MetadataFolder, "items"), 2*time.Hour)
if err != nil { if err != nil {
return nil, fmt.Errorf("error starting item store: %w", err) return nil, fmt.Errorf("error starting item store: %w", err)
} }
@ -89,12 +88,12 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service,
return nil, err return nil, err
} }
id, err := store.GetOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID")) id, err := getOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID"))
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating node ID: %w", err) return nil, fmt.Errorf("error creating node ID: %w", err)
} }
client, err := store.NewClient(s.Storage, s.fis, &conf, id) client, err := newClient(s.Storage, s.fis, &conf, id)
if err != nil { if err != nil {
return nil, fmt.Errorf("error starting torrent client: %w", err) return nil, fmt.Errorf("error starting torrent client: %w", err)
} }

View file

@ -6,16 +6,15 @@ import (
"path/filepath" "path/filepath"
"git.kmsign.ru/royalcat/tstor/src/config" "git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
func setupStorage(cfg config.TorrentClient) (*DataStorage, storage.PieceCompletion, error) { func setupStorage(cfg config.TorrentClient) (*fileStorage, storage.PieceCompletion, error) {
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion") pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
if err := os.MkdirAll(pcp, 0744); err != nil { if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err) return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
} }
pc, err := store.NewBadgerPieceCompletion(pcp) pc, err := newPieceCompletion(pcp)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err) return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
} }

View file

@ -1,4 +1,4 @@
package store package torrent
import ( import (
"context" "context"
@ -11,7 +11,19 @@ import (
"github.com/dgraph-io/ristretto/z" "github.com/dgraph-io/ristretto/z"
) )
func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, error) { type TorrentStat struct {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
}
func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) {
db, err := badger.OpenManaged( db, err := badger.OpenManaged(
badger. badger.
DefaultOptions(path.Join(metaDir, "stats-history")). DefaultOptions(path.Join(metaDir, "stats-history")).
@ -26,45 +38,18 @@ func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, err
db.SetDiscardTs(uint64(n.Add(-lifetime).Unix())) db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
} }
}() }()
r := &StatsHistory{ r := &statsStore{
db: db, db: db,
} }
return r, nil return r, nil
} }
type StatsHistory struct { type statsStore struct {
db *badger.DB db *badger.DB
} }
type TorrentStat struct { func (r *statsStore) AddStat(ih infohash.T, stat TorrentStat) error {
Name string `json:"name"`
Hash string `json:"hash"`
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
Peers int `json:"peers"`
Seeders int `json:"seeders"`
PieceChunks []*PieceChunk `json:"pieceChunks"`
TotalPieces int `json:"totalPieces"`
PieceSize int64 `json:"pieceSize"`
}
type PieceChunk struct {
Status PieceStatus `json:"status"`
NumPieces int `json:"numPieces"`
}
type PieceStatus string
const (
Checking PieceStatus = "H"
Partial PieceStatus = "P"
Complete PieceStatus = "C"
Waiting PieceStatus = "W"
Error PieceStatus = "?"
)
func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error {
data, err := json.Marshal(stat) data, err := json.Marshal(stat)
if err != nil { if err != nil {
return err return err
@ -75,7 +60,7 @@ func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error {
}) })
} }
func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) {
var stats GlobalTorrentStats var stats GlobalTorrentStats
stream := r.db.NewStream() stream := r.db.NewStream()
stream.SinceTs = uint64(since.Unix()) stream.SinceTs = uint64(since.Unix())
@ -99,8 +84,3 @@ func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (G
} }
return stats, nil return stats, nil
} }
type GlobalTorrentStats struct {
DownloadedBytes int64 `json:"downloadedBytes"`
UploadedBytes int64 `json:"uploadedBytes"`
}

View file

@ -23,8 +23,8 @@ import (
) )
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem. // NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage { func NewFileStorage(baseDir string, pc storage.PieceCompletion) *fileStorage {
return &DataStorage{ return &fileStorage{
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{ ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: baseDir, ClientBaseDir: baseDir,
PieceCompletion: pc, PieceCompletion: pc,
@ -38,14 +38,14 @@ func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage {
} }
// File-based storage for torrents, that isn't yet bound to a particular torrent. // File-based storage for torrents, that isn't yet bound to a particular torrent.
type DataStorage struct { type fileStorage struct {
baseDir string baseDir string
storage.ClientImplCloser storage.ClientImplCloser
pieceCompletion storage.PieceCompletion pieceCompletion storage.PieceCompletion
log *slog.Logger log *slog.Logger
} }
func (me *DataStorage) Close() error { func (me *fileStorage) Close() error {
return me.pieceCompletion.Close() return me.pieceCompletion.Close()
} }
@ -62,14 +62,14 @@ func filePath(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...) return filepath.Join(opts.File.Path...)
} }
func (fs *DataStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string { func (fs *fileStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{ return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{
Info: info, Info: info,
File: fileInfo, File: fileInfo,
})) }))
} }
func (fs *DataStorage) DeleteFile(file *torrent.File) error { func (fs *fileStorage) DeleteFile(file *torrent.File) error {
info := file.Torrent().Info() info := file.Torrent().Info()
infoHash := file.Torrent().InfoHash() infoHash := file.Torrent().InfoHash()
torrentDir := torrentDir(fs.baseDir, info, infoHash) torrentDir := torrentDir(fs.baseDir, info, infoHash)
@ -89,7 +89,7 @@ func (fs *DataStorage) DeleteFile(file *torrent.File) error {
return os.Remove(filePath) return os.Remove(filePath)
} }
func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { func (fs *fileStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun) log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{} expectedEntries := []string{}
@ -128,7 +128,7 @@ func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*Controller,
return toDelete, nil return toDelete, nil
} }
func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { func (s *fileStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) {
log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun) log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{} expectedEntries := []string{}
@ -189,7 +189,7 @@ func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*Controller,
return toDelete, nil return toDelete, nil
} }
func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error { func (s *fileStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error {
return filepath.Walk(s.baseDir, return filepath.Walk(s.baseDir,
func(path string, info fs.FileInfo, err error) error { func(path string, info fs.FileInfo, err error) error {
if err != nil { if err != nil {
@ -207,7 +207,7 @@ func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Conte
}) })
} }
func (s *DataStorage) Dedupe(ctx context.Context) (uint64, error) { func (s *fileStorage) Dedupe(ctx context.Context) (uint64, error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe")) ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe"))
defer span.End() defer span.End()
@ -290,7 +290,7 @@ func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) {
// const blockSize uint64 = 4096 // const blockSize uint64 = 4096
func (s *DataStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) { func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("dedupeFiles"), trace.WithAttributes( ctx, span := tracer.Start(ctx, fmt.Sprintf("dedupeFiles"), trace.WithAttributes(
attribute.StringSlice("files", paths), attribute.StringSlice("files", paths),
)) ))