This commit is contained in:
royalcat 2024-05-13 19:56:20 +03:00
parent 0d7aac068c
commit 974814c281
20 changed files with 1532 additions and 716 deletions
src
export/nfs
host
telemetry

View file

@ -9,6 +9,8 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"github.com/google/uuid"
"github.com/royalcat/kv"
@ -40,6 +42,8 @@ func bytesToPath(path []string) string {
return strings.Join(path, sep)
}
var kvhandlerMeter = otel.Meter("git.kmsign.ru/royalcat/tstor/src/export/nfs.kvhandler")
// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
activeHandles, err := kv.NewBadgerKVMarhsler[uuid.UUID, handle](path.Join(config.Config.Mounts.NFS.CachePath, "handlers"))
@ -54,12 +58,24 @@ func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
return true
})
return &CachingHandler{
c := &CachingHandler{
Handler: h,
fs: fs,
activeHandles: activeHandles,
reverseCache: reverseCache,
}, nil
}
_, err = kvhandlerMeter.Int64ObservableGauge("nfs.activehandles",
metric.WithInt64Callback(func(ctx context.Context, io metric.Int64Observer) error {
io.Observe(int64(c.ActiveHandlers()))
return nil
}),
)
if err != nil {
return nil, err
}
return c, nil
}
// CachingHandler implements to/from handle via an LRU cache.
@ -135,11 +151,20 @@ func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) erro
return c.activeHandles.Delete(ctx, id)
}
const maxInt = int(^uint(0) >> 1)
// const maxInt = int(^uint(0) >> 1)
const maxHandlers = 8129
// HandleLimit exports how many file handles can be safely stored by this cache.
func (c *CachingHandler) HandleLimit() int {
return maxInt
return maxHandlers
}
// HandleLimit exports how many file handles can be safely stored by this cache.
func (c *CachingHandler) ActiveHandlers() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.reverseCache)
}
// func hasPrefix(path, prefix []string) bool {

View file

@ -0,0 +1,47 @@
package controller
import (
"context"
"github.com/lrstanley/go-ytdlp"
)
type SourcedDirSource string
const (
SourcedDirYtDlp SourcedDirSource = "yt-dlp-playlist"
)
type VirtDirSource interface {
Source() SourcedDirSource
}
var _ VirtDirSource = (*SourcedDirYtDlpPlaylist)(nil)
type SourcedDirYtDlpPlaylist struct {
URL string `json:"url"`
}
func (SourcedDirYtDlpPlaylist) Source() SourcedDirSource {
return SourcedDirYtDlp
}
type SDController struct {
sources []VirtDirSource
}
func (sd *SourcedDirYtDlpPlaylist) Update(ctx context.Context) error {
_, err := ytdlp.Install(ctx, nil)
if err != nil {
return err
}
dl := ytdlp.New().PrintJSON()
_, err = dl.Run(ctx, sd.URL)
if err != nil {
return err
}
return nil
}

View file

@ -3,6 +3,7 @@ package service
import (
"bufio"
"context"
"errors"
"fmt"
"log/slog"
"os"
@ -22,7 +23,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
"github.com/anacrolix/torrent"
@ -97,9 +97,11 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
func (s *Service) Close() error {
err := multierr.Combine(s.c.Close()...)
err = multierr.Append(err, s.Storage.Close())
return err
return errors.Join(append(
s.c.Close(),
s.Storage.Close(),
)...)
}
func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {

View file

@ -2,7 +2,9 @@ package store
import (
"log/slog"
"time"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/bep44"
tlog "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
@ -54,11 +56,11 @@ func NewClient(st storage.ClientImpl, fis bep44.Store, cfg *config.TorrentClient
// l.Debug("peer closed", "ip", c.RemoteAddr.String())
// })
// torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
// cfg.Store = fis
// cfg.Exp = 2 * time.Hour
// cfg.NoSecurity = false
// }
torrentCfg.ConfigureAnacrolixDhtServer = func(cfg *dht.ServerConfig) {
cfg.Store = fis
cfg.Exp = 2 * time.Hour
cfg.NoSecurity = false
}
return torrent.NewClient(torrentCfg)
}

View file

@ -168,34 +168,27 @@ func (a *ArchiveFS) Type() fs.FileMode {
var _ File = (*archiveFile)(nil)
func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archiveFile {
readerat := newReaderAtNoCache(func(ctx context.Context) (ctxio.ReadCloser, error) {
rc, err := af(ctx)
if err != nil {
return nil, err
}
return ctxio.WrapIoReadCloser(rc), nil
})
return &archiveFile{
name: name,
size: size,
af: af,
reader: ctxio.NewReaderReaderAtWrapper(readerat),
buffer: ctxio.NewFileBuffer(nil),
}
}
const readahead = 1024 * 16
type archiveFile struct {
name string
size int64
af archiveFileReaderFactory
reader interface {
ctxio.Reader
ctxio.ReaderAt
ctxio.Closer
}
m sync.Mutex
offset int64
readen int64
buffer *ctxio.FileBuffer
}
// Name implements File.
@ -221,15 +214,56 @@ func (d *archiveFile) IsDir() bool {
}
func (d *archiveFile) Close(ctx context.Context) error {
return d.reader.Close(ctx)
return d.buffer.Close(ctx)
}
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
d.m.Lock()
defer d.m.Unlock()
if to < d.readen {
return nil
}
reader, err := d.af(ctx)
if err != nil {
return fmt.Errorf("failed to get file reader: %w", err)
}
defer reader.Close()
_, err = d.buffer.Seek(0, io.SeekStart)
if err != nil {
return fmt.Errorf("failed to seek to start of the file: %w", err)
}
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead)
if err != nil && err != io.EOF {
return fmt.Errorf("error copying from archive file reader: %w", err)
}
return nil
}
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
return d.reader.Read(ctx, p)
err = d.loadMore(ctx, d.offset+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
n, err = d.buffer.Read(ctx, p)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
}
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return d.reader.ReadAt(ctx, p, off)
err = d.loadMore(ctx, off+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
n, err = d.buffer.ReadAt(ctx, p, off)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
}
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error)
@ -254,6 +288,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
i := i
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
@ -276,7 +311,7 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
var _ archiveLoader = SevenZipLoader
func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
reader := ctxio.IoReaderAt(context.Background(), ctxreader)
r, err := sevenzip.NewReader(reader, size)
if err != nil {
@ -356,76 +391,3 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
return out, nil
}
func newReaderAtNoCache(newReader func(ctx context.Context) (ctxio.ReadCloser, error)) *readerAtNoCache {
return &readerAtNoCache{
newReader: newReader,
}
}
type readerAtNoCache struct {
newReader func(ctx context.Context) (ctxio.ReadCloser, error)
mu sync.Mutex
readerCtx context.Context
r ctxio.ReadCloser
nread int
isClosed bool
}
func (f *readerAtNoCache) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isClosed {
return 0, fs.ErrClosed
}
if f.r == nil || int64(f.nread) > off || f.readerCtx.Err() == context.DeadlineExceeded || f.readerCtx.Err() == context.Canceled {
err = f.recreateReader(ctx)
if err != nil {
return 0, err
}
}
if f.nread != int(off) {
_, err = ctxio.CopyN(ctx, ctxio.Discard, f.r, off-int64(f.nread))
if err != nil {
return 0, err
}
}
n, err = f.r.Read(ctx, p)
f.nread += n
if err != nil {
return n, err
}
return n, err
}
func (f *readerAtNoCache) recreateReader(ctx context.Context) (err error) {
if f.r != nil {
err = f.r.Close(ctx)
if err != nil {
return err
}
}
f.r, err = f.newReader(ctx)
if err != nil {
return err
}
f.readerCtx = ctx
f.nread = 0
return nil
}
func (f *readerAtNoCache) Close(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()
f.isClosed = true
return f.r.Close(ctx)
}

3
src/host/vfs/sourced.go Normal file
View file

@ -0,0 +1,3 @@
package vfs
const sorcedDirExt = ".tsvd"

View file

@ -1,21 +0,0 @@
package virtdir
type SourceType string
const (
VirtDirYtDlp SourceType = "yt-dlp"
)
type VirtDirSource interface {
SourceType() SourceType
}
var _ VirtDirSource = (*VirtDirSourceYtDlp)(nil)
type VirtDirSourceYtDlp struct {
URL string `json:"url"`
}
func (VirtDirSourceYtDlp) SourceType() SourceType {
return VirtDirYtDlp
}

View file

@ -10,12 +10,12 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/agoda-com/opentelemetry-go/otelslog"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogsgrpc"
logsdk "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
otelpyroscope "github.com/grafana/otel-profiling-go"
"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
@ -55,13 +55,11 @@ func (client *Client) Shutdown(ctx context.Context) {
const appName = "tstor"
func Setup(ctx context.Context, endpoint string) (*Client, error) {
log := rlog.Component("telemetry")
client := &Client{
log: log,
log: rlog.Component("telemetry"),
}
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(cause error) {
log.Error(context.Background(), "otel error", rlog.Error(cause))
client.log.Error(context.Background(), "otel error", rlog.Error(cause))
}))
hostName, _ := os.Hostname()
@ -78,9 +76,9 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
return nil, err
}
meticExporter, err := otlpmetrichttp.New(ctx,
otlpmetrichttp.WithEndpoint(endpoint),
otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
meticExporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint(endpoint),
otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
Enabled: false,
}),
)
@ -93,7 +91,14 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
metric.WithResource(r),
)
otel.SetMeterProvider(client.metricProvider)
log.Info(ctx, "prometheus metrics provider initialized")
var meter = otel.Meter("git.kmsign.ru/royalcat/tstor/pkg/telemetry")
counter, err := meter.Int64Counter("up")
if err != nil {
return nil, err
}
counter.Add(ctx, 1)
client.log.Info(ctx, "metrics provider initialized")
traceExporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint(endpoint),
@ -108,13 +113,14 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
trace.WithBatcher(traceExporter),
trace.WithResource(r),
)
// otel.SetTracerProvider(client.tracerProvider)
otel.SetTracerProvider(otelpyroscope.NewTracerProvider(client.tracerProvider))
log.Info(ctx, "otel tracing provider initialized")
client.log.Info(ctx, "tracing provider initialized")
logExporter, err := otlplogs.NewExporter(ctx,
otlplogs.WithClient(
otlplogshttp.NewClient(otlplogshttp.WithEndpoint(endpoint)),
otlplogsgrpc.NewClient(
otlplogsgrpc.WithEndpoint(endpoint),
),
),
)
if err != nil {
@ -124,13 +130,15 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
logsdk.WithBatcher(logExporter),
logsdk.WithResource(r),
)
rlog.AddHandler(otelslog.NewOtelHandler(client.loggerProvider,
&otelslog.HandlerOptions{
Level: slog.LevelDebug,
}),
)
client.log = rlog.Component("telemetry-client")
client.log.Info(ctx, "logger provider initialized")
// recreate telemetry logger
client.log = rlog.Component("telemetry")
runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)