diff --git a/src/host/store/client.go b/src/host/torrent/client.go similarity index 95% rename from src/host/store/client.go rename to src/host/torrent/client.go index d7598c1..6bb29f3 100644 --- a/src/host/store/client.go +++ b/src/host/torrent/client.go @@ -1,4 +1,4 @@ -package store +package torrent import ( "log/slog" @@ -15,7 +15,7 @@ import ( ) // MOVE -func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) { +func newClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient, id [20]byte) (*torrent.Client, error) { l := slog.With("component", "torrent-client") // TODO download and upload limits diff --git a/src/host/store/fileitem.go b/src/host/torrent/fileitem.go similarity index 78% rename from src/host/store/fileitem.go rename to src/host/torrent/fileitem.go index 8192b67..259cb29 100644 --- a/src/host/store/fileitem.go +++ b/src/host/torrent/fileitem.go @@ -1,4 +1,4 @@ -package store +package torrent import ( "bytes" @@ -11,14 +11,14 @@ import ( "github.com/dgraph-io/badger/v4" ) -var _ bep44.Store = &FileItemStore{} +var _ bep44.Store = &fileItemStore{} -type FileItemStore struct { +type fileItemStore struct { ttl time.Duration db *badger.DB } -func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, error) { +func newFileItemStore(path string, itemsTTL time.Duration) (*fileItemStore, error) { l := slog.With("component", "item-store") opts := badger.DefaultOptions(path). @@ -35,13 +35,13 @@ func NewFileItemStore(path string, itemsTTL time.Duration) (*FileItemStore, erro return nil, err } - return &FileItemStore{ + return &fileItemStore{ db: db, ttl: itemsTTL, }, nil } -func (fis *FileItemStore) Put(i *bep44.Item) error { +func (fis *fileItemStore) Put(i *bep44.Item) error { tx := fis.db.NewTransaction(true) defer tx.Discard() @@ -61,7 +61,7 @@ func (fis *FileItemStore) Put(i *bep44.Item) error { return tx.Commit() } -func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) { +func (fis *fileItemStore) Get(t bep44.Target) (*bep44.Item, error) { tx := fis.db.NewTransaction(false) defer tx.Discard() @@ -87,11 +87,11 @@ func (fis *FileItemStore) Get(t bep44.Target) (*bep44.Item, error) { return i, nil } -func (fis *FileItemStore) Del(t bep44.Target) error { +func (fis *fileItemStore) Del(t bep44.Target) error { // ignore this return nil } -func (fis *FileItemStore) Close() error { +func (fis *fileItemStore) Close() error { return fis.db.Close() } diff --git a/src/host/store/id.go b/src/host/torrent/id.go similarity index 83% rename from src/host/store/id.go rename to src/host/torrent/id.go index d8b59a7..047d8d8 100644 --- a/src/host/store/id.go +++ b/src/host/torrent/id.go @@ -1,4 +1,4 @@ -package store +package torrent import ( "crypto/rand" @@ -7,7 +7,7 @@ import ( var emptyBytes [20]byte -func GetOrCreatePeerID(p string) ([20]byte, error) { +func getOrCreatePeerID(p string) ([20]byte, error) { idb, err := os.ReadFile(p) if err == nil { var out [20]byte diff --git a/src/host/store/piece-completion.go b/src/host/torrent/piece_completion.go similarity index 96% rename from src/host/store/piece-completion.go rename to src/host/torrent/piece_completion.go index 61011bb..9766c3a 100644 --- a/src/host/store/piece-completion.go +++ b/src/host/torrent/piece_completion.go @@ -1,4 +1,4 @@ -package store +package torrent import ( "encoding/binary" @@ -32,7 +32,7 @@ type badgerPieceCompletion struct { var _ storage.PieceCompletion = (*badgerPieceCompletion)(nil) -func NewBadgerPieceCompletion(dir string) (storage.PieceCompletion, error) { +func newPieceCompletion(dir string) (storage.PieceCompletion, error) { l := slog.With("component", "badger", "db", "piece-completion") opts := badger. diff --git a/src/host/torrent/service.go b/src/host/torrent/service.go index b14e812..bab9867 100644 --- a/src/host/torrent/service.go +++ b/src/host/torrent/service.go @@ -16,7 +16,6 @@ import ( "git.kmsign.ru/royalcat/tstor/pkg/ctxio" "git.kmsign.ru/royalcat/tstor/pkg/rlog" "git.kmsign.ru/royalcat/tstor/src/config" - "git.kmsign.ru/royalcat/tstor/src/host/store" "git.kmsign.ru/royalcat/tstor/src/host/tkv" "git.kmsign.ru/royalcat/tstor/src/host/vfs" "go.opentelemetry.io/otel" @@ -44,8 +43,8 @@ type Service struct { client *torrent.Client excludedFiles *filesMappingsStore infoBytes *infoBytesStore - Storage *DataStorage - fis *store.FileItemStore + Storage *fileStorage + fis *fileItemStore dirsAquire kv.Store[string, DirAquire] loadMutex sync.Mutex @@ -69,7 +68,7 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, return nil, fmt.Errorf("error creating metadata folder: %w", err) } - s.fis, err = store.NewFileItemStore(filepath.Join(conf.MetadataFolder, "items"), 2*time.Hour) + s.fis, err = newFileItemStore(filepath.Join(conf.MetadataFolder, "items"), 2*time.Hour) if err != nil { return nil, fmt.Errorf("error starting item store: %w", err) } @@ -89,12 +88,12 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service, return nil, err } - id, err := store.GetOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID")) + id, err := getOrCreatePeerID(filepath.Join(conf.MetadataFolder, "ID")) if err != nil { return nil, fmt.Errorf("error creating node ID: %w", err) } - client, err := store.NewClient(s.Storage, s.fis, &conf, id) + client, err := newClient(s.Storage, s.fis, &conf, id) if err != nil { return nil, fmt.Errorf("error starting torrent client: %w", err) } diff --git a/src/host/torrent/setup.go b/src/host/torrent/setup.go index 3940cdb..667176a 100644 --- a/src/host/torrent/setup.go +++ b/src/host/torrent/setup.go @@ -6,16 +6,15 @@ import ( "path/filepath" "git.kmsign.ru/royalcat/tstor/src/config" - "git.kmsign.ru/royalcat/tstor/src/host/store" "github.com/anacrolix/torrent/storage" ) -func setupStorage(cfg config.TorrentClient) (*DataStorage, storage.PieceCompletion, error) { +func setupStorage(cfg config.TorrentClient) (*fileStorage, storage.PieceCompletion, error) { pcp := filepath.Join(cfg.MetadataFolder, "piece-completion") if err := os.MkdirAll(pcp, 0744); err != nil { return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err) } - pc, err := store.NewBadgerPieceCompletion(pcp) + pc, err := newPieceCompletion(pcp) if err != nil { return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err) } diff --git a/src/host/store/stats.go b/src/host/torrent/stats_store.go similarity index 68% rename from src/host/store/stats.go rename to src/host/torrent/stats_store.go index 0986566..417df1a 100644 --- a/src/host/store/stats.go +++ b/src/host/torrent/stats_store.go @@ -1,4 +1,4 @@ -package store +package torrent import ( "context" @@ -11,7 +11,19 @@ import ( "github.com/dgraph-io/ristretto/z" ) -func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, error) { +type TorrentStat struct { + Name string `json:"name"` + Hash string `json:"hash"` + DownloadedBytes int64 `json:"downloadedBytes"` + UploadedBytes int64 `json:"uploadedBytes"` + Peers int `json:"peers"` + Seeders int `json:"seeders"` + PieceChunks []*PieceChunk `json:"pieceChunks"` + TotalPieces int `json:"totalPieces"` + PieceSize int64 `json:"pieceSize"` +} + +func newStatsStore(metaDir string, lifetime time.Duration) (*statsStore, error) { db, err := badger.OpenManaged( badger. DefaultOptions(path.Join(metaDir, "stats-history")). @@ -26,45 +38,18 @@ func NewStatsHistory(metaDir string, lifetime time.Duration) (*StatsHistory, err db.SetDiscardTs(uint64(n.Add(-lifetime).Unix())) } }() - r := &StatsHistory{ + r := &statsStore{ db: db, } return r, nil } -type StatsHistory struct { +type statsStore struct { db *badger.DB } -type TorrentStat struct { - Name string `json:"name"` - Hash string `json:"hash"` - DownloadedBytes int64 `json:"downloadedBytes"` - UploadedBytes int64 `json:"uploadedBytes"` - Peers int `json:"peers"` - Seeders int `json:"seeders"` - PieceChunks []*PieceChunk `json:"pieceChunks"` - TotalPieces int `json:"totalPieces"` - PieceSize int64 `json:"pieceSize"` -} - -type PieceChunk struct { - Status PieceStatus `json:"status"` - NumPieces int `json:"numPieces"` -} - -type PieceStatus string - -const ( - Checking PieceStatus = "H" - Partial PieceStatus = "P" - Complete PieceStatus = "C" - Waiting PieceStatus = "W" - Error PieceStatus = "?" -) - -func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error { +func (r *statsStore) AddStat(ih infohash.T, stat TorrentStat) error { data, err := json.Marshal(stat) if err != nil { return err @@ -75,7 +60,7 @@ func (r *StatsHistory) AddStat(ih infohash.T, stat TorrentStat) error { }) } -func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { +func (r *statsStore) ReadStatsHistory(ctx context.Context, since time.Time) (GlobalTorrentStats, error) { var stats GlobalTorrentStats stream := r.db.NewStream() stream.SinceTs = uint64(since.Unix()) @@ -99,8 +84,3 @@ func (r *StatsHistory) ReadStatsHistory(ctx context.Context, since time.Time) (G } return stats, nil } - -type GlobalTorrentStats struct { - DownloadedBytes int64 `json:"downloadedBytes"` - UploadedBytes int64 `json:"uploadedBytes"` -} diff --git a/src/host/torrent/storage.go b/src/host/torrent/storage.go index fe88714..9636424 100644 --- a/src/host/torrent/storage.go +++ b/src/host/torrent/storage.go @@ -23,8 +23,8 @@ import ( ) // NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem. -func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage { - return &DataStorage{ +func NewFileStorage(baseDir string, pc storage.PieceCompletion) *fileStorage { + return &fileStorage{ ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{ ClientBaseDir: baseDir, PieceCompletion: pc, @@ -38,14 +38,14 @@ func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage { } // File-based storage for torrents, that isn't yet bound to a particular torrent. -type DataStorage struct { +type fileStorage struct { baseDir string storage.ClientImplCloser pieceCompletion storage.PieceCompletion log *slog.Logger } -func (me *DataStorage) Close() error { +func (me *fileStorage) Close() error { return me.pieceCompletion.Close() } @@ -62,14 +62,14 @@ func filePath(opts storage.FilePathMakerOpts) string { return filepath.Join(opts.File.Path...) } -func (fs *DataStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string { +func (fs *fileStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string { return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{ Info: info, File: fileInfo, })) } -func (fs *DataStorage) DeleteFile(file *torrent.File) error { +func (fs *fileStorage) DeleteFile(file *torrent.File) error { info := file.Torrent().Info() infoHash := file.Torrent().InfoHash() torrentDir := torrentDir(fs.baseDir, info, infoHash) @@ -89,7 +89,7 @@ func (fs *DataStorage) DeleteFile(file *torrent.File) error { return os.Remove(filePath) } -func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { +func (fs *fileStorage) CleanupDirs(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun) expectedEntries := []string{} @@ -128,7 +128,7 @@ func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*Controller, return toDelete, nil } -func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { +func (s *fileStorage) CleanupFiles(ctx context.Context, expected []*Controller, dryRun bool) ([]string, error) { log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun) expectedEntries := []string{} @@ -189,7 +189,7 @@ func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*Controller, return toDelete, nil } -func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error { +func (s *fileStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error { return filepath.Walk(s.baseDir, func(path string, info fs.FileInfo, err error) error { if err != nil { @@ -207,7 +207,7 @@ func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Conte }) } -func (s *DataStorage) Dedupe(ctx context.Context) (uint64, error) { +func (s *fileStorage) Dedupe(ctx context.Context) (uint64, error) { ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe")) defer span.End() @@ -290,7 +290,7 @@ func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) { // const blockSize uint64 = 4096 -func (s *DataStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) { +func (s *fileStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) { ctx, span := tracer.Start(ctx, fmt.Sprintf("dedupeFiles"), trace.WithAttributes( attribute.StringSlice("files", paths), ))