112 lines
2 KiB
Go
112 lines
2 KiB
Go
package uring
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
|
|
"github.com/iceber/iouring-go"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var tracer = otel.Tracer("github.com/royalcat/tstor/pkg/uring")
|
|
|
|
type FS struct {
|
|
ur *iouring.IOURing
|
|
}
|
|
|
|
func NewFS(ur *iouring.IOURing) *FS {
|
|
return &FS{
|
|
ur: ur,
|
|
}
|
|
}
|
|
|
|
func (o *FS) OpenFile(ctx context.Context, name string) (File, error) {
|
|
ctx, span := tracer.Start(ctx, "uring.FS.OpenFile", trace.WithAttributes(attribute.String("name", name)))
|
|
defer span.End()
|
|
|
|
f, err := os.Open(name)
|
|
if err != nil {
|
|
return File{}, err
|
|
}
|
|
|
|
return File{
|
|
ur: o.ur,
|
|
f: f,
|
|
}, nil
|
|
}
|
|
|
|
func NewFile(ur *iouring.IOURing, f *os.File) *File {
|
|
return &File{
|
|
ur: ur,
|
|
f: f,
|
|
}
|
|
}
|
|
|
|
type File struct {
|
|
ur *iouring.IOURing
|
|
f *os.File
|
|
}
|
|
|
|
func (o *File) pread(ctx context.Context, b []byte, off uint64) (int, error) {
|
|
ctx, span := tracer.Start(ctx, "uring.File.pread", trace.WithAttributes(attribute.Int("size", len(b))))
|
|
defer span.End()
|
|
|
|
req, err := o.ur.Pread(o.f, b, off, nil)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
select {
|
|
case <-req.Done():
|
|
return req.GetRes()
|
|
case <-ctx.Done():
|
|
if _, err := req.Cancel(); err != nil {
|
|
return 0, err
|
|
}
|
|
<-req.Done()
|
|
return 0, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (f *File) ReadAt(ctx context.Context, b []byte, off int64) (n int, err error) {
|
|
ctx, span := tracer.Start(ctx, "uring.File.ReadAt", trace.WithAttributes(attribute.Int("size", len(b))))
|
|
defer span.End()
|
|
|
|
return f.f.ReadAt(b, off)
|
|
|
|
for len(b) > 0 {
|
|
if ctx.Err() != nil {
|
|
err = ctx.Err()
|
|
break
|
|
}
|
|
|
|
m, e := f.pread(ctx, b, uint64(off))
|
|
if e != nil {
|
|
err = e
|
|
break
|
|
}
|
|
n += m
|
|
b = b[m:]
|
|
off += int64(m)
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
func (o *File) Close(ctx context.Context) error {
|
|
return o.f.Close()
|
|
}
|
|
|
|
func waitRequest(ctx context.Context, req iouring.Request) (int, error) {
|
|
select {
|
|
case <-req.Done():
|
|
return req.GetRes()
|
|
case <-ctx.Done():
|
|
if _, err := req.Cancel(); err != nil {
|
|
return 0, err
|
|
}
|
|
return 0, ctx.Err()
|
|
}
|
|
}
|