Compare commits
2 commits
75d17267d7
...
c65fd89887
Author | SHA1 | Date | |
---|---|---|---|
c65fd89887 | |||
a8002616eb |
2 changed files with 77 additions and 28 deletions
6
.github/workflows/docker.yaml
vendored
6
.github/workflows/docker.yaml
vendored
|
@ -25,9 +25,6 @@ jobs:
|
||||||
# - linux/riscv64
|
# - linux/riscv64
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v3
|
|
||||||
|
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
@ -38,6 +35,9 @@ jobs:
|
||||||
username: ${{ github.actor }}
|
username: ${{ github.actor }}
|
||||||
password: ${{ secrets.PACKAGE_TOKEN }}
|
password: ${{ secrets.PACKAGE_TOKEN }}
|
||||||
|
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
- name: Docker meta
|
- name: Docker meta
|
||||||
id: meta
|
id: meta
|
||||||
uses: https://github.com/docker/metadata-action@v5
|
uses: https://github.com/docker/metadata-action@v5
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
"git.kmsign.ru/royalcat/tstor/src/host/controller"
|
||||||
|
@ -25,6 +26,8 @@ type TorrentFs struct {
|
||||||
|
|
||||||
filesCache map[string]File
|
filesCache map[string]File
|
||||||
|
|
||||||
|
lastAccessTimeout atomic.Pointer[time.Time]
|
||||||
|
|
||||||
resolver *resolver
|
resolver *resolver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,62 +251,104 @@ func (fs *TorrentFs) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat implements Filesystem.
|
// Stat implements Filesystem.
|
||||||
func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
func (tfs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
|
||||||
ctx, span := tracer.Start(ctx, "Stat",
|
ctx, span := tracer.Start(ctx, "Stat",
|
||||||
fs.traceAttrs(attribute.String("filename", filename)),
|
tfs.traceAttrs(attribute.String("filename", filename)),
|
||||||
)
|
)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if isRoot(filename) {
|
if isRoot(filename) {
|
||||||
return fs, nil
|
return tfs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
|
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if nestedFs != nil {
|
if nestedFs != nil {
|
||||||
|
lastReadTimeout := tfs.lastAccessTimeout.Load()
|
||||||
|
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||||
|
span.SetAttributes(attribute.Bool("short_timeout", true))
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == context.DeadlineExceeded {
|
||||||
|
now := time.Now()
|
||||||
|
tfs.lastAccessTimeout.Store(&now)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nestedFs.Stat(ctx, nestedFsPath)
|
return nestedFs.Stat(ctx, nestedFsPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
return fs.rawStat(ctx, fsPath)
|
return tfs.rawStat(ctx, fsPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) {
|
func (tfs *TorrentFs) Open(ctx context.Context, filename string) (file File, err error) {
|
||||||
ctx, span := tracer.Start(ctx, "Open",
|
ctx, span := tracer.Start(ctx, "Open",
|
||||||
fs.traceAttrs(attribute.String("filename", filename)),
|
tfs.traceAttrs(attribute.String("filename", filename)),
|
||||||
)
|
)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if isRoot(filename) {
|
if isRoot(filename) {
|
||||||
return newDirFile(fs.name), nil
|
return newDirFile(tfs.name), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
|
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, filename, tfs.rawOpen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if nestedFs != nil {
|
if nestedFs != nil {
|
||||||
|
lastReadTimeout := tfs.lastAccessTimeout.Load()
|
||||||
|
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||||
|
span.SetAttributes(attribute.Bool("short_timeout", true))
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == context.DeadlineExceeded {
|
||||||
|
now := time.Now()
|
||||||
|
tfs.lastAccessTimeout.Store(&now)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nestedFs.Open(ctx, nestedFsPath)
|
return nestedFs.Open(ctx, nestedFsPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
return fs.rawOpen(ctx, fsPath)
|
return tfs.rawOpen(ctx, fsPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
func (tfs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
||||||
ctx, span := tracer.Start(ctx, "ReadDir",
|
ctx, span := tracer.Start(ctx, "ReadDir",
|
||||||
fs.traceAttrs(attribute.String("name", name)),
|
tfs.traceAttrs(attribute.String("name", name)),
|
||||||
)
|
)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen)
|
fsPath, nestedFs, nestedFsPath, err := tfs.resolver.resolvePath(ctx, name, tfs.rawOpen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if nestedFs != nil {
|
if nestedFs != nil {
|
||||||
|
lastReadTimeout := tfs.lastAccessTimeout.Load()
|
||||||
|
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||||
|
span.SetAttributes(attribute.Bool("short_timeout", true))
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == context.DeadlineExceeded {
|
||||||
|
now := time.Now()
|
||||||
|
tfs.lastAccessTimeout.Store(&now)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nestedFs.ReadDir(ctx, nestedFsPath)
|
return nestedFs.ReadDir(ctx, nestedFsPath)
|
||||||
}
|
}
|
||||||
files, err := fs.files(ctx)
|
files, err := tfs.files(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -347,16 +392,16 @@ var _ File = (*torrentFile)(nil)
|
||||||
type torrentFile struct {
|
type torrentFile struct {
|
||||||
name string
|
name string
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.RWMutex
|
||||||
|
|
||||||
tr torrent.Reader
|
tr torrent.Reader
|
||||||
|
|
||||||
lastReadTimeout time.Time
|
lastReadTimeout atomic.Pointer[time.Time]
|
||||||
|
|
||||||
file *torrent.File
|
file *torrent.File
|
||||||
}
|
}
|
||||||
|
|
||||||
const secondaryTimeout = time.Hour
|
const secondaryTimeout = time.Hour * 24
|
||||||
|
|
||||||
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
|
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
|
||||||
// select {
|
// select {
|
||||||
|
@ -416,10 +461,11 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||||
span.End()
|
span.End()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tf.mu.Lock()
|
tf.mu.RLock()
|
||||||
defer tf.mu.Unlock()
|
defer tf.mu.RUnlock()
|
||||||
|
|
||||||
if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
lastReadTimeout := tf.lastReadTimeout.Load()
|
||||||
|
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
||||||
span.SetAttributes(attribute.Bool("short_timeout", true))
|
span.SetAttributes(attribute.Bool("short_timeout", true))
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||||
|
@ -427,7 +473,8 @@ func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == context.DeadlineExceeded {
|
if err == context.DeadlineExceeded {
|
||||||
tf.lastReadTimeout = time.Now()
|
now := time.Now()
|
||||||
|
tf.lastReadTimeout.Store(&now)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -443,10 +490,11 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
|
||||||
span.End()
|
span.End()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tf.mu.Lock()
|
tf.mu.RLock()
|
||||||
defer tf.mu.Unlock()
|
defer tf.mu.RUnlock()
|
||||||
|
|
||||||
if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
|
lastReadTimeout := tf.lastReadTimeout.Load()
|
||||||
|
if lastReadTimeout != nil && time.Since(*lastReadTimeout) < secondaryTimeout { /// make short timeout for already faliled files
|
||||||
span.SetAttributes(attribute.Bool("short_timeout", true))
|
span.SetAttributes(attribute.Bool("short_timeout", true))
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||||
|
@ -454,7 +502,8 @@ func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int,
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == context.DeadlineExceeded {
|
if err == context.DeadlineExceeded {
|
||||||
tf.lastReadTimeout = time.Now()
|
now := time.Now()
|
||||||
|
tf.lastReadTimeout.Store(&now)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue