no cache archive reader

This commit is contained in:
royalcat 2024-04-17 11:36:14 +03:00
parent bcda69daad
commit 5591f145a9
16 changed files with 579 additions and 272 deletions
src

View file

@ -4,8 +4,8 @@ import (
"context"
"fmt"
"path"
"slices"
"time"
"strings"
"sync"
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
@ -14,29 +14,51 @@ import (
"github.com/royalcat/kv"
)
const lifetime = time.Hour * 24
type handle []string
const sep = "\",\""
func (p handle) String() string {
return strings.Join(p, sep)
}
// MarshalBinary implements kv.Binary.
func (p handle) MarshalBinary() (data []byte, err error) {
return []byte(strings.Join(p, sep)), nil
}
// UnmarshalBinary implements kv.Binary.
func (p *handle) UnmarshalBinary(data []byte) error {
path := strings.Split(string(data), sep)
*p = path
return nil
}
var _ kv.Binary = (*handle)(nil)
func bytesToPath(path []string) string {
return strings.Join(path, sep)
}
// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
activeHandles, err := kv.NewBadgerKVMarhsler[uuid.UUID, []string](path.Join(config.Config.Mounts.NFS.CachePath, "handlers"))
activeHandles, err := kv.NewBadgerKVMarhsler[uuid.UUID, handle](path.Join(config.Config.Mounts.NFS.CachePath, "handlers"))
if err != nil {
return nil, err
}
// if s, ok := activeHandles.(kv.BadgerStore); ok {
// db := s.BadgerDB()
// enable with managed database
// go func() {
// for n := range time.NewTimer(lifetime / 2).C {
// db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
// }
// }()
// }
reverseCache := map[string]uuid.UUID{}
activeHandles.Range(context.Background(), func(k uuid.UUID, v handle) bool {
reverseCache[v.String()] = k
return true
})
return &CachingHandler{
Handler: h,
fs: fs,
activeHandles: activeHandles,
reverseCache: reverseCache,
}, nil
}
@ -44,32 +66,36 @@ func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
type CachingHandler struct {
nfs.Handler
fs nfs.Filesystem
activeHandles kv.Store[uuid.UUID, []string]
mu sync.RWMutex
fs nfs.Filesystem
activeHandles kv.Store[uuid.UUID, handle]
reverseCache map[string]uuid.UUID
}
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
ctx := context.Background()
var id uuid.UUID
c.activeHandles.Range(ctx, func(k uuid.UUID, v []string) bool {
if slices.Equal(path, v) {
id = k
return false
}
return true
})
cacheKey := handle(path).String()
if cacheId, ok := c.reverseCache[cacheKey]; ok {
id = cacheId
}
if id != uuid.Nil {
return id[:]
}
id = uuid.New()
c.mu.Lock()
defer c.mu.Unlock()
id = uuid.New()
c.reverseCache[cacheKey] = id
c.activeHandles.Set(ctx, id, path)
return id[:]
@ -77,11 +103,14 @@ func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
// FromHandle converts from an opaque handle to the file it represents
func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) {
c.mu.Lock()
defer c.mu.Unlock()
ctx := context.Background()
id, err := uuid.FromBytes(fh)
if err != nil {
return nil, []string{}, err
return nil, nil, err
}
paths, found, err := c.activeHandles.Get(ctx, id)
@ -93,7 +122,7 @@ func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error)
return c.fs, paths, nil
}
return nil, []string{}, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
return nil, nil, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error {
@ -103,8 +132,7 @@ func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) erro
if err != nil {
return err
}
c.activeHandles.Delete(ctx, id)
return nil
return c.activeHandles.Delete(ctx, id)
}
const maxInt = int(^uint(0) >> 1)
@ -114,14 +142,14 @@ func (c *CachingHandler) HandleLimit() int {
return maxInt
}
func hasPrefix(path, prefix []string) bool {
if len(prefix) > len(path) {
return false
}
for i, e := range prefix {
if path[i] != e {
return false
}
}
return true
}
// func hasPrefix(path, prefix []string) bool {
// if len(prefix) > len(path) {
// return false
// }
// for i, e := range prefix {
// if path[i] != e {
// return false
// }
// }
// return true
// }

View file

@ -12,7 +12,6 @@ import (
"path/filepath"
"slices"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
@ -223,7 +222,7 @@ func (s *DataStorage) Dedupe(ctx context.Context) (uint64, error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("Dedupe"))
defer span.End()
log := rlog.FunctionLog(s.log, "Dedupe")
log := s.log
sizeMap := map[int64][]string{}
err := s.iterFiles(ctx, func(ctx context.Context, path string, info fs.FileInfo) error {
@ -314,7 +313,7 @@ func (s *DataStorage) dedupeFiles(ctx context.Context, paths []string) (deduped
span.End()
}()
log := rlog.FunctionLog(s.log, "dedupeFiles")
log := s.log
srcF, err := os.Open(paths[0])
if err != nil {

View file

@ -56,7 +56,7 @@ type Service struct {
dirsAquire kv.Store[string, DirAquire]
log *slog.Logger
log *rlog.Logger
}
func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
@ -68,7 +68,7 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
}
s := &Service{
log: slog.With("component", "torrent-service"),
log: rlog.Component("torrent-service"),
c: c,
DefaultPriority: types.PiecePriorityNone,
excludedFiles: excludedFiles,
@ -86,7 +86,7 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
ctx := context.Background()
err := s.loadTorrentFiles(ctx)
if err != nil {
s.log.Error("initial torrent load failed", "error", err)
s.log.Error(ctx, "initial torrent load failed", rlog.Error(err))
}
close(s.torrentLoaded)
}()
@ -105,8 +105,7 @@ func (s *Service) Close() error {
func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent")
defer span.End()
log := rlog.FunctionLog(s.log, "LoadTorrent")
log := s.log
defer f.Close(ctx)
@ -126,7 +125,7 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
if !ok {
span.AddEvent("torrent not found, loading from file")
log.InfoContext(ctx, "torrent not found, loading from file")
log.Info(ctx, "torrent not found, loading from file")
spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
if err != nil {
@ -135,12 +134,12 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
infoBytes := spec.InfoBytes
if !isValidInfoHashBytes(infoBytes) {
log.WarnContext(ctx, "info loaded from spec not valid")
log.Warn(ctx, "info loaded from spec not valid")
infoBytes = nil
}
if len(infoBytes) == 0 {
log.InfoContext(ctx, "no info loaded from file, try to load from cache")
log.Info(ctx, "no info loaded from file, try to load from cache")
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
if err != nil && err != store.ErrNotFound {
return nil, fmt.Errorf("get info bytes from database: %w", err)
@ -164,7 +163,10 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
case <-t.GotInfo():
err := s.infoBytes.Set(t.InfoHash(), t.Metainfo())
if err != nil {
s.log.Error("error setting info bytes for torrent %s: %s", t.Name(), err.Error())
log.Error(ctx, "error setting info bytes for torrent",
slog.String("torrent-name", t.Name()),
rlog.Error(err),
)
}
}
span.AddEvent("got info")
@ -190,7 +192,10 @@ func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent
}
func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) {
log := s.log.With("new-name", info.BestName(), "new-infohash", ih.String())
log := s.log.With(
slog.String("new-name", info.BestName()),
slog.String("new-infohash", ih.String()),
)
name := info.BestName()
@ -207,12 +212,12 @@ func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, inf
return false, false, err
}
log.Debug("acquiring was not found, so created")
log.Debug(ctx, "acquiring was not found, so created")
return true, false, nil
}
if slices.Contains(aq.Hashes, ih) {
log.Debug("hash already know to be compatable")
log.Debug(ctx, "hash already know to be compatable")
return true, false, nil
}
@ -226,30 +231,30 @@ func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, inf
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(info.Files)
if !s.checkTorrentFilesCompatable(aq, existingFiles, newFiles) {
if !s.checkTorrentFilesCompatable(ctx, aq, existingFiles, newFiles) {
return false, false, nil
}
aq.Hashes = slicesUnique(append(aq.Hashes, ih))
err = s.dirsAquire.Set(ctx, aq.Name, aq)
if err != nil {
log.Warn("torrent not compatible")
log.Warn(ctx, "torrent not compatible")
return false, false, err
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug("hash is compatable")
log.Debug(ctx, "hash is compatable")
return true, false, nil
}
log.Debug("torrent with same name not found, try later")
log.Debug(ctx, "torrent with same name not found, try later")
return false, true, nil
}
func (s *Service) checkTorrentFilesCompatable(aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool {
log := s.log.With("name", aq.Name)
func (s *Service) checkTorrentFilesCompatable(ctx context.Context, aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool {
log := s.log.With(slog.String("name", aq.Name))
pathCmp := func(a, b metainfo.FileInfo) int {
return slices.Compare(a.BestPath(), b.BestPath())
@ -286,7 +291,11 @@ func (s *Service) checkTorrentFilesCompatable(aq DirAquire, existingFiles, newFi
for _, e := range existingFiles {
if e.Path == n.Path && e.Length != n.Length {
log.Warn("torrents not compatible, has files with different length", "path", n.Path, "existing-length", e.Length, "new-length", e.Length)
log.Warn(ctx, "torrents not compatible, has files with different length",
slog.String("path", n.Path),
slog.Int64("existing-length", e.Length),
slog.Int64("new-length", e.Length),
)
return false
}
}
@ -343,8 +352,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
attribute.Int("workers", loadWorkers),
))
defer span.End()
log := rlog.FunctionLog(s.log, "loadTorrentFiles")
log := s.log
loaderPaths := make(chan string)
wg := sync.WaitGroup{}
@ -359,14 +367,14 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
for path := range loaderPaths {
file, err := vfs.NewLazyOsFile(path)
if err != nil {
log.Error("error opening torrent file", "filename", path, rlog.Err(err))
log.Error(ctx, "error opening torrent file", slog.String("filename", path), rlog.Error(err))
continue
}
defer file.Close(ctx)
_, err = s.LoadTorrent(ctx, file)
if err != nil {
s.log.Error("failed adding torrent", "error", err)
log.Error(ctx, "failed adding torrent", rlog.Error(err))
}
}
wg.Done()

View file

@ -168,27 +168,34 @@ func (a *ArchiveFS) Type() fs.FileMode {
var _ File = (*archiveFile)(nil)
func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archiveFile {
readerat := newReaderAtNoCache(func(ctx context.Context) (ctxio.ReadCloser, error) {
rc, err := af(ctx)
if err != nil {
return nil, err
}
return ctxio.WrapIoReadCloser(rc), nil
})
return &archiveFile{
name: name,
size: size,
af: af,
buffer: ctxio.NewFileBuffer(nil),
reader: ctxio.NewReaderReaderAtWrapper(readerat),
}
}
const readahead = 1024 * 16
type archiveFile struct {
name string
size int64
af archiveFileReaderFactory
m sync.Mutex
offset int64
readen int64
buffer *ctxio.FileBuffer
reader interface {
ctxio.Reader
ctxio.ReaderAt
ctxio.Closer
}
}
// Name implements File.
@ -214,55 +221,15 @@ func (d *archiveFile) IsDir() bool {
}
func (d *archiveFile) Close(ctx context.Context) error {
return d.buffer.Close(ctx)
}
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
d.m.Lock()
defer d.m.Unlock()
if to < d.readen {
return nil
}
reader, err := d.af(ctx)
if err != nil {
return fmt.Errorf("failed to get file reader: %w", err)
}
_, err = d.buffer.Seek(0, io.SeekStart)
if err != nil {
return fmt.Errorf("failed to seek to start of the file: %w", err)
}
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead)
if err != nil && err != io.EOF {
return fmt.Errorf("error copying from archive file reader: %w", err)
}
return nil
return d.reader.Close(ctx)
}
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
err = d.loadMore(ctx, d.offset+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
n, err = d.buffer.Read(ctx, p)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
return d.reader.Read(ctx, p)
}
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
err = d.loadMore(ctx, off+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
n, err = d.buffer.ReadAt(ctx, p, off)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
return d.reader.ReadAt(ctx, p, off)
}
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error)
@ -287,7 +254,6 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
i := i
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
@ -349,7 +315,7 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
var _ archiveLoader = RarLoader
func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
reader := ctxio.IoReadSeekerWrapper(ctx, ctxreader, size)
reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size)
r, err := rardecode.NewReader(reader)
if err != nil {
@ -368,7 +334,7 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
name := header.Name
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReadSeekerWrapper(ctx, ctxreader, size)
reader := ctxio.WrapIoReadSeeker(ctx, ctxreader, size)
r, err := rardecode.NewReader(reader)
if err != nil {
return nil, err
@ -390,3 +356,76 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
return out, nil
}
func newReaderAtNoCache(newReader func(ctx context.Context) (ctxio.ReadCloser, error)) *readerAtNoCache {
return &readerAtNoCache{
newReader: newReader,
}
}
type readerAtNoCache struct {
newReader func(ctx context.Context) (ctxio.ReadCloser, error)
mu sync.Mutex
readerCtx context.Context
r ctxio.ReadCloser
nread int
isClosed bool
}
func (f *readerAtNoCache) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isClosed {
return 0, fs.ErrClosed
}
if f.r == nil || int64(f.nread) > off || f.readerCtx.Err() == context.DeadlineExceeded || f.readerCtx.Err() == context.Canceled {
err = f.recreateReader(ctx)
if err != nil {
return 0, err
}
}
if f.nread != int(off) {
_, err = ctxio.CopyN(ctx, ctxio.Discard, f.r, off-int64(f.nread))
if err != nil {
return 0, err
}
}
n, err = f.r.Read(ctx, p)
f.nread += n
if err != nil {
return n, err
}
return n, err
}
func (f *readerAtNoCache) recreateReader(ctx context.Context) (err error) {
if f.r != nil {
err = f.r.Close(ctx)
if err != nil {
return err
}
}
f.r, err = f.newReader(ctx)
if err != nil {
return err
}
f.readerCtx = ctx
f.nread = 0
return nil
}
func (f *readerAtNoCache) Close(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()
f.isClosed = true
return f.r.Close(ctx)
}

View file

@ -15,7 +15,7 @@ import (
type LogFS struct {
fs Filesystem
log *slog.Logger
log *rlog.Logger
timeout time.Duration
readTimeout time.Duration
@ -30,7 +30,7 @@ var _ Filesystem = (*LogFS)(nil)
func WrapLogFS(vfs Filesystem) *LogFS {
return &LogFS{
fs: vfs,
log: rlog.ComponentLog("fs"),
log: rlog.Component("logfs"),
timeout: time.Minute * 3,
readTimeout: time.Minute,
}
@ -102,7 +102,7 @@ func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err erro
file, err = fs.fs.Open(ctx, filename)
if isLoggableError(err) {
fs.log.With("filename", filename).Error("Failed to open file")
fs.log.Error(ctx, "Failed to open file")
}
file = WrapLogFile(file, filename, fs.log, fs.readTimeout)
return file, err
@ -113,7 +113,10 @@ func (fs *LogFS) ReadDir(ctx context.Context, path string) (entries []fs.DirEntr
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "ReadDir",
fs.traceAttrs(attribute.String("path", path)),
fs.traceAttrs(
attribute.String("path", path),
attribute.String("fs-type", reflect.TypeOf(fs.fs).Name()),
),
)
defer func() {
if err != nil {
@ -124,7 +127,7 @@ func (fs *LogFS) ReadDir(ctx context.Context, path string) (entries []fs.DirEntr
entries, err = fs.fs.ReadDir(ctx, path)
if isLoggableError(err) {
fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name())
fs.log.Error(ctx, "Failed to read dir", rlog.Error(err))
}
return entries, err
}
@ -145,7 +148,7 @@ func (lfs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo,
info, err = lfs.fs.Stat(ctx, filename)
if isLoggableError(err) {
lfs.log.Error("Failed to stat", "filename", filename, "error", err)
lfs.log.Error(ctx, "Failed to stat", rlog.Error(err))
}
return info, err
}
@ -166,7 +169,7 @@ func (fs *LogFS) Unlink(ctx context.Context, filename string) (err error) {
err = fs.fs.Unlink(ctx, filename)
if isLoggableError(err) {
fs.log.Error("Failed to stat", "filename", filename, "error", err)
fs.log.Error(ctx, "Failed to stat", rlog.Error(err))
}
return err
}
@ -175,7 +178,7 @@ type LogFile struct {
filename string
f File
log *slog.Logger
log *rlog.Logger
timeout time.Duration
}
@ -191,11 +194,11 @@ func (f *LogFile) Type() fs.FileMode {
var _ File = (*LogFile)(nil)
func WrapLogFile(f File, filename string, log *slog.Logger, timeout time.Duration) *LogFile {
func WrapLogFile(f File, filename string, log *rlog.Logger, timeout time.Duration) *LogFile {
return &LogFile{
filename: filename,
f: f,
log: log.With("filename", filename),
log: log.With(slog.String("filename", filename)),
timeout: timeout,
}
}
@ -216,7 +219,7 @@ func (f *LogFile) Close(ctx context.Context) (err error) {
err = f.f.Close(ctx)
if isLoggableError(err) {
f.log.ErrorContext(ctx, "Failed to close", "error", err)
f.log.Error(ctx, "Failed to close", rlog.Error(err))
}
return err
}
@ -246,7 +249,7 @@ func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) {
n, err = f.f.Read(ctx, p)
if isLoggableError(err) {
f.log.Error("Failed to read", "error", err)
f.log.Error(ctx, "Failed to read", rlog.Error(err))
}
return n, err
}
@ -259,6 +262,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
trace.WithAttributes(
attribute.String("filename", f.filename),
attribute.Int("length", len(p)),
attribute.Int64("offset", off),
),
)
defer func() {
@ -271,7 +275,7 @@ func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err e
n, err = f.f.ReadAt(ctx, p, off)
if isLoggableError(err) {
f.log.Error("Failed to read", "offset", off, "error", err)
f.log.Error(ctx, "Failed to read")
}
return n, err
}
@ -285,7 +289,7 @@ func (f *LogFile) Size() int64 {
func (f *LogFile) Info() (fs.FileInfo, error) {
info, err := f.f.Info()
if isLoggableError(err) {
f.log.Error("Failed to info", "error", err)
f.log.Error(context.Background(), "Failed to info", rlog.Error(err))
}
return info, err
}

View file

@ -23,14 +23,14 @@ type ResolverFS struct {
rootFS Filesystem
resolver *resolver
log *slog.Logger
log *rlog.Logger
}
func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS {
return &ResolverFS{
rootFS: rootFs,
resolver: newResolver(factories),
log: rlog.ComponentLog("fs/resolverfs"),
log: rlog.Component("fs.resolverfs"),
}
}
@ -126,7 +126,9 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
nestedfs, err := r.resolver.nestedFs(factoryCtx, filepath, file)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name())
r.log.Error(ctx, "creating fs timed out",
slog.String("filename", e.Name()),
)
return nil
}

View file

@ -15,8 +15,8 @@ import (
otelpyroscope "github.com/grafana/otel-profiling-go"
"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
@ -24,7 +24,7 @@ import (
)
type Client struct {
log *slog.Logger
log *rlog.Logger
tracerProvider *trace.TracerProvider
metricProvider *metric.MeterProvider
@ -32,23 +32,22 @@ type Client struct {
}
func (client *Client) Shutdown(ctx context.Context) {
log := rlog.FunctionLog(client.log, "Shutdown")
if client.metricProvider == nil {
err := client.metricProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down metric provider", rlog.Err(err))
client.log.Error(ctx, "error shutting down metric provider", rlog.Error(err))
}
}
if client.tracerProvider == nil {
err := client.tracerProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down tracer provider", rlog.Err(err))
client.log.Error(ctx, "error shutting down tracer provider", rlog.Error(err))
}
}
if client.loggerProvider == nil {
err := client.loggerProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down logger provider", rlog.Err(err))
client.log.Error(ctx, "error shutting down logger provider", rlog.Error(err))
}
}
}
@ -56,13 +55,13 @@ func (client *Client) Shutdown(ctx context.Context) {
const appName = "tstor"
func Setup(ctx context.Context, endpoint string) (*Client, error) {
log := rlog.ComponentLog("telemetry")
log := rlog.Component("telemetry")
client := &Client{
log: log,
}
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(cause error) {
log.Error("otel error", rlog.Err(cause))
log.Error(context.Background(), "otel error", rlog.Error(cause))
}))
hostName, _ := os.Hostname()
@ -79,16 +78,22 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
return nil, err
}
metricExporter, err := prometheus.New(prometheus.WithNamespace(appName))
meticExporter, err := otlpmetrichttp.New(ctx,
otlpmetrichttp.WithEndpoint(endpoint),
otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
Enabled: false,
}),
)
if err != nil {
return nil, err
}
client.metricProvider = metric.NewMeterProvider(
metric.WithReader(metricExporter),
metric.WithReader(metric.NewPeriodicReader(meticExporter)),
metric.WithResource(r),
)
otel.SetMeterProvider(client.metricProvider)
log.Info("prometheus metrics provider initialized")
log.Info(ctx, "prometheus metrics provider initialized")
traceExporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint(endpoint),
@ -105,7 +110,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
)
// otel.SetTracerProvider(client.tracerProvider)
otel.SetTracerProvider(otelpyroscope.NewTracerProvider(client.tracerProvider))
log.Info("otel tracing provider initialized")
log.Info(ctx, "otel tracing provider initialized")
logExporter, err := otlplogs.NewExporter(ctx,
otlplogs.WithClient(
@ -125,7 +130,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
Level: slog.LevelDebug,
}),
)
client.log = slog.Default()
client.log = rlog.Component("telemetry-client")
runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)
@ -135,7 +140,7 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
ServerAddress: "https://pyroscope.kmsign.ru",
// you can disable logging by setting this to nil
Logger: &pyroscopeLogger{
log: rlog.ComponentLog("metrics.pyroscope"),
log: client.log.WithComponent("pyroscope"),
},
ProfileTypes: []pyroscope.ProfileType{
// these profile types are enabled by default:
@ -160,22 +165,31 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
}
type pyroscopeLogger struct {
log *slog.Logger
log *rlog.Logger
}
var _ pyroscope.Logger = (*pyroscopeLogger)(nil)
// Debugf implements pyroscope.Logger.
func (p *pyroscopeLogger) Debugf(msg string, args ...any) {
p.log.Debug(fmt.Sprintf(msg, args...))
ctx := context.Background()
p.log.Debug(ctx, fmt.Sprintf(msg, args...))
}
// Errorf implements pyroscope.Logger.
func (p *pyroscopeLogger) Errorf(msg string, args ...any) {
p.log.Error(fmt.Sprintf(msg, args...))
ctx := context.Background()
p.log.Error(ctx, fmt.Sprintf(msg, args...))
}
// Infof implements pyroscope.Logger.
func (p *pyroscopeLogger) Infof(msg string, args ...any) {
p.log.Info(fmt.Sprintf(msg, args...))
ctx := context.Background()
p.log.Info(ctx, fmt.Sprintf(msg, args...))
}
func functionName() string {
var pcs [1]uintptr
runtime.Callers(1, pcs[:])
return runtime.FuncForPC(pcs[0]).Name()
}