gql dir ls

This commit is contained in:
royalcat 2024-03-20 00:30:37 +03:00
parent 6a1e338af4
commit e576e62599
23 changed files with 1671 additions and 138 deletions

View file

@ -8,6 +8,8 @@ type Config struct {
Log Log `koanf:"log"`
SourceDir string `koanf:"source_dir"`
OtelHttp string `koanf:"otel_http"`
}
type WebUi struct {

View file

@ -1,4 +1,4 @@
package http
package delivery
import (
"bytes"

File diff suppressed because it is too large Load diff

View file

@ -9,12 +9,25 @@ import (
"github.com/anacrolix/torrent"
)
type DirEntry interface {
IsDirEntry()
GetName() string
}
type Progress interface {
IsProgress()
GetCurrent() int64
GetTotal() int64
}
type ArchiveFs struct {
Name string `json:"name"`
Size int64 `json:"size"`
}
func (ArchiveFs) IsDirEntry() {}
func (this ArchiveFs) GetName() string { return this.Name }
type BooleanFilter struct {
Eq *bool `json:"eq,omitempty"`
}
@ -27,10 +40,25 @@ type DateTimeFilter struct {
Lte *time.Time `json:"lte,omitempty"`
}
type Dir struct {
Name string `json:"name"`
}
func (Dir) IsDirEntry() {}
func (this Dir) GetName() string { return this.Name }
type DownloadTorrentResponse struct {
Task *Task `json:"task,omitempty"`
}
type File struct {
Name string `json:"name"`
Size int64 `json:"size"`
}
func (File) IsDirEntry() {}
func (this File) GetName() string { return this.Name }
type IntFilter struct {
Eq *int64 `json:"eq,omitempty"`
Gt *int64 `json:"gt,omitempty"`
@ -51,6 +79,13 @@ type Pagination struct {
type Query struct {
}
type ResolverFs struct {
Name string `json:"name"`
}
func (ResolverFs) IsDirEntry() {}
func (this ResolverFs) GetName() string { return this.Name }
type Schema struct {
Query *Query `json:"query,omitempty"`
Mutation *Mutation `json:"mutation,omitempty"`
@ -81,6 +116,14 @@ type Torrent struct {
T *controller.Torrent `json:"-"`
}
type TorrentFs struct {
Name string `json:"name"`
Torrent *Torrent `json:"torrent"`
}
func (TorrentFs) IsDirEntry() {}
func (this TorrentFs) GetName() string { return this.Name }
type TorrentFile struct {
Filename string `json:"filename"`
Size int64 `json:"size"`

View file

@ -9,6 +9,7 @@ import (
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
)
// Torrents is the resolver for the torrents field.
@ -63,6 +64,42 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
return tr, nil
}
// FsListDir is the resolver for the fsListDir field.
func (r *queryResolver) FsListDir(ctx context.Context, path string) ([]model.DirEntry, error) {
entries, err := r.VFS.ReadDir(path)
if err != nil {
return nil, err
}
out := []model.DirEntry{}
for _, e := range entries {
switch e.(type) {
case *vfs.TorrentFs:
e := e.(*vfs.TorrentFs)
out = append(out, model.TorrentFs{
Name: e.Name(),
Torrent: model.MapTorrent(e.Torrent),
})
default:
if e.IsDir() {
out = append(out, model.Dir{Name: e.Name()})
} else {
info, err := e.Info()
if err != nil {
return nil, err
}
out = append(out, model.File{
Name: e.Name(),
Size: info.Size(),
})
}
}
}
return out, nil
}
// Query returns graph.QueryResolver implementation.
func (r *Resolver) Query() graph.QueryResolver { return &queryResolver{r} }

View file

@ -1,6 +1,9 @@
package resolver
import "git.kmsign.ru/royalcat/tstor/src/host/service"
import (
"git.kmsign.ru/royalcat/tstor/src/host/service"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
)
// This file will not be regenerated automatically.
//
@ -8,4 +11,5 @@ import "git.kmsign.ru/royalcat/tstor/src/host/service"
type Resolver struct {
Service *service.Service
VFS vfs.Filesystem
}

View file

@ -1,4 +1,4 @@
package http
package delivery
import (
"fmt"
@ -7,15 +7,15 @@ import (
"git.kmsign.ru/royalcat/tstor"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/delivery"
"git.kmsign.ru/royalcat/tstor/src/host/service"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/shurcooL/httpfs/html/vfstemplate"
)
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath string, cfg *config.Config) error {
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, vfs vfs.Filesystem, logPath string, cfg *config.Config) error {
log := slog.With()
gin.SetMode(gin.ReleaseMode)
@ -40,18 +40,16 @@ func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, logPath str
// r.GET("/routes", routesHandler(ss))
r.GET("/logs", logsHandler)
r.GET("/servers", serversFoldersHandler())
r.Any("/graphql", gin.WrapH(delivery.GraphQLHandler(s)))
r.Any("/graphql", gin.WrapH(GraphQLHandler(s, vfs)))
api := r.Group("/api")
{
api.GET("/log", apiLogHandler(logPath))
api.GET("/status", apiStatusHandler(fc, ss))
// api.GET("/servers", apiServersHandler(tss))
// api.GET("/routes", apiRoutesHandler(ss))
// api.POST("/routes/:route/torrent", apiAddTorrentHandler(s))
// api.DELETE("/routes/:route/torrent/:torrent_hash", apiDelTorrentHandler(s))
}
log.Info("starting webserver", "host", fmt.Sprintf("%s:%d", cfg.WebUi.IP, cfg.WebUi.Port))

View file

@ -1,4 +1,4 @@
package http
package delivery
type RouteAdd struct {
Magnet string `json:"magnet" binding:"required"`

View file

@ -6,23 +6,26 @@ import (
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
"git.kmsign.ru/royalcat/tstor/src/host/service"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/extension"
"github.com/99designs/gqlgen/graphql/handler/lru"
"github.com/99designs/gqlgen/graphql/handler/transport"
"github.com/ravilushqa/otelgqlgen"
)
func GraphQLHandler(service *service.Service) http.Handler {
func GraphQLHandler(service *service.Service, vfs vfs.Filesystem) http.Handler {
graphqlHandler := handler.NewDefaultServer(
graph.NewExecutableSchema(
graph.Config{
Resolvers: &resolver.Resolver{Service: service},
Resolvers: &resolver.Resolver{Service: service, VFS: vfs},
Directives: graph.DirectiveRoot{
OneOf: graph.OneOf,
},
},
),
)
graphqlHandler.AddTransport(&transport.POST{})
graphqlHandler.AddTransport(&transport.Websocket{})
graphqlHandler.AddTransport(&transport.SSE{})
@ -30,6 +33,7 @@ func GraphQLHandler(service *service.Service) http.Handler {
graphqlHandler.SetQueryCache(lru.New(1000))
graphqlHandler.Use(extension.Introspection{})
graphqlHandler.Use(extension.AutomaticPersistedQuery{Cache: lru.New(100)})
graphqlHandler.Use(otelgqlgen.Middleware())
return graphqlHandler
}

View file

@ -1,4 +1,4 @@
package http
package delivery
import (
"net/http"

View file

@ -5,24 +5,33 @@ import (
"fmt"
"log/slog"
"os"
"path"
"path/filepath"
"slices"
"strings"
"time"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/host/datastorage"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types"
"github.com/anacrolix/torrent/types/infohash"
"github.com/royalcat/kv"
)
type DirAquire struct {
Name string
Hashes []infohash.T
}
type Service struct {
c *torrent.Client
excludedFiles *store.FilesMappings
@ -35,11 +44,21 @@ type Service struct {
Storage datastorage.DataStorage
SourceDir string
dirsAquire kv.Store[string, DirAquire]
log *slog.Logger
addTimeout, readTimeout int
}
func NewService(sourceDir string, c *torrent.Client, storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes, addTimeout, readTimeout int) *Service {
func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes,
addTimeout, readTimeout int,
) (*Service, error) {
dirsAcquire, err := kv.NewBadgerKV[string, DirAquire](path.Join(cfg.MetadataFolder, "dir-acquire"))
if err != nil {
return nil, err
}
s := &Service{
log: slog.With("component", "torrent-service"),
c: c,
@ -49,6 +68,7 @@ func NewService(sourceDir string, c *torrent.Client, storage datastorage.DataSto
Storage: storage,
SourceDir: sourceDir,
torrentLoaded: make(chan struct{}),
dirsAquire: dirsAcquire,
// stats: newStats(), // TODO persistent
addTimeout: addTimeout,
readTimeout: readTimeout,
@ -62,7 +82,7 @@ func NewService(sourceDir string, c *torrent.Client, storage datastorage.DataSto
close(s.torrentLoaded)
}()
return s
return s, nil
}
var _ vfs.FsFactory = (*Service)(nil).NewTorrentFs
@ -109,17 +129,16 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
if err != nil {
infoBytes = nil
} else {
// for _, t := range s.c.Torrents() {
// if t.Name() == info.BestName() && t.InfoHash() != spec.InfoHash {
// <-t.GotInfo()
// if !isTorrentCompatable(*t.Info(), info) {
// return nil, fmt.Errorf(
// "torrent with name '%s' not compatable existing infohash: %s, new: %s",
// t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
// )
// }
// }
// }
compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, info)
if err != nil {
return nil, err
}
if !compatable {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
}
t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{
@ -149,9 +168,67 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
return t, nil
}
func isTorrentCompatable(existingInfo, newInfo metainfo.Info) bool {
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(newInfo.Files)
func (s *Service) checkTorrentCompatable(ctx context.Context, ih infohash.T, info metainfo.Info) (compatable bool, tryLater bool, err error) {
log := s.log.With("new-name", info.BestName(), "new-infohash", ih.String())
name := info.BestName()
aq, found, err := s.dirsAquire.Get(ctx, info.BestName())
if err != nil {
return false, false, err
}
if !found {
err = s.dirsAquire.Set(ctx, name, DirAquire{
Name: name,
Hashes: slices.Compact([]infohash.T{ih}),
})
if err != nil {
return false, false, err
}
log.Debug("acquiring was not found, so created")
return true, false, nil
}
if slices.Contains(aq.Hashes, ih) {
log.Debug("hash already know to be compatable")
return true, false, nil
}
for _, existingTorrent := range s.c.Torrents() {
if existingTorrent.Name() != name || existingTorrent.InfoHash() == ih {
continue
}
existingInfo := existingTorrent.Info()
existingFiles := slices.Clone(existingInfo.Files)
newFiles := slices.Clone(info.Files)
if !s.checkTorrentFilesCompatable(aq, existingFiles, newFiles) {
return false, false, nil
}
aq.Hashes = slicesUnique(append(aq.Hashes, ih))
err = s.dirsAquire.Set(ctx, aq.Name, aq)
if err != nil {
log.Warn("torrent not compatible")
return false, false, err
}
}
if slices.Contains(aq.Hashes, ih) {
log.Debug("hash is compatable")
return true, false, nil
}
log.Debug("torrent with same name not found, try later")
return false, true, nil
}
func (s *Service) checkTorrentFilesCompatable(aq DirAquire, existingFiles, newFiles []metainfo.FileInfo) bool {
log := s.log.With("name", aq.Name)
pathCmp := func(a, b metainfo.FileInfo) int {
return slices.Compare(a.BestPath(), b.BestPath())
@ -167,14 +244,45 @@ func isTorrentCompatable(existingInfo, newInfo metainfo.Info) bool {
}
if len(newFiles) > len(existingFiles) {
all := append(existingFiles, newFiles...)
slices.SortStableFunc(all, pathCmp)
slices.CompactFunc(all, func(fi1, fi2 metainfo.FileInfo) bool {
return slices.Equal(fi1.BestPath(), fi2.BestPath()) && fi1.Length == fi2.Length
})
type fileInfo struct {
Path string
Length int64
}
mapInfo := func(fi metainfo.FileInfo) fileInfo {
return fileInfo{
Path: strings.Join(fi.BestPath(), "/"),
Length: fi.Length,
}
}
existingFiles := apply(existingFiles, mapInfo)
newFiles := apply(newFiles, mapInfo)
for _, n := range newFiles {
if slices.Contains(existingFiles, n) {
continue
}
for _, e := range existingFiles {
if e.Path == n.Path && e.Length != n.Length {
log.Warn("torrents not compatible, has files with different length", "path", n.Path, "existing-length", e.Length, "new-length", e.Length)
return false
}
}
}
}
return false
return true
}
func (s *Service) getTorrentsByName(name string) []*torrent.Torrent {
out := []*torrent.Torrent{}
for _, t := range s.c.Torrents() {
if t.Name() == name {
out = append(out, t)
}
}
return out
}
func isValidInfoHashBytes(d []byte) bool {
@ -188,12 +296,17 @@ func (s *Service) NewTorrentFs(f vfs.File) (vfs.Filesystem, error) {
defer cancel()
defer f.Close()
info, err := f.Stat()
if err != nil {
return nil, err
}
t, err := s.AddTorrent(ctx, f)
if err != nil {
return nil, err
}
return vfs.NewTorrentFs(controller.NewTorrent(t, s.excludedFiles), s.readTimeout), nil
return vfs.NewTorrentFs(info.Name(), controller.NewTorrent(t, s.excludedFiles), s.readTimeout), nil
}
func (s *Service) Stats() (*Stats, error) {
@ -252,3 +365,20 @@ func (s *Service) GetTorrent(infohashHex string) (*controller.Torrent, error) {
return controller.NewTorrent(t, s.excludedFiles), nil
}
func slicesUnique[S ~[]E, E comparable](in S) S {
m := map[E]struct{}{}
for _, v := range in {
m[v] = struct{}{}
}
return maps.Keys(m)
}
func apply[I, O any](in []I, f func(e I) O) []O {
out := []O{}
for _, v := range in {
out = append(out, f(v))
}
return out
}

View file

@ -40,9 +40,9 @@ var ArchiveFactories = map[string]FsFactory{
type archiveLoader func(r iio.Reader, size int64) (map[string]*archiveFile, error)
var _ Filesystem = &archive{}
var _ Filesystem = &ArchiveFS{}
type archive struct {
type ArchiveFS struct {
name string
r iio.Reader
@ -52,8 +52,8 @@ type archive struct {
files func() (map[string]File, error)
}
func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *archive {
return &archive{
func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *ArchiveFS {
return &ArchiveFS{
name: name,
r: r,
size: size,
@ -94,11 +94,11 @@ func NewArchive(name string, r iio.Reader, size int64, loader archiveLoader) *ar
}
// Unlink implements Filesystem.
func (a *archive) Unlink(filename string) error {
func (a *ArchiveFS) Unlink(filename string) error {
return ErrNotImplemented
}
func (a *archive) Open(filename string) (File, error) {
func (a *ArchiveFS) Open(filename string) (File, error) {
files, err := a.files()
if err != nil {
return nil, err
@ -107,7 +107,7 @@ func (a *archive) Open(filename string) (File, error) {
return getFile(files, filename)
}
func (fs *archive) ReadDir(path string) ([]fs.DirEntry, error) {
func (fs *ArchiveFS) ReadDir(path string) ([]fs.DirEntry, error) {
files, err := fs.files()
if err != nil {
return nil, err
@ -117,7 +117,7 @@ func (fs *archive) ReadDir(path string) ([]fs.DirEntry, error) {
}
// Stat implements Filesystem.
func (afs *archive) Stat(filename string) (fs.FileInfo, error) {
func (afs *ArchiveFS) Stat(filename string) (fs.FileInfo, error) {
files, err := afs.files()
if err != nil {
return nil, err

View file

@ -51,7 +51,21 @@ func (r *ResolveFS) ReadDir(dir string) ([]fs.DirEntry, error) {
out := make([]fs.DirEntry, 0, len(entries))
for _, e := range entries {
if r.resolver.isNestedFs(e.Name()) {
out = append(out, newDirInfo(e.Name()))
filepath := path.Join(dir, e.Name())
file, err := r.Open(filepath)
if err != nil {
return nil, err
}
nfs, err := r.resolver.nestedFs(filepath, file)
if err != nil {
return nil, err
}
if e, ok := nfs.(fs.DirEntry); ok {
out = append(out, e)
} else {
out = append(out, newDirInfo(e.Name()))
}
} else {
out = append(out, e)
}

View file

@ -83,7 +83,7 @@ func TestResolver(t *testing.T) {
require.NoError(err)
require.Equal("/f1.rar", fsPath)
require.Equal("/f2.rar", nestedFsPath)
require.IsType(&archive{}, nestedFs)
require.IsType(&ArchiveFS{}, nestedFs)
})
t.Run("root", func(t *testing.T) {
t.Parallel()
@ -123,7 +123,7 @@ func TestResolver(t *testing.T) {
require.NoError(err)
require.Equal("/f1.rar", fsPath)
require.Equal("/", nestedFsPath)
require.IsType(&archive{}, nestedFs)
require.IsType(&ArchiveFS{}, nestedFs)
})
t.Run("inside folder", func(t *testing.T) {
t.Parallel()
@ -134,7 +134,7 @@ func TestResolver(t *testing.T) {
return &Dummy{}, nil
})
require.NoError(err)
require.IsType(&archive{}, nestedFs)
require.IsType(&ArchiveFS{}, nestedFs)
require.Equal("/test1/f1.rar", fsPath)
require.Equal("/", nestedFsPath)
})

View file

@ -21,8 +21,10 @@ import (
var _ Filesystem = &TorrentFs{}
type TorrentFs struct {
mu sync.Mutex
c *controller.Torrent
name string
mu sync.Mutex
Torrent *controller.Torrent
readTimeout int
@ -31,14 +33,37 @@ type TorrentFs struct {
resolver *resolver
}
func NewTorrentFs(c *controller.Torrent, readTimeout int) *TorrentFs {
func NewTorrentFs(name string, c *controller.Torrent, readTimeout int) *TorrentFs {
return &TorrentFs{
c: c,
name: name,
Torrent: c,
readTimeout: readTimeout,
resolver: newResolver(ArchiveFactories),
}
}
var _ fs.DirEntry = (*TorrentFs)(nil)
// Name implements fs.DirEntry.
func (tfs *TorrentFs) Name() string {
return tfs.name
}
// Info implements fs.DirEntry.
func (tfs *TorrentFs) Info() (fs.FileInfo, error) {
return newDirInfo(tfs.name), nil
}
// IsDir implements fs.DirEntry.
func (tfs *TorrentFs) IsDir() bool {
return true
}
// Type implements fs.DirEntry.
func (tfs *TorrentFs) Type() fs.FileMode {
return fs.ModeDir
}
func (fs *TorrentFs) files() (map[string]File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
@ -47,7 +72,7 @@ func (fs *TorrentFs) files() (map[string]File, error) {
return fs.filesCache, nil
}
files, err := fs.c.Files(context.Background())
files, err := fs.Torrent.Files(context.Background())
if err != nil {
return nil, err
}
@ -65,8 +90,8 @@ func (fs *TorrentFs) files() (map[string]File, error) {
}
// TODO optional
if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.c.Name()) {
filepath := "/" + fs.c.Name()
if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.Torrent.Name()) {
filepath := "/" + fs.Torrent.Name()
if file, ok := fs.filesCache[filepath]; ok {
nestedFs, err := fs.resolver.nestedFs(filepath, file)
if err != nil {
@ -86,7 +111,7 @@ func (fs *TorrentFs) files() (map[string]File, error) {
}
DEFAULT_DIR:
rootDir := "/" + fs.c.Name() + "/"
rootDir := "/" + fs.Torrent.Name() + "/"
singleDir := true
for k, _ := range fs.filesCache {
if !strings.HasPrefix(k, rootDir) {
@ -238,7 +263,7 @@ func (fs *TorrentFs) Unlink(name string) error {
return ErrNotImplemented
}
return fs.c.ExcludeFile(context.Background(), tfile.file)
return fs.Torrent.ExcludeFile(context.Background(), tfile.file)
}
type reader interface {

View file

@ -1,30 +1,21 @@
package log
import (
"log/slog"
"os"
"time"
"git.kmsign.ru/royalcat/tstor/src/config"
"github.com/lmittmann/tint"
)
const FileName = "tstor.log"
func Load(config *config.Log) {
level := slog.LevelInfo
if config.Debug {
level = slog.LevelDebug
}
// func Load(config *config.Log) {
// level := slog.LevelInfo
// if config.Debug {
// level = slog.LevelDebug
// }
slog.SetDefault(slog.New(
tint.NewHandler(os.Stdout, &tint.Options{
Level: level,
TimeFormat: time.Kitchen,
// NoColor: !isatty.IsTerminal(os.Stdout.Fd()),
}),
))
}
// slog.SetDefault(slog.New(
// tint.NewHandler(os.Stdout, &tint.Options{
// Level: level,
// TimeFormat: time.Kitchen,
// // NoColor: !isatty.IsTerminal(os.Stdout.Fd()),
// }),
// ))
// }
// func newRollingFile(config *config.Log) io.Writer {
// if err := os.MkdirAll(config.Path, 0744); err != nil {

126
src/telemetry/setup.go Normal file
View file

@ -0,0 +1,126 @@
package telemetry
import (
"context"
"log/slog"
"os"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/agoda-com/opentelemetry-go/otelslog"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp"
logsdk "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)
type Client struct {
log *slog.Logger
tracerProvider *trace.TracerProvider
metricProvider *metric.MeterProvider
loggerProvider *logsdk.LoggerProvider
}
func (client *Client) Shutdown(ctx context.Context) {
log := rlog.FunctionLog(client.log, "Shutdown")
if client.metricProvider == nil {
err := client.metricProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down metric provider", rlog.Err(err))
}
}
if client.tracerProvider == nil {
err := client.tracerProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down tracer provider", rlog.Err(err))
}
}
if client.loggerProvider == nil {
err := client.loggerProvider.Shutdown(ctx)
if err != nil {
log.Error("error shutting down logger provider", rlog.Err(err))
}
}
}
const appName = "tstor"
func Setup(ctx context.Context, endpoint string) (*Client, error) {
log := rlog.ComponentLog("telemetry")
client := &Client{
log: log,
}
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(cause error) {
log.Error("otel error", rlog.Err(cause))
}))
hostName, _ := os.Hostname()
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(appName),
semconv.HostName(hostName),
),
)
if err != nil {
return nil, err
}
metricExporter, err := prometheus.New(prometheus.WithNamespace(appName))
if err != nil {
return nil, err
}
client.metricProvider = metric.NewMeterProvider(
metric.WithReader(metricExporter),
metric.WithResource(r),
)
otel.SetMeterProvider(client.metricProvider)
log.Info("prometheus metrics provider initialized")
traceExporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint(endpoint),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: false,
}),
)
if err != nil {
return nil, err
}
client.tracerProvider = trace.NewTracerProvider(
trace.WithBatcher(traceExporter),
trace.WithResource(r),
)
otel.SetTracerProvider(client.tracerProvider)
log.Info("otel tracing provider initialized")
logExporter, err := otlplogs.NewExporter(ctx,
otlplogs.WithClient(
otlplogshttp.NewClient(otlplogshttp.WithEndpoint(endpoint)),
),
)
if err != nil {
return nil, err
}
client.loggerProvider = logsdk.NewLoggerProvider(
logsdk.WithBatcher(logExporter),
logsdk.WithResource(r),
)
rlog.AddHandler(otelslog.NewOtelHandler(client.loggerProvider,
&otelslog.HandlerOptions{
Level: slog.LevelDebug,
}),
)
client.log = slog.Default()
return client, nil
}