This commit is contained in:
royalcat 2024-06-03 00:20:43 +03:00
parent bd75492b02
commit 06153d61c9
8 changed files with 108 additions and 135 deletions

View file

@ -1,53 +0,0 @@
package sources
import (
"context"
"encoding/json"
"fmt"
"reflect"
)
type UpdateTask interface{}
type Source interface {
Name() string // unique name within source type
SourceType() string
Fetch(ctx context.Context, task UpdateTask, dir string) error
}
var sourceTypesRegistry = map[string]reflect.Type{}
// func RegisterSource[T Source]() {
// var s T
// t := reflect.TypeOf(s)
// if t.Kind() == reflect.Ptr {
// RegisterSource[T]()
// return
// }
// sourceTypesRegistry[s.SourceType()] = t
// }
type sourceType struct {
Type string `json:"type"`
}
func parseSource(data []byte) (Source, error) {
var sourceType sourceType
err := json.Unmarshal(data, &sourceType)
if err != nil {
return nil, err
}
st, ok := sourceTypesRegistry[sourceType.Type]
if !ok {
return nil, fmt.Errorf("source type %s not registred", sourceType.Type)
}
s := reflect.New(st).Interface().(Source)
err = json.Unmarshal(data, &s)
if err != nil {
return nil, err
}
return s, nil
}

View file

@ -92,11 +92,11 @@ func NewService(sourceFs billy.Filesystem, conf config.TorrentClient) (*Service,
return nil, fmt.Errorf("error creating node ID: %w", err)
}
client, err := newClient(s.Storage, s.fis, &conf, id)
s.client, err = newClient(s.Storage, s.fis, &conf, id)
if err != nil {
return nil, fmt.Errorf("error starting torrent client: %w", err)
}
client.AddDhtNodes(conf.DHTNodes)
s.client.AddDhtNodes(conf.DHTNodes)
s.dirsAquire, err = NewKV[string, DirAquire](conf.MetadataFolder, "dir-acquire")
if err != nil {

View file

@ -6,18 +6,39 @@ import (
"fmt"
"path"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
"git.kmsign.ru/royalcat/tstor/src/vfs"
"github.com/go-git/go-billy/v5/osfs"
"github.com/royalcat/ctxio"
"github.com/royalcat/ctxprogress"
)
func NewService(dataDir string) *Service {
return &Service{
s := &Service{
dataDir: dataDir,
sources: make(map[string]ytdlpSource, 0),
}
go func() {
for {
ctx := context.Background()
ctx = ctxprogress.New(ctx)
ctxprogress.AddCallback(ctx, func(p ctxprogress.Progress) {
cur, total := p.Progress()
fmt.Printf("updating sources: %d/%d\n", cur, total)
})
err := s.Update(ctx)
if err != nil {
fmt.Println("failed to update sources:", err)
}
time.Sleep(time.Minute)
}
}()
return s
}
type Service struct {
@ -27,7 +48,7 @@ type Service struct {
sources map[string]ytdlpSource
}
func (c *Service) AddSource(s ytdlpSource) {
func (c *Service) addSource(s ytdlpSource) {
c.mu.Lock()
defer c.mu.Unlock()
@ -65,9 +86,9 @@ func (c *Service) BuildFS(ctx context.Context, f vfs.File) (vfs.Filesystem, erro
return nil, err
}
c.AddSource(s)
c.addSource(s)
downloadFS := ctxbilly.WrapFileSystem(osfs.New(c.sourceDir(s)))
return newSourceFS(s.Name(), downloadFS, c, s), nil
return newSourceFS(path.Base(f.Name()), downloadFS, c, s), nil
}

View file

@ -3,6 +3,7 @@ package ytdlp
import (
"context"
"crypto/sha1"
"encoding/base64"
"git.kmsign.ru/royalcat/tstor/pkg/ytdlp"
"github.com/royalcat/ctxprogress"
@ -15,7 +16,7 @@ type ytdlpSource struct {
var hasher = sha1.New()
func (s *ytdlpSource) Name() string {
return string(hasher.Sum([]byte(s.Url)))
return base64.URLEncoding.EncodeToString(hasher.Sum([]byte(s.Url)))
}
func (s *ytdlpSource) Download(ctx context.Context, task TaskUpdater, dir string) error {

View file

@ -10,17 +10,17 @@ import (
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"github.com/agoda-com/opentelemetry-go/otelslog"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogsgrpc"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp"
logsdk "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
otelpyroscope "github.com/grafana/otel-profiling-go"
"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
)
type Client struct {
@ -76,9 +76,9 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
return nil, err
}
meticExporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint(endpoint),
otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
meticExporter, err := otlpmetrichttp.New(ctx,
otlpmetrichttp.WithEndpoint(endpoint),
otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
Enabled: false,
}),
)
@ -118,8 +118,8 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
logExporter, err := otlplogs.NewExporter(ctx,
otlplogs.WithClient(
otlplogsgrpc.NewClient(
otlplogsgrpc.WithEndpoint(endpoint),
otlplogshttp.NewClient(
otlplogshttp.WithEndpoint(endpoint),
),
),
)
@ -140,34 +140,34 @@ func Setup(ctx context.Context, endpoint string) (*Client, error) {
// recreate telemetry logger
client.log = rlog.Component("telemetry")
runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)
_, err = pyroscope.Start(pyroscope.Config{
ApplicationName: appName,
// replace this with the address of pyroscope server
ServerAddress: "https://pyroscope.kmsign.ru",
// you can disable logging by setting this to nil
Logger: &pyroscopeLogger{
log: client.log.WithComponent("pyroscope"),
},
ProfileTypes: []pyroscope.ProfileType{
// these profile types are enabled by default:
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
// these profile types are optional:
// pyroscope.ProfileGoroutines,
// pyroscope.ProfileMutexCount,
// pyroscope.ProfileMutexDuration,
// pyroscope.ProfileBlockCount,
// pyroscope.ProfileBlockDuration,
},
})
if err != nil {
return client, nil
}
// runtime.SetMutexProfileFraction(5)
// runtime.SetBlockProfileRate(5)
// _, err = pyroscope.Start(pyroscope.Config{
// ApplicationName: appName,
// // replace this with the address of pyroscope server
// ServerAddress: "https://pyroscope.kmsign.ru",
// // you can disable logging by setting this to nil
// Logger: &pyroscopeLogger{
// log: client.log.WithComponent("pyroscope"),
// },
// ProfileTypes: []pyroscope.ProfileType{
// // these profile types are enabled by default:
// pyroscope.ProfileCPU,
// pyroscope.ProfileAllocObjects,
// pyroscope.ProfileAllocSpace,
// pyroscope.ProfileInuseObjects,
// pyroscope.ProfileInuseSpace,
// // these profile types are optional:
// // pyroscope.ProfileGoroutines,
// // pyroscope.ProfileMutexCount,
// // pyroscope.ProfileMutexDuration,
// // pyroscope.ProfileBlockCount,
// // pyroscope.ProfileBlockDuration,
// },
// })
// if err != nil {
// return client, nil
// }
return client, nil
}