This commit is contained in:
royalcat 2024-03-28 16:09:42 +03:00
parent 7b1863109c
commit ef751771d2
107 changed files with 9435 additions and 850 deletions

View file

@ -1,6 +1,6 @@
package config
var defaultConfig = Config{
var defaultConfig = Settings{
SourceDir: "./data",
WebUi: WebUi{
Port: 4444,
@ -21,8 +21,9 @@ var defaultConfig = Config{
Enabled: false,
},
NFS: NFS{
Enabled: false,
Port: 8122,
Enabled: false,
Port: 8122,
CachePath: "./nfs-cache",
},
},

View file

@ -13,8 +13,9 @@ import (
var k = koanf.New(".")
func Load(path string) (*Config, error) {
var Config = defaultConfig
func Load(path string) (*Settings, error) {
err := k.Load(structs.Provider(defaultConfig, "koanf"), nil)
if err != nil {
return nil, err
@ -50,7 +51,7 @@ func Load(path string) (*Config, error) {
return nil, err
}
conf := Config{}
conf := Settings{}
err = k.Unmarshal("", &conf)
if err != nil {
return nil, err

View file

@ -1,7 +1,7 @@
package config
// Config is the main config object
type Config struct {
type Settings struct {
WebUi WebUi `koanf:"webUi"`
TorrentClient TorrentClient `koanf:"torrent"`
Mounts Mounts `koanf:"mounts"`
@ -67,8 +67,9 @@ type Mounts struct {
}
type NFS struct {
Enabled bool `koanf:"enabled"`
Port int `koanf:"port"`
Enabled bool `koanf:"enabled"`
Port int `koanf:"port"`
CachePath string `koanf:"cache_path"`
}
type HttpFs struct {

View file

@ -57,6 +57,11 @@ type ComplexityRoot struct {
Size func(childComplexity int) int
}
CleanupResponse struct {
Count func(childComplexity int) int
List func(childComplexity int) int
}
Dir struct {
Name func(childComplexity int) int
}
@ -70,8 +75,14 @@ type ComplexityRoot struct {
Size func(childComplexity int) int
}
ListDirResponse struct {
Entries func(childComplexity int) int
Root func(childComplexity int) int
}
Mutation struct {
CleanupTorrents func(childComplexity int, files *bool, dryRun bool) int
DedupeStorage func(childComplexity int) int
DownloadTorrent func(childComplexity int, infohash string, file *string) int
ValidateTorrents func(childComplexity int, filter model.TorrentFilter) int
}
@ -138,12 +149,13 @@ type ComplexityRoot struct {
type MutationResolver interface {
ValidateTorrents(ctx context.Context, filter model.TorrentFilter) (bool, error)
CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (int64, error)
CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (*model.CleanupResponse, error)
DownloadTorrent(ctx context.Context, infohash string, file *string) (*model.DownloadTorrentResponse, error)
DedupeStorage(ctx context.Context) (int64, error)
}
type QueryResolver interface {
Torrents(ctx context.Context, filter *model.TorrentsFilter, pagination *model.Pagination) ([]*model.Torrent, error)
FsListDir(ctx context.Context, path string) ([]model.DirEntry, error)
FsListDir(ctx context.Context, path string) (*model.ListDirResponse, error)
}
type SubscriptionResolver interface {
TaskProgress(ctx context.Context, taskID string) (<-chan model.Progress, error)
@ -190,6 +202,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.ArchiveFS.Size(childComplexity), true
case "CleanupResponse.count":
if e.complexity.CleanupResponse.Count == nil {
break
}
return e.complexity.CleanupResponse.Count(childComplexity), true
case "CleanupResponse.list":
if e.complexity.CleanupResponse.List == nil {
break
}
return e.complexity.CleanupResponse.List(childComplexity), true
case "Dir.name":
if e.complexity.Dir.Name == nil {
break
@ -218,6 +244,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.File.Size(childComplexity), true
case "ListDirResponse.entries":
if e.complexity.ListDirResponse.Entries == nil {
break
}
return e.complexity.ListDirResponse.Entries(childComplexity), true
case "ListDirResponse.root":
if e.complexity.ListDirResponse.Root == nil {
break
}
return e.complexity.ListDirResponse.Root(childComplexity), true
case "Mutation.cleanupTorrents":
if e.complexity.Mutation.CleanupTorrents == nil {
break
@ -230,6 +270,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Mutation.CleanupTorrents(childComplexity, args["files"].(*bool), args["dryRun"].(bool)), true
case "Mutation.dedupeStorage":
if e.complexity.Mutation.DedupeStorage == nil {
break
}
return e.complexity.Mutation.DedupeStorage(childComplexity), true
case "Mutation.downloadTorrent":
if e.complexity.Mutation.DownloadTorrent == nil {
break
@ -602,28 +649,34 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er
var sources = []*ast.Source{
{Name: "../../../graphql/mutation.graphql", Input: `type Mutation {
validateTorrents(filter: TorrentFilter!): Boolean!
cleanupTorrents(files: Boolean, dryRun: Boolean!): Int!
downloadTorrent(infohash: String!, file: String): DownloadTorrentResponse
validateTorrents(filter: TorrentFilter!): Boolean!
cleanupTorrents(files: Boolean, dryRun: Boolean!): CleanupResponse!
downloadTorrent(infohash: String!, file: String): DownloadTorrentResponse
dedupeStorage: Int!
}
input TorrentFilter @oneOf {
everything: Boolean
infohash: String
# pathGlob: String!
everything: Boolean
infohash: String
# pathGlob: String!
}
type DownloadTorrentResponse {
task: Task
task: Task
}
type CleanupResponse {
count: Int!
list: [String!]!
}
type Task {
id: ID!
}`, BuiltIn: false},
id: ID!
}
`, BuiltIn: false},
{Name: "../../../graphql/query.graphql", Input: `type Query {
torrents(filter: TorrentsFilter, pagination: Pagination): [Torrent!]!
fsListDir(path: String!): [DirEntry!]!
fsListDir(path: String!): ListDirResponse!
}
input TorrentsFilter {
@ -634,6 +687,11 @@ input TorrentsFilter {
peersCount: IntFilter
}
type ListDirResponse {
root: DirEntry!
entries: [DirEntry!]!
}
input Pagination {
offset: Int!
limit: Int!
@ -1008,6 +1066,94 @@ func (ec *executionContext) fieldContext_ArchiveFS_size(ctx context.Context, fie
return fc, nil
}
// _CleanupResponse_count resolves the CleanupResponse.count field from the
// struct field obj.Count and marshals it as a non-null Int.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _CleanupResponse_count(ctx context.Context, field graphql.CollectedField, obj *model.CleanupResponse) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_CleanupResponse_count(ctx, field)
	if err != nil {
		return graphql.Null
	}
	ctx = graphql.WithFieldContext(ctx, fc)
	defer func() {
		// Convert a resolver panic into a GraphQL error instead of crashing.
		if r := recover(); r != nil {
			ec.Error(ctx, ec.Recover(ctx, r))
			ret = graphql.Null
		}
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
		return obj.Count, nil
	})
	if err != nil {
		ec.Error(ctx, err)
		return graphql.Null
	}
	if resTmp == nil {
		// Schema declares count as Int! — a nil result violates non-nullability.
		if !graphql.HasFieldError(ctx, fc) {
			ec.Errorf(ctx, "must not be null")
		}
		return graphql.Null
	}
	res := resTmp.(int64)
	fc.Result = res
	return ec.marshalNInt2int64(ctx, field.Selections, res)
}
// fieldContext_CleanupResponse_count builds the field context for
// CleanupResponse.count: a plain struct field (not a resolver method) of
// scalar type Int, which therefore has no child selections.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) fieldContext_CleanupResponse_count(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object:     "CleanupResponse",
		Field:      field,
		IsMethod:   false,
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			return nil, errors.New("field of type Int does not have child fields")
		},
	}
	return fc, nil
}
// _CleanupResponse_list resolves the CleanupResponse.list field from the
// struct field obj.List and marshals it as a non-null [String!]!.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _CleanupResponse_list(ctx context.Context, field graphql.CollectedField, obj *model.CleanupResponse) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_CleanupResponse_list(ctx, field)
	if err != nil {
		return graphql.Null
	}
	ctx = graphql.WithFieldContext(ctx, fc)
	defer func() {
		// Convert a resolver panic into a GraphQL error instead of crashing.
		if r := recover(); r != nil {
			ec.Error(ctx, ec.Recover(ctx, r))
			ret = graphql.Null
		}
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
		return obj.List, nil
	})
	if err != nil {
		ec.Error(ctx, err)
		return graphql.Null
	}
	if resTmp == nil {
		// Schema declares list as [String!]! — a nil result violates non-nullability.
		if !graphql.HasFieldError(ctx, fc) {
			ec.Errorf(ctx, "must not be null")
		}
		return graphql.Null
	}
	res := resTmp.([]string)
	fc.Result = res
	return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res)
}
// fieldContext_CleanupResponse_list builds the field context for
// CleanupResponse.list: a plain struct field of scalar list type [String!]!,
// which therefore has no child selections.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) fieldContext_CleanupResponse_list(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object:     "CleanupResponse",
		Field:      field,
		IsMethod:   false,
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			return nil, errors.New("field of type String does not have child fields")
		},
	}
	return fc, nil
}
func (ec *executionContext) _Dir_name(ctx context.Context, field graphql.CollectedField, obj *model.Dir) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Dir_name(ctx, field)
if err != nil {
@ -1185,6 +1331,94 @@ func (ec *executionContext) fieldContext_File_size(ctx context.Context, field gr
return fc, nil
}
// _ListDirResponse_root resolves the ListDirResponse.root field from the
// struct field obj.Root and marshals it as a non-null DirEntry interface.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _ListDirResponse_root(ctx context.Context, field graphql.CollectedField, obj *model.ListDirResponse) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_ListDirResponse_root(ctx, field)
	if err != nil {
		return graphql.Null
	}
	ctx = graphql.WithFieldContext(ctx, fc)
	defer func() {
		// Convert a resolver panic into a GraphQL error instead of crashing.
		if r := recover(); r != nil {
			ec.Error(ctx, ec.Recover(ctx, r))
			ret = graphql.Null
		}
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
		return obj.Root, nil
	})
	if err != nil {
		ec.Error(ctx, err)
		return graphql.Null
	}
	if resTmp == nil {
		// Schema declares root as DirEntry! — a nil result violates non-nullability.
		if !graphql.HasFieldError(ctx, fc) {
			ec.Errorf(ctx, "must not be null")
		}
		return graphql.Null
	}
	res := resTmp.(model.DirEntry)
	fc.Result = res
	return ec.marshalNDirEntry2gitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐDirEntry(ctx, field.Selections, res)
}
// fieldContext_ListDirResponse_root builds the field context for
// ListDirResponse.root. DirEntry is a GraphQL interface, so child selections
// must be resolved against the concrete type, not through this context.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) fieldContext_ListDirResponse_root(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object:     "ListDirResponse",
		Field:      field,
		IsMethod:   false,
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			return nil, errors.New("FieldContext.Child cannot be called on type INTERFACE")
		},
	}
	return fc, nil
}
// _ListDirResponse_entries resolves the ListDirResponse.entries field from the
// struct field obj.Entries and marshals it as a non-null [DirEntry!]!.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _ListDirResponse_entries(ctx context.Context, field graphql.CollectedField, obj *model.ListDirResponse) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_ListDirResponse_entries(ctx, field)
	if err != nil {
		return graphql.Null
	}
	ctx = graphql.WithFieldContext(ctx, fc)
	defer func() {
		// Convert a resolver panic into a GraphQL error instead of crashing.
		if r := recover(); r != nil {
			ec.Error(ctx, ec.Recover(ctx, r))
			ret = graphql.Null
		}
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
		return obj.Entries, nil
	})
	if err != nil {
		ec.Error(ctx, err)
		return graphql.Null
	}
	if resTmp == nil {
		// Schema declares entries as [DirEntry!]! — nil violates non-nullability.
		if !graphql.HasFieldError(ctx, fc) {
			ec.Errorf(ctx, "must not be null")
		}
		return graphql.Null
	}
	res := resTmp.([]model.DirEntry)
	fc.Result = res
	return ec.marshalNDirEntry2ᚕgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐDirEntryᚄ(ctx, field.Selections, res)
}
// fieldContext_ListDirResponse_entries builds the field context for
// ListDirResponse.entries. The element type DirEntry is a GraphQL interface,
// so child selections must be resolved against the concrete type.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) fieldContext_ListDirResponse_entries(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object:     "ListDirResponse",
		Field:      field,
		IsMethod:   false,
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			return nil, errors.New("FieldContext.Child cannot be called on type INTERFACE")
		},
	}
	return fc, nil
}
func (ec *executionContext) _Mutation_validateTorrents(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Mutation_validateTorrents(ctx, field)
if err != nil {
@ -1266,9 +1500,9 @@ func (ec *executionContext) _Mutation_cleanupTorrents(ctx context.Context, field
}
return graphql.Null
}
res := resTmp.(int64)
res := resTmp.(*model.CleanupResponse)
fc.Result = res
return ec.marshalNInt2int64(ctx, field.Selections, res)
return ec.marshalNCleanupResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐCleanupResponse(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Mutation_cleanupTorrents(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
@ -1278,7 +1512,13 @@ func (ec *executionContext) fieldContext_Mutation_cleanupTorrents(ctx context.Co
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
switch field.Name {
case "count":
return ec.fieldContext_CleanupResponse_count(ctx, field)
case "list":
return ec.fieldContext_CleanupResponse_list(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type CleanupResponse", field.Name)
},
}
defer func() {
@ -1351,6 +1591,50 @@ func (ec *executionContext) fieldContext_Mutation_downloadTorrent(ctx context.Co
return fc, nil
}
// _Mutation_dedupeStorage executes the dedupeStorage mutation by delegating to
// the user-written MutationResolver.DedupeStorage and marshaling the Int! result.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _Mutation_dedupeStorage(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_Mutation_dedupeStorage(ctx, field)
	if err != nil {
		return graphql.Null
	}
	ctx = graphql.WithFieldContext(ctx, fc)
	defer func() {
		// Convert a resolver panic into a GraphQL error instead of crashing.
		if r := recover(); r != nil {
			ec.Error(ctx, ec.Recover(ctx, r))
			ret = graphql.Null
		}
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
		return ec.resolvers.Mutation().DedupeStorage(rctx)
	})
	if err != nil {
		ec.Error(ctx, err)
		return graphql.Null
	}
	if resTmp == nil {
		// Schema declares dedupeStorage as Int! — nil violates non-nullability.
		if !graphql.HasFieldError(ctx, fc) {
			ec.Errorf(ctx, "must not be null")
		}
		return graphql.Null
	}
	res := resTmp.(int64)
	fc.Result = res
	return ec.marshalNInt2int64(ctx, field.Selections, res)
}
// fieldContext_Mutation_dedupeStorage builds the field context for
// Mutation.dedupeStorage: a resolver method returning scalar Int, which
// therefore has no child selections.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) fieldContext_Mutation_dedupeStorage(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object:     "Mutation",
		Field:      field,
		IsMethod:   true,
		IsResolver: true,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			return nil, errors.New("field of type Int does not have child fields")
		},
	}
	return fc, nil
}
func (ec *executionContext) _Query_torrents(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Query_torrents(ctx, field)
if err != nil {
@ -1450,9 +1734,9 @@ func (ec *executionContext) _Query_fsListDir(ctx context.Context, field graphql.
}
return graphql.Null
}
res := resTmp.([]model.DirEntry)
res := resTmp.(*model.ListDirResponse)
fc.Result = res
return ec.marshalNDirEntry2ᚕgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐDirEntryᚄ(ctx, field.Selections, res)
return ec.marshalNListDirResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐListDirResponse(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Query_fsListDir(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
@ -1462,7 +1746,13 @@ func (ec *executionContext) fieldContext_Query_fsListDir(ctx context.Context, fi
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("FieldContext.Child cannot be called on type INTERFACE")
switch field.Name {
case "root":
return ec.fieldContext_ListDirResponse_root(ctx, field)
case "entries":
return ec.fieldContext_ListDirResponse_entries(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type ListDirResponse", field.Name)
},
}
defer func() {
@ -1723,6 +2013,8 @@ func (ec *executionContext) fieldContext_Schema_mutation(ctx context.Context, fi
return ec.fieldContext_Mutation_cleanupTorrents(ctx, field)
case "downloadTorrent":
return ec.fieldContext_Mutation_downloadTorrent(ctx, field)
case "dedupeStorage":
return ec.fieldContext_Mutation_dedupeStorage(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type Mutation", field.Name)
},
@ -5400,6 +5692,50 @@ func (ec *executionContext) _ArchiveFS(ctx context.Context, sel ast.SelectionSet
return out
}
// cleanupResponseImplementors lists the GraphQL types a CleanupResponse value
// can appear as (only itself; it implements no interfaces).
var cleanupResponseImplementors = []string{"CleanupResponse"}

// _CleanupResponse marshals a CleanupResponse object: it collects the selected
// fields, dispatches each to its field resolver, and fails the whole object
// (returns graphql.Null) if any non-nullable field resolved to null.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _CleanupResponse(ctx context.Context, sel ast.SelectionSet, obj *model.CleanupResponse) graphql.Marshaler {
	fields := graphql.CollectFields(ec.OperationContext, sel, cleanupResponseImplementors)

	out := graphql.NewFieldSet(fields)
	deferred := make(map[string]*graphql.FieldSet)
	for i, field := range fields {
		switch field.Name {
		case "__typename":
			out.Values[i] = graphql.MarshalString("CleanupResponse")
		case "count":
			out.Values[i] = ec._CleanupResponse_count(ctx, field, obj)
			if out.Values[i] == graphql.Null {
				out.Invalids++
			}
		case "list":
			out.Values[i] = ec._CleanupResponse_list(ctx, field, obj)
			if out.Values[i] == graphql.Null {
				out.Invalids++
			}
		default:
			panic("unknown field " + strconv.Quote(field.Name))
		}
	}
	out.Dispatch(ctx)
	if out.Invalids > 0 {
		return graphql.Null
	}

	// Schedule any @defer-ed field groups for asynchronous delivery.
	atomic.AddInt32(&ec.deferred, int32(len(deferred)))

	for label, dfs := range deferred {
		ec.processDeferredGroup(graphql.DeferredGroup{
			Label:    label,
			Path:     graphql.GetPath(ctx),
			FieldSet: dfs,
			Context:  ctx,
		})
	}

	return out
}
var dirImplementors = []string{"Dir", "DirEntry"}
func (ec *executionContext) _Dir(ctx context.Context, sel ast.SelectionSet, obj *model.Dir) graphql.Marshaler {
@ -5519,6 +5855,50 @@ func (ec *executionContext) _File(ctx context.Context, sel ast.SelectionSet, obj
return out
}
// listDirResponseImplementors lists the GraphQL types a ListDirResponse value
// can appear as (only itself; it implements no interfaces).
var listDirResponseImplementors = []string{"ListDirResponse"}

// _ListDirResponse marshals a ListDirResponse object: it collects the selected
// fields, dispatches each to its field resolver, and fails the whole object
// (returns graphql.Null) if any non-nullable field resolved to null.
// NOTE(review): gqlgen-generated code — regenerate instead of hand-editing.
func (ec *executionContext) _ListDirResponse(ctx context.Context, sel ast.SelectionSet, obj *model.ListDirResponse) graphql.Marshaler {
	fields := graphql.CollectFields(ec.OperationContext, sel, listDirResponseImplementors)

	out := graphql.NewFieldSet(fields)
	deferred := make(map[string]*graphql.FieldSet)
	for i, field := range fields {
		switch field.Name {
		case "__typename":
			out.Values[i] = graphql.MarshalString("ListDirResponse")
		case "root":
			out.Values[i] = ec._ListDirResponse_root(ctx, field, obj)
			if out.Values[i] == graphql.Null {
				out.Invalids++
			}
		case "entries":
			out.Values[i] = ec._ListDirResponse_entries(ctx, field, obj)
			if out.Values[i] == graphql.Null {
				out.Invalids++
			}
		default:
			panic("unknown field " + strconv.Quote(field.Name))
		}
	}
	out.Dispatch(ctx)
	if out.Invalids > 0 {
		return graphql.Null
	}

	// Schedule any @defer-ed field groups for asynchronous delivery.
	atomic.AddInt32(&ec.deferred, int32(len(deferred)))

	for label, dfs := range deferred {
		ec.processDeferredGroup(graphql.DeferredGroup{
			Label:    label,
			Path:     graphql.GetPath(ctx),
			FieldSet: dfs,
			Context:  ctx,
		})
	}

	return out
}
var mutationImplementors = []string{"Mutation"}
func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler {
@ -5556,6 +5936,13 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet)
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_downloadTorrent(ctx, field)
})
case "dedupeStorage":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_dedupeStorage(ctx, field)
})
if out.Values[i] == graphql.Null {
out.Invalids++
}
default:
panic("unknown field " + strconv.Quote(field.Name))
}
@ -6551,6 +6938,20 @@ func (ec *executionContext) marshalNBoolean2bool(ctx context.Context, sel ast.Se
return res
}
// marshalNCleanupResponse2… marshals a CleanupResponse by value, delegating to
// the pointer-free object marshaler. gqlgen-generated.
func (ec *executionContext) marshalNCleanupResponse2gitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐCleanupResponse(ctx context.Context, sel ast.SelectionSet, v model.CleanupResponse) graphql.Marshaler {
	return ec._CleanupResponse(ctx, sel, &v)
}
// marshalNCleanupResponse2ᚖ… marshals a *CleanupResponse, reporting a
// non-nullability violation when the resolver returned nil without an error.
// gqlgen-generated.
func (ec *executionContext) marshalNCleanupResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐCleanupResponse(ctx context.Context, sel ast.SelectionSet, v *model.CleanupResponse) graphql.Marshaler {
	if v == nil {
		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
		}
		return graphql.Null
	}
	return ec._CleanupResponse(ctx, sel, v)
}
func (ec *executionContext) marshalNDirEntry2gitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐDirEntry(ctx context.Context, sel ast.SelectionSet, v model.DirEntry) graphql.Marshaler {
if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
@ -6650,6 +7051,20 @@ func (ec *executionContext) marshalNInt2int64(ctx context.Context, sel ast.Selec
return res
}
// marshalNListDirResponse2… marshals a ListDirResponse by value, delegating to
// the object marshaler. gqlgen-generated.
func (ec *executionContext) marshalNListDirResponse2gitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐListDirResponse(ctx context.Context, sel ast.SelectionSet, v model.ListDirResponse) graphql.Marshaler {
	return ec._ListDirResponse(ctx, sel, &v)
}
// marshalNListDirResponse2ᚖ… marshals a *ListDirResponse, reporting a
// non-nullability violation when the resolver returned nil without an error.
// gqlgen-generated.
func (ec *executionContext) marshalNListDirResponse2ᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐListDirResponse(ctx context.Context, sel ast.SelectionSet, v *model.ListDirResponse) graphql.Marshaler {
	if v == nil {
		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
		}
		return graphql.Null
	}
	return ec._ListDirResponse(ctx, sel, v)
}
func (ec *executionContext) unmarshalNString2string(ctx context.Context, v interface{}) (string, error) {
res, err := graphql.UnmarshalString(v)
return res, graphql.ErrorOnPath(ctx, err)
@ -6665,6 +7080,38 @@ func (ec *executionContext) marshalNString2string(ctx context.Context, sel ast.S
return res
}
// unmarshalNString2ᚕstringᚄ coerces an input value into a []string, failing on
// the first element that is not a valid String. A nil input yields an empty
// (zero-length) slice. gqlgen-generated.
func (ec *executionContext) unmarshalNString2ᚕstringᚄ(ctx context.Context, v interface{}) ([]string, error) {
	var vSlice []interface{}
	if v != nil {
		// CoerceList wraps a scalar into a one-element list per GraphQL spec.
		vSlice = graphql.CoerceList(v)
	}
	var err error
	res := make([]string, len(vSlice))
	for i := range vSlice {
		// Shadow ctx with a per-index path so errors point at the element.
		ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i))
		res[i], err = ec.unmarshalNString2string(ctx, vSlice[i])
		if err != nil {
			return nil, err
		}
	}
	return res, nil
}
// marshalNString2ᚕstringᚄ marshals a []string as [String!]!: if any element
// marshals to null the whole list becomes null (non-null element violation).
// gqlgen-generated.
func (ec *executionContext) marshalNString2ᚕstringᚄ(ctx context.Context, sel ast.SelectionSet, v []string) graphql.Marshaler {
	ret := make(graphql.Array, len(v))
	for i := range v {
		ret[i] = ec.marshalNString2string(ctx, sel, v[i])
	}

	for _, e := range ret {
		if e == graphql.Null {
			return graphql.Null
		}
	}

	return ret
}
func (ec *executionContext) marshalNTorrent2ᚕᚖgitᚗkmsignᚗruᚋroyalcatᚋtstorᚋsrcᚋdeliveryᚋgraphqlᚋmodelᚐTorrentᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.Torrent) graphql.Marshaler {
ret := make(graphql.Array, len(v))
var wg sync.WaitGroup

View file

@ -32,6 +32,11 @@ type BooleanFilter struct {
Eq *bool `json:"eq,omitempty"`
}
// CleanupResponse is the payload of the cleanupTorrents mutation: Count is the
// number of affected entries and List holds the corresponding paths/names
// reported by the storage cleanup.
type CleanupResponse struct {
	Count int64    `json:"count"`
	List  []string `json:"list"`
}
type DateTimeFilter struct {
Eq *time.Time `json:"eq,omitempty"`
Gt *time.Time `json:"gt,omitempty"`
@ -68,6 +73,11 @@ type IntFilter struct {
In []int64 `json:"in,omitempty"`
}
// ListDirResponse is the payload of the fsListDir query: Root describes the
// listed directory itself and Entries its immediate children.
type ListDirResponse struct {
	Root    DirEntry   `json:"root"`
	Entries []DirEntry `json:"entries"`
}
type Mutation struct {
}

View file

@ -46,18 +46,24 @@ func (r *mutationResolver) ValidateTorrents(ctx context.Context, filter model.To
}
// CleanupTorrents is the resolver for the cleanupTorrents field.
func (r *mutationResolver) CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (int64, error) {
func (r *mutationResolver) CleanupTorrents(ctx context.Context, files *bool, dryRun bool) (*model.CleanupResponse, error) {
torrents, err := r.Service.ListTorrents(ctx)
if err != nil {
return 0, err
return nil, err
}
if files != nil && *files {
r, err := r.Service.Storage.CleanupFiles(ctx, torrents, dryRun)
return int64(r), err
return &model.CleanupResponse{
Count: int64(len(r)),
List: r,
}, err
} else {
r, err := r.Service.Storage.CleanupDirs(ctx, torrents, dryRun)
return int64(r), err
return &model.CleanupResponse{
Count: int64(len(r)),
List: r,
}, err
}
}
@ -80,6 +86,15 @@ func (r *mutationResolver) DownloadTorrent(ctx context.Context, infohash string,
return &model.DownloadTorrentResponse{}, nil
}
// DedupeStorage is the resolver for the dedupeStorage field.
// It triggers deduplication in the torrent storage and returns the count
// reported by Storage.Dedupe (items or bytes — confirm with the storage layer);
// on failure it returns 0 and the error.
func (r *mutationResolver) DedupeStorage(ctx context.Context) (int64, error) {
	deduped, err := r.Service.Storage.Dedupe(ctx)
	if err != nil {
		return 0, err
	}
	return int64(deduped), nil
}
// Mutation returns graph.MutationResolver implementation.
func (r *Resolver) Mutation() graph.MutationResolver { return &mutationResolver{r} }

View file

@ -6,6 +6,7 @@ package resolver
import (
"context"
"io/fs"
graph "git.kmsign.ru/royalcat/tstor/src/delivery/graphql"
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/model"
@ -64,51 +65,75 @@ func (r *queryResolver) Torrents(ctx context.Context, filter *model.TorrentsFilt
return tr, nil
}
// dirEntry is the minimal view of a filesystem entry that fillDirEntry needs
// to build a GraphQL model; both fs.DirEntry and fs.FileInfo satisfy it.
type dirEntry interface {
	Name() string
	IsDir() bool
}
// fillDirEntry maps a filesystem entry onto its GraphQL model counterpart.
//
// Known vfs types get dedicated models (ArchiveFs, ResolverFs, TorrentFs);
// anything else degrades to a plain Dir or File based on the interfaces the
// value implements. It panics on an entry that is neither a directory, an
// fs.DirEntry, nor an fs.FileInfo — that indicates a bug in the VFS layer.
func fillDirEntry(e dirEntry) model.DirEntry {
	// Idiomatic type switch binds the asserted value per case, replacing the
	// original repeated `e := e.(*T)` assertions (staticcheck S1034).
	switch e := e.(type) {
	case *vfs.ArchiveFS:
		return model.ArchiveFs{
			Name: e.Name(),
			Size: e.Size(),
		}
	case *vfs.ResolverFS:
		return model.ResolverFs{
			Name: e.Name(),
		}
	case *vfs.TorrentFs:
		return model.TorrentFs{
			Name:    e.Name(),
			Torrent: model.MapTorrent(e.Torrent),
		}
	default:
		if e.IsDir() {
			return model.Dir{
				Name: e.Name(),
			}
		}
		if de, ok := e.(fs.DirEntry); ok {
			// Info can fail (e.g. the file vanished between ReadDir and here);
			// the original ignored the error and would dereference a nil
			// FileInfo. Report a zero size instead of panicking.
			var size int64
			if info, err := de.Info(); err == nil {
				size = info.Size()
			}
			return model.File{
				Name: e.Name(),
				Size: size,
			}
		}
		if fi, ok := e.(fs.FileInfo); ok {
			return model.File{
				Name: fi.Name(),
				Size: fi.Size(),
			}
		}
	}
	panic("this dir entry is strange af")
}
// FsListDir is the resolver for the fsListDir field.
func (r *queryResolver) FsListDir(ctx context.Context, path string) ([]model.DirEntry, error) {
func (r *queryResolver) FsListDir(ctx context.Context, path string) (*model.ListDirResponse, error) {
root, err := r.VFS.Stat(ctx, path)
if err != nil {
return nil, err
}
entries, err := r.VFS.ReadDir(ctx, path)
if err != nil {
return nil, err
}
out := []model.DirEntry{}
for _, e := range entries {
switch e.(type) {
case *vfs.ArchiveFS:
e := e.(*vfs.ArchiveFS)
out = append(out, model.ArchiveFs{
Name: e.Name(),
Size: e.Size,
})
case *vfs.ResolverFS:
e := e.(*vfs.ResolverFS)
out = append(out, model.ResolverFs{
Name: e.Name(),
})
case *vfs.TorrentFs:
e := e.(*vfs.TorrentFs)
out = append(out, model.TorrentFs{
Name: e.Name(),
Torrent: model.MapTorrent(e.Torrent),
})
default:
if e.IsDir() {
out = append(out, model.Dir{Name: e.Name()})
} else {
info, err := e.Info()
if err != nil {
return nil, err
}
out = append(out, model.File{
Name: e.Name(),
Size: info.Size(),
})
}
}
out = append(out, fillDirEntry(e))
}
return out, nil
return &model.ListDirResponse{
Root: fillDirEntry(root),
Entries: out,
}, nil
}
// Query returns graph.QueryResolver implementation.

View file

@ -15,7 +15,7 @@ import (
"github.com/shurcooL/httpfs/html/vfstemplate"
)
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, vfs vfs.Filesystem, logPath string, cfg *config.Config) error {
func New(fc *filecache.Cache, ss *service.Stats, s *service.Service, vfs vfs.Filesystem, logPath string, cfg *config.Settings) error {
log := slog.With()
gin.SetMode(gin.ReleaseMode)

View file

@ -7,6 +7,7 @@ import (
"git.kmsign.ru/royalcat/tstor/src/delivery/graphql/resolver"
"git.kmsign.ru/royalcat/tstor/src/host/service"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/99designs/gqlgen/graphql"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/extension"
"github.com/99designs/gqlgen/graphql/handler/lru"
@ -33,7 +34,11 @@ func GraphQLHandler(service *service.Service, vfs vfs.Filesystem) http.Handler {
graphqlHandler.SetQueryCache(lru.New(1000))
graphqlHandler.Use(extension.Introspection{})
graphqlHandler.Use(extension.AutomaticPersistedQuery{Cache: lru.New(100)})
graphqlHandler.Use(otelgqlgen.Middleware())
graphqlHandler.Use(otelgqlgen.Middleware(
otelgqlgen.WithCreateSpanFromFields(func(ctx *graphql.FieldContext) bool {
return ctx.Field.Directives.ForName("link") != nil
}),
))
return graphqlHandler
}

View file

@ -76,7 +76,7 @@ func (hfs *HTTPFS) filesToFileInfo(name string) ([]fs.FileInfo, error) {
return out, nil
}
var _ http.File = &httpFile{}
var _ http.File = (*httpFile)(nil)
type httpFile struct {
f vfs.File
@ -128,5 +128,5 @@ func (f *httpFile) Readdir(count int) ([]fs.FileInfo, error) {
}
func (f *httpFile) Stat() (fs.FileInfo, error) {
return f.f.Stat()
return f.f.Info()
}

View file

@ -1,197 +0,0 @@
package nfs
import (
"crypto/sha256"
"encoding/binary"
"io/fs"
"reflect"
"slices"
"github.com/willscott/go-nfs"
"github.com/go-git/go-billy/v5"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
)
// NewCachingHandler wraps a handler to provide a basic to/from-file handle cache.
// The same limit is used for both file handles and directory-listing verifiers.
func NewCachingHandler(h nfs.Handler, limit int) nfs.Handler {
	return NewCachingHandlerWithVerifierLimit(h, limit, limit)
}
// NewCachingHandlerWithVerifierLimit provides a basic to/from-file handle cache
// that can be tuned with a smaller cache of active directory listings.
//
// Limits below 2 are only warned about, not rejected; directory listing needs
// at least the directory handle plus one verifier to work reliably.
func NewCachingHandlerWithVerifierLimit(h nfs.Handler, limit int, verifierLimit int) nfs.Handler {
	if limit < 2 || verifierLimit < 2 {
		nfs.Log.Warnf("Caching handler created with insufficient cache to support directory listing", "size", limit, "verifiers", verifierLimit)
	}
	// NOTE(review): lru.New errors are discarded — presumably it fails only
	// for non-positive sizes, which the warning above covers; confirm, since
	// a nil cache here would panic on first use.
	cache, _ := lru.New[uuid.UUID, entry](limit)
	reverseCache := make(map[string][]uuid.UUID)
	verifiers, _ := lru.New[uint64, verifier](verifierLimit)
	return &CachingHandler{
		Handler:         h,
		activeHandles:   cache,
		reverseHandles:  reverseCache,
		activeVerifiers: verifiers,
		cacheLimit:      limit,
	}
}
// CachingHandler implements to/from handle via an LRU cache.
type CachingHandler struct {
	nfs.Handler
	// activeHandles maps an opaque handle ID to the filesystem+path it names.
	activeHandles *lru.Cache[uuid.UUID, entry]
	// reverseHandles indexes handle IDs by joined path so ToHandle can reuse
	// an existing handle instead of minting a new one.
	reverseHandles map[string][]uuid.UUID
	// activeVerifiers caches directory listings keyed by their cookie verifier.
	activeVerifiers *lru.Cache[uint64, verifier]
	// cacheLimit is reported via HandleLimit.
	cacheLimit int
}
// entry is the cached value behind a file handle: the filesystem the handle
// belongs to and the path components within it.
type entry struct {
	f billy.Filesystem
	p []string
}
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(f billy.Filesystem, path []string) []byte {
	joinedPath := f.Join(path...)

	// Reuse an existing live handle for the same filesystem+path if one exists.
	if handle := c.searchReverseCache(f, joinedPath); handle != nil {
		return handle
	}

	id := uuid.New()

	// Copy the path so later mutation of the caller's slice cannot corrupt
	// the cached entry.
	newPath := make([]string, len(path))
	copy(newPath, path)

	// If inserting evicts the LRU-oldest entry, also drop that entry from the
	// reverse index so the two structures stay consistent.
	evictedKey, evictedPath, ok := c.activeHandles.GetOldest()
	if evicted := c.activeHandles.Add(id, entry{f, newPath}); evicted && ok {
		rk := evictedPath.f.Join(evictedPath.p...)
		c.evictReverseCache(rk, evictedKey)
	}

	if _, ok := c.reverseHandles[joinedPath]; !ok {
		c.reverseHandles[joinedPath] = []uuid.UUID{}
	}
	c.reverseHandles[joinedPath] = append(c.reverseHandles[joinedPath], id)
	// uuid.MarshalBinary does not fail in practice, hence the ignored error.
	b, _ := id.MarshalBinary()
	return b
}
// FromHandle converts from an opaque handle to the file it represents
//
// On a hit it also touches every cached handle whose path is a prefix of the
// requested one, keeping parent-directory handles warm in the LRU while their
// children are in use. A handle that has fallen out of the cache yields
// NFSStatusStale, telling the client to re-acquire it.
func (c *CachingHandler) FromHandle(fh []byte) (billy.Filesystem, []string, error) {
	id, err := uuid.FromBytes(fh)
	if err != nil {
		return nil, []string{}, err
	}

	if f, ok := c.activeHandles.Get(id); ok {
		// NOTE(review): this scan is O(cache size) on every lookup.
		for _, k := range c.activeHandles.Keys() {
			candidate, _ := c.activeHandles.Peek(k)
			if hasPrefix(f.p, candidate.p) {
				// Get is called purely for its LRU-recency side effect.
				_, _ = c.activeHandles.Get(k)
			}
		}
		// Clone so callers cannot mutate the cached path slice.
		return f.f, slices.Clone(f.p), nil
	}
	return nil, []string{}, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
}
// searchReverseCache returns the binary form of an existing handle for the
// given filesystem and joined path, or nil when no live handle matches.
func (c *CachingHandler) searchReverseCache(f billy.Filesystem, path string) []byte {
	uuids, exists := c.reverseHandles[path]
	if !exists {
		return nil
	}
	for _, id := range uuids {
		if candidate, ok := c.activeHandles.Get(id); ok {
			// The same path can be registered for several filesystems, so the
			// filesystem must match too.
			if reflect.DeepEqual(candidate.f, f) {
				return id[:]
			}
		}
	}
	return nil
}
// evictReverseCache removes one handle ID from the reverse-index bucket for
// path, leaving any other IDs registered for the same path intact. Unknown
// paths or handles are a no-op.
func (c *CachingHandler) evictReverseCache(path string, handle uuid.UUID) {
	uuids, exists := c.reverseHandles[path]
	if !exists {
		return
	}

	for i, u := range uuids {
		if u == handle {
			uuids = append(uuids[:i], uuids[i+1:]...)
			c.reverseHandles[path] = uuids
			return
		}
	}
}
// InvalidateHandle drops a handle from both the LRU cache and the reverse
// index. Invalidating an unknown or malformed handle is a no-op; the error
// result exists only to satisfy the nfs.Handler interface and is always nil.
func (c *CachingHandler) InvalidateHandle(fs billy.Filesystem, handle []byte) error {
	// Remove from cache.
	id, _ := uuid.FromBytes(handle)
	entry, ok := c.activeHandles.Get(id)
	if ok {
		rk := entry.f.Join(entry.p...)
		c.evictReverseCache(rk, id)
	}
	c.activeHandles.Remove(id)
	return nil
}
// HandleLimit exports how many file handles can be safely stored by this cache.
// It reports the LRU capacity configured at construction time.
func (c *CachingHandler) HandleLimit() int {
	return c.cacheLimit
}
// hasPrefix reports whether prefix is an element-wise prefix of path.
func hasPrefix(path, prefix []string) bool {
	if len(path) < len(prefix) {
		return false
	}
	return slices.Equal(path[:len(prefix)], prefix)
}
// verifier caches the directory listing that produced a given cookie
// verifier, so paged READDIR calls can be replayed consistently.
type verifier struct {
	// path is the joined directory path the listing was taken from.
	path string
	// contents is the snapshot of directory entries taken when the verifier was issued.
	contents []fs.FileInfo
}
func hashPathAndContents(path string, contents []fs.FileInfo) uint64 {
//calculate a cookie-verifier.
vHash := sha256.New()
// Add the path to avoid collisions of directories with the same content
vHash.Write(binary.BigEndian.AppendUint64([]byte{}, uint64(len(path))))
vHash.Write([]byte(path))
for _, c := range contents {
vHash.Write([]byte(c.Name())) // Never fails according to the docs
}
verify := vHash.Sum(nil)[0:8]
return binary.BigEndian.Uint64(verify)
}
// VerifierFor computes a cookie verifier for the given directory listing and
// caches the listing under it, so later paged READDIR calls can be replayed.
func (c *CachingHandler) VerifierFor(path string, contents []fs.FileInfo) uint64 {
	id := hashPathAndContents(path, contents)
	c.activeVerifiers.Add(id, verifier{path, contents})
	return id
}
// DataForVerifier returns the directory listing previously cached under the
// given cookie verifier, or nil when it is unknown or has been evicted.
// The path argument is currently unused.
func (c *CachingHandler) DataForVerifier(path string, id uint64) []fs.FileInfo {
	if cache, ok := c.activeVerifiers.Get(id); ok {
		return cache.contents
	}
	return nil
}

View file

@ -2,11 +2,12 @@ package nfs
import (
"log/slog"
"time"
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
nfshelper "git.kmsign.ru/royalcat/tstor/pkg/go-nfs/helpers"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"git.kmsign.ru/royalcat/tstor/src/log"
nfs "github.com/willscott/go-nfs"
nfshelper "github.com/willscott/go-nfs/helpers"
)
func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
@ -14,10 +15,13 @@ func NewNFSv3Handler(fs vfs.Filesystem) (nfs.Handler, error) {
nfs.SetLogger(log.NewNFSLog(nfslog))
nfs.Log.SetLevel(nfs.InfoLevel)
bfs := &billyFsWrapper{fs: fs, log: nfslog}
bfs := &fsWrapper{fs: fs, log: nfslog, timeout: time.Minute}
handler := nfshelper.NewNullAuthHandler(bfs)
cacheHelper := nfshelper.NewCachingHandler(handler, 1024)
cacheHelper, err := NewKvHandler(handler, bfs)
if err != nil {
return nil, err
}
// cacheHelper := NewCachingHandler(handler)

127
src/export/nfs/kvhandler.go Normal file
View file

@ -0,0 +1,127 @@
package nfs
import (
"context"
"fmt"
"path"
"slices"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/config"
"github.com/google/uuid"
"github.com/royalcat/kv"
)
// lifetime is the intended retention period for persisted file handles.
// NOTE(review): only referenced by the commented-out GC hook below — confirm
// it is still wanted.
const lifetime = time.Hour * 24

// NewKvHandler provides a basic to/from-file handle cache that can be tuned with a smaller cache of active directory listings.
// Handles are persisted in a Badger-backed key/value store under the
// configured NFS cache path, so they survive process restarts.
func NewKvHandler(h nfs.Handler, fs nfs.Filesystem) (nfs.Handler, error) {
	activeHandles, err := kv.NewBadgerKVMarhsler[uuid.UUID, []string](path.Join(config.Config.Mounts.NFS.CachePath, "handlers"))
	if err != nil {
		return nil, err
	}

	// if s, ok := activeHandles.(kv.BadgerStore); ok {
	// 	db := s.BadgerDB()
	// enable with managed database
	// 	go func() {
	// 		for n := range time.NewTimer(lifetime / 2).C {
	// 			db.SetDiscardTs(uint64(n.Add(-lifetime).Unix()))
	// 		}
	// 	}()
	// }

	return &CachingHandler{
		Handler:       h,
		fs:            fs,
		activeHandles: activeHandles,
	}, nil
}
// CachingHandler implements to/from handle via an LRU cache.
// NOTE(review): despite the wording above, this variant persists handles in a
// kv.Store rather than an in-memory LRU — consider updating the comment.
type CachingHandler struct {
	nfs.Handler
	// fs is the single filesystem all handles produced by this handler refer to.
	fs nfs.Filesystem
	// activeHandles maps an opaque handle id to the path it represents.
	activeHandles kv.Store[uuid.UUID, []string]
}
// ToHandle takes a file and represents it with an opaque handle to reference it.
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
// but we can generalize with a stateful local cache of handed out IDs.
func (c *CachingHandler) ToHandle(_ nfs.Filesystem, path []string) []byte {
	ctx := context.Background()

	// Scan the store for a handle already issued for this exact path so
	// repeated lookups hand out a stable id.
	// NOTE(review): this is a full store scan per call and the result of
	// Range is discarded — confirm both are acceptable for the expected
	// handle count.
	var id uuid.UUID
	c.activeHandles.Range(ctx, func(k uuid.UUID, v []string) bool {
		if slices.Equal(path, v) {
			id = k
			return false // stop iterating, match found
		}
		return true
	})

	if id != uuid.Nil {
		return id[:]
	}

	// No existing handle: mint a new id and persist the mapping.
	// NOTE(review): a Set failure is silently ignored; the returned handle
	// would then be unresolvable by FromHandle — confirm this is intended.
	id = uuid.New()

	c.activeHandles.Set(ctx, id, path)

	return id[:]
}
// FromHandle converts from an opaque handle to the file it represents
func (c *CachingHandler) FromHandle(fh []byte) (nfs.Filesystem, []string, error) {
	id, parseErr := uuid.FromBytes(fh)
	if parseErr != nil {
		return nil, []string{}, parseErr
	}

	storedPath, ok, getErr := c.activeHandles.Get(context.Background(), id)
	switch {
	case getErr != nil:
		return nil, nil, fmt.Errorf("kv error: %w", getErr)
	case ok:
		return c.fs, storedPath, nil
	default:
		// The handle is syntactically valid but was never issued or has
		// expired, so report it as stale to the NFS client.
		return nil, []string{}, &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
	}
}
// InvalidateHandle removes the handle's path mapping from the persistent
// store; subsequent FromHandle calls for it will report a stale handle.
func (c *CachingHandler) InvalidateHandle(fs nfs.Filesystem, handle []byte) error {
	ctx := context.Background()
	//Remove from cache
	id, err := uuid.FromBytes(handle)
	if err != nil {
		return err
	}
	// NOTE(review): the result of Delete is discarded — confirm a store
	// failure here is acceptable to ignore.
	c.activeHandles.Delete(ctx, id)
	return nil
}
// maxInt is the largest value representable by int on this platform.
const maxInt = int(^uint(0) >> 1)

// HandleLimit exports how many file handles can be safely stored by this cache.
// The kv-backed store is effectively unbounded, so the platform maximum is reported.
func (c *CachingHandler) HandleLimit() int {
	return maxInt
}
// hasPrefix reports whether the path slice starts with every element of prefix.
func hasPrefix(path, prefix []string) bool {
	if len(prefix) > len(path) {
		return false
	}
	for i := len(prefix) - 1; i >= 0; i-- {
		if path[i] != prefix[i] {
			return false
		}
	}
	return true
}

View file

@ -6,47 +6,45 @@ import (
"io/fs"
"log/slog"
"path/filepath"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxbilly"
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/go-git/go-billy/v5"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var billyFsTracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/export/nfs.billyFsWrapper")
type billyFsWrapper struct {
type fsWrapper struct {
fs vfs.Filesystem
log *slog.Logger
timeout time.Duration
}
var _ billy.Filesystem = (*billyFsWrapper)(nil)
var _ billy.Dir = (*billyFsWrapper)(nil)
var _ nfs.Filesystem = (*fsWrapper)(nil)
func (*billyFsWrapper) ctx() context.Context {
return context.Background()
}
// var _ ctxbilly.Dir = (*billyFsWrapper)(nil)
// Chroot implements billy.Filesystem.
func (*billyFsWrapper) Chroot(path string) (billy.Filesystem, error) {
func (*fsWrapper) Chroot(path string) (nfs.Filesystem, error) {
return nil, billy.ErrNotSupported
}
// Create implements billy.Filesystem.
func (*billyFsWrapper) Create(filename string) (billy.File, error) {
func (*fsWrapper) Create(ctx context.Context, filename string) (nfs.File, error) {
return nil, billy.ErrNotSupported
}
// Join implements billy.Filesystem.
func (*billyFsWrapper) Join(elem ...string) string {
func (*fsWrapper) Join(elem ...string) string {
return filepath.Join(elem...)
}
// Lstat implements billy.Filesystem.
func (fs *billyFsWrapper) Lstat(filename string) (fs.FileInfo, error) {
ctx, span := billyFsTracer.Start(fs.ctx(), "Lstat", trace.WithAttributes(attribute.String("filename", filename)))
defer span.End()
func (fs *fsWrapper) Lstat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
info, err := fs.fs.Stat(ctx, filename)
if err != nil {
@ -56,16 +54,14 @@ func (fs *billyFsWrapper) Lstat(filename string) (fs.FileInfo, error) {
}
// MkdirAll implements billy.Filesystem.
func (*billyFsWrapper) MkdirAll(filename string, perm fs.FileMode) error {
func (*fsWrapper) MkdirAll(ctx context.Context, filename string, perm fs.FileMode) error {
return billy.ErrNotSupported
}
// Open implements billy.Filesystem.
func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
ctx, span := billyFsTracer.Start(fs.ctx(), "Open",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
func (fs *fsWrapper) Open(ctx context.Context, filename string) (nfs.File, error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
file, err := fs.fs.Open(ctx, filename)
if err != nil {
@ -79,11 +75,9 @@ func (fs *billyFsWrapper) Open(filename string) (billy.File, error) {
}
// OpenFile implements billy.Filesystem.
func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode) (billy.File, error) {
ctx, span := billyFsTracer.Start(fs.ctx(), "OpenFile",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
func (fs *fsWrapper) OpenFile(ctx context.Context, filename string, flag int, perm fs.FileMode) (nfs.File, error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
file, err := fs.fs.Open(ctx, filename)
if err != nil {
@ -97,11 +91,9 @@ func (fs *billyFsWrapper) OpenFile(filename string, flag int, perm fs.FileMode)
}
// ReadDir implements billy.Filesystem.
func (bfs *billyFsWrapper) ReadDir(path string) ([]fs.FileInfo, error) {
ctx, span := billyFsTracer.Start(bfs.ctx(), "OpenFile",
trace.WithAttributes(attribute.String("path", path)),
)
defer span.End()
func (bfs *fsWrapper) ReadDir(ctx context.Context, path string) ([]fs.FileInfo, error) {
ctx, cancel := context.WithTimeout(ctx, bfs.timeout)
defer cancel()
ffs, err := bfs.fs.ReadDir(ctx, path)
if err != nil {
@ -125,36 +117,32 @@ func (bfs *billyFsWrapper) ReadDir(path string) ([]fs.FileInfo, error) {
}
// Readlink implements billy.Filesystem.
func (*billyFsWrapper) Readlink(link string) (string, error) {
func (*fsWrapper) Readlink(ctx context.Context, link string) (string, error) {
return "", billy.ErrNotSupported
}
// Remove implements billy.Filesystem.
func (bfs *billyFsWrapper) Remove(filename string) error {
ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
func (bfs *fsWrapper) Remove(ctx context.Context, filename string) error {
ctx, cancel := context.WithTimeout(ctx, bfs.timeout)
defer cancel()
return bfs.fs.Unlink(ctx, filename)
}
// Rename implements billy.Filesystem.
func (*billyFsWrapper) Rename(oldpath string, newpath string) error {
func (*fsWrapper) Rename(ctx context.Context, oldpath string, newpath string) error {
return billy.ErrNotSupported
}
// Root implements billy.Filesystem.
func (*billyFsWrapper) Root() string {
func (*fsWrapper) Root() string {
return "/"
}
// Stat implements billy.Filesystem.
func (bfs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
ctx, span := billyFsTracer.Start(bfs.ctx(), "Remove",
trace.WithAttributes(attribute.String("filename", filename)),
)
defer span.End()
func (bfs *fsWrapper) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, cancel := context.WithTimeout(ctx, bfs.timeout)
defer cancel()
info, err := bfs.fs.Stat(ctx, filename)
if err != nil {
@ -164,28 +152,21 @@ func (bfs *billyFsWrapper) Stat(filename string) (fs.FileInfo, error) {
}
// Symlink implements billy.Filesystem.
func (fs *billyFsWrapper) Symlink(target string, link string) error {
func (fs *fsWrapper) Symlink(ctx context.Context, target string, link string) error {
return billyErr(nil, vfs.ErrNotImplemented, fs.log)
}
// TempFile implements billy.Filesystem.
func (fs *billyFsWrapper) TempFile(dir string, prefix string) (billy.File, error) {
return nil, billyErr(nil, vfs.ErrNotImplemented, fs.log)
}
type billyFile struct {
ctx context.Context
name string
file vfs.File
log *slog.Logger
}
var _ billy.File = (*billyFile)(nil)
var _ ctxbilly.File = (*billyFile)(nil)
// Close implements billy.File.
func (f *billyFile) Close() error {
return f.Close()
func (f *billyFile) Close(ctx context.Context) error {
return f.file.Close(ctx)
}
// Name implements billy.File.
@ -194,31 +175,12 @@ func (f *billyFile) Name() string {
}
// Read implements billy.File.
func (bf *billyFile) Read(p []byte) (n int, err error) {
ctx, span := billyFsTracer.Start(bf.ctx, "Read",
trace.WithAttributes(attribute.Int("length", len(p))),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
func (bf *billyFile) Read(ctx context.Context, p []byte) (n int, err error) {
return bf.file.Read(ctx, p)
}
// ReadAt implements billy.File.
func (bf *billyFile) ReadAt(p []byte, off int64) (n int, err error) {
ctx, span := billyFsTracer.Start(bf.ctx, "Read",
trace.WithAttributes(
attribute.Int("length", len(p)),
attribute.Int64("offset", off),
),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
func (bf *billyFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return bf.file.ReadAt(ctx, p, off)
}
@ -228,12 +190,12 @@ func (f *billyFile) Seek(offset int64, whence int) (int64, error) {
}
// Truncate implements billy.File.
func (f *billyFile) Truncate(size int64) error {
func (f *billyFile) Truncate(ctx context.Context, size int64) error {
return billyErr(nil, vfs.ErrNotImplemented, f.log)
}
// Write implements billy.File.
func (f *billyFile) Write(p []byte) (n int, err error) {
func (f *billyFile) Write(ctx context.Context, p []byte) (n int, err error) {
return 0, billyErr(nil, vfs.ErrNotImplemented, f.log)
}

View file

@ -96,6 +96,7 @@ type webDAVFile struct {
func newFile(ctx context.Context, name string, f vfs.File, df func() ([]os.FileInfo, error)) *webDAVFile {
return &webDAVFile{
ctx: ctx,
f: f,
fi: newFileInfo(name, f.Size(), f.IsDir()),
dirFunc: df,
}

View file

@ -13,13 +13,12 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/hashicorp/go-multierror"
"github.com/royalcat/kv"
)
// NOT USED
type PieceStorage struct {
basePath string
completion storage.PieceCompletion
dirInfohash kv.Store[string, infohash.T]
basePath string
completion storage.PieceCompletion
}
func NewPieceStorage(path string, completion storage.PieceCompletion) *PieceStorage {
@ -29,8 +28,6 @@ func NewPieceStorage(path string, completion storage.PieceCompletion) *PieceStor
}
}
var _ DataStorage = (*PieceStorage)(nil)
// OpenTorrent implements FileStorageDeleter.
func (p *PieceStorage) OpenTorrent(info *metainfo.Info, infoHash infohash.T) (storage.TorrentImpl, error) {
torrentPath := path.Join(p.basePath, infoHash.HexString())

View file

@ -10,7 +10,7 @@ import (
"github.com/anacrolix/torrent/storage"
)
func Setup(cfg config.TorrentClient) (DataStorage, storage.PieceCompletion, error) {
func Setup(cfg config.TorrentClient) (*DataStorage, storage.PieceCompletion, error) {
pcp := filepath.Join(cfg.MetadataFolder, "piece-completion")
if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)

View file

@ -2,49 +2,62 @@ package datastorage
import (
"context"
"crypto/sha1"
"fmt"
"io"
"io/fs"
"log/slog"
"os"
"path"
"path/filepath"
"slices"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/dustin/go-humanize"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/sys/unix"
)
type DataStorage interface {
storage.ClientImplCloser
DeleteFile(file *torrent.File) error
CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
}
// type DataStorage interface {
// storage.ClientImplCloser
// DeleteFile(file *torrent.File) error
// CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error)
// }
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/datastorage")
// NewFileStorage creates a new ClientImplCloser that stores files using the OS native filesystem.
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *FileStorage {
return &FileStorage{
baseDir: baseDir,
func NewFileStorage(baseDir string, pc storage.PieceCompletion) *DataStorage {
return &DataStorage{
ClientImplCloser: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: baseDir,
PieceCompletion: pc,
TorrentDirMaker: torrentDir,
FilePathMaker: filePath,
}),
baseDir: baseDir,
pieceCompletion: pc,
log: slog.With("component", "torrent-client"),
}
}
// File-based storage for torrents, that isn't yet bound to a particular torrent.
type FileStorage struct {
type DataStorage struct {
baseDir string
storage.ClientImplCloser
pieceCompletion storage.PieceCompletion
log *slog.Logger
}
func (me *FileStorage) Close() error {
func (me *DataStorage) Close() error {
return me.pieceCompletion.Close()
}
@ -61,14 +74,14 @@ func filePath(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...)
}
func (fs *FileStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
func (fs *DataStorage) filePath(info *metainfo.Info, infoHash metainfo.Hash, fileInfo *metainfo.FileInfo) string {
return filepath.Join(torrentDir(fs.baseDir, info, infoHash), filePath(storage.FilePathMakerOpts{
Info: info,
File: fileInfo,
}))
}
func (fs *FileStorage) DeleteFile(file *torrent.File) error {
func (fs *DataStorage) DeleteFile(file *torrent.File) error {
info := file.Torrent().Info()
infoHash := file.Torrent().InfoHash()
torrentDir := torrentDir(fs.baseDir, info, infoHash)
@ -88,7 +101,7 @@ func (fs *FileStorage) DeleteFile(file *torrent.File) error {
return os.Remove(filePath)
}
func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
func (fs *DataStorage) CleanupDirs(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) {
log := fs.log.With("function", "CleanupDirs", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{}
@ -98,7 +111,7 @@ func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.T
entries, err := os.ReadDir(fs.baseDir)
if err != nil {
return 0, err
return nil, err
}
toDelete := []string{}
@ -109,7 +122,7 @@ func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.T
}
if ctx.Err() != nil {
return 0, ctx.Err()
return nil, ctx.Err()
}
log.Info("deleting trash data", "dirsCount", len(toDelete))
@ -119,40 +132,34 @@ func (fs *FileStorage) CleanupDirs(ctx context.Context, expected []*controller.T
log.Warn("deleting trash data", "path", p)
err := os.RemoveAll(p)
if err != nil {
return i, err
return toDelete[:i], err
}
}
}
return len(toDelete), nil
return toDelete, nil
}
// func (fs *FileStorage) IsCompatable(ctx context.Context, addition *controller.Torrent, dryRun bool) (bool, error) {
// log := fs.log.With("function", "IsCompatable", "addition", addition.Name())
// ifp
// }
func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) (int, error) {
log := fs.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
func (s *DataStorage) CleanupFiles(ctx context.Context, expected []*controller.Torrent, dryRun bool) ([]string, error) {
log := s.log.With("function", "CleanupFiles", "expectedTorrents", len(expected), "dryRun", dryRun)
expectedEntries := []string{}
{
for _, e := range expected {
files, err := e.Files(ctx)
if err != nil {
return 0, err
return nil, err
}
for _, f := range files {
expectedEntries = append(expectedEntries, fs.filePath(e.Torrent().Info(), e.Torrent().InfoHash(), ptr(f.FileInfo())))
expectedEntries = append(expectedEntries, s.filePath(e.Torrent().Info(), e.Torrent().InfoHash(), ptr(f.FileInfo())))
}
}
}
entries := []string{}
err := filepath.Walk(fs.baseDir,
func(path string, info os.FileInfo, err error) error {
err := filepath.WalkDir(s.baseDir,
func(path string, info fs.DirEntry, err error) error {
if err != nil {
return err
}
@ -167,7 +174,7 @@ func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.
return nil
})
if err != nil {
return 0, err
return nil, err
}
toDelete := []string{}
@ -178,20 +185,243 @@ func (fs *FileStorage) CleanupFiles(ctx context.Context, expected []*controller.
}
if ctx.Err() != nil {
return len(toDelete), ctx.Err()
return toDelete, ctx.Err()
}
log.Info("deleting trash data", "filesCount", len(toDelete))
if !dryRun {
for i, p := range toDelete {
fs.log.Warn("deleting trash data", "path", p)
s.log.Warn("deleting trash data", "path", p)
err := os.Remove(p)
if err != nil {
return i, err
return toDelete[i:], err
}
}
}
return len(toDelete), nil
return toDelete, nil
}
// iterFiles walks baseDir and invokes iter for every non-directory entry,
// stopping early on walk errors or context cancellation.
func (s *DataStorage) iterFiles(ctx context.Context, iter func(ctx context.Context, path string, entry fs.FileInfo) error) error {
	walkFn := func(path string, info fs.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			return walkErr
		case ctx.Err() != nil:
			return ctx.Err()
		case info.IsDir():
			// Only files are passed to the callback; directories are skipped.
			return nil
		default:
			return iter(ctx, path, info)
		}
	}
	return filepath.Walk(s.baseDir, walkFn)
}
// Dedupe scans the storage directory for files of identical size, verifies
// their content prefixes match, and deduplicates them at the filesystem
// level. It returns the total number of bytes reported as deduped.
func (s *DataStorage) Dedupe(ctx context.Context) (uint64, error) {
	// fmt.Sprintf with no formatting verbs was redundant; pass the literal.
	ctx, span := tracer.Start(ctx, "Dedupe")
	defer span.End()
	log := rlog.FunctionLog(s.log, "Dedupe")

	// Bucket all file paths by size; only equal-size files can be duplicates.
	sizeMap := map[int64][]string{}
	err := s.iterFiles(ctx, func(ctx context.Context, path string, info fs.FileInfo) error {
		size := info.Size()
		sizeMap[size] = append(sizeMap[size], path)
		return nil
	})
	if err != nil {
		return 0, err
	}

	// Drop buckets with a single file: nothing to dedupe there.
	maps.DeleteFunc(sizeMap, func(k int64, v []string) bool {
		return len(v) <= 1
	})

	span.AddEvent("collected files with same size", trace.WithAttributes(
		attribute.Int("count", len(sizeMap)),
	))

	var deduped uint64 = 0
	i := 0
	for _, paths := range sizeMap {
		if i%100 == 0 {
			log.Info("deduping in progress", "current", i, "total", len(sizeMap))
		}
		i++

		if ctx.Err() != nil {
			return deduped, ctx.Err()
		}

		// Normalize the group: sorted, unique, absolute paths.
		slices.Sort(paths)
		paths = slices.Compact(paths)
		if len(paths) <= 1 {
			continue
		}

		paths, err = applyErr(paths, filepath.Abs)
		if err != nil {
			return deduped, err
		}

		dedupedGroup, err := s.dedupeFiles(ctx, paths)
		if err != nil {
			// A failed group is logged and skipped so one bad file does not
			// abort the whole pass.
			log.Error("Error applying dedupe", "files", paths, "error", err.Error())
			continue
		}

		if dedupedGroup > 0 {
			deduped += dedupedGroup
			log.Info("deduped file group",
				slog.String("files", fmt.Sprint(paths)),
				slog.String("deduped", humanize.Bytes(dedupedGroup)),
				slog.String("deduped_total", humanize.Bytes(deduped)),
			)
		}
	}

	return deduped, nil
}
// applyErr maps apply over in, collecting results in order. On the first
// failure it returns the results gathered so far together with that error.
func applyErr[E, O any](in []E, apply func(E) (O, error)) ([]O, error) {
	results := make([]O, 0, len(in))
	for idx := range in {
		mapped, mapErr := apply(in[idx])
		if mapErr != nil {
			return results, mapErr
		}
		results = append(results, mapped)
	}
	return results, nil
}
// const blockSize uint64 = 4096
// dedupeFiles hashes the leading block of each candidate file and issues a
// FIDEDUPERANGE ioctl to share identical extents between the first file and
// every other file whose prefix hash matches. It returns the total number of
// bytes the kernel reports as deduped.
//
// All paths are expected to be absolute and refer to files of identical size.
func (s *DataStorage) dedupeFiles(ctx context.Context, paths []string) (deduped uint64, err error) {
	// fmt.Sprintf with no formatting verbs was redundant; pass the literal.
	ctx, span := tracer.Start(ctx, "dedupeFiles", trace.WithAttributes(
		attribute.StringSlice("files", paths),
	))
	defer func() {
		span.SetAttributes(attribute.Int64("deduped", int64(deduped)))
		if err != nil {
			span.RecordError(err)
		}
		span.End()
	}()

	log := rlog.FunctionLog(s.log, "dedupeFiles")

	srcF, err := os.Open(paths[0])
	if err != nil {
		return deduped, err
	}
	defer srcF.Close()
	srcStat, err := srcF.Stat()
	if err != nil {
		return deduped, err
	}
	srcFd := int(srcF.Fd())
	srcSize := srcStat.Size()

	fsStat := unix.Statfs_t{}
	err = unix.Fstatfs(srcFd, &fsStat)
	if err != nil {
		span.RecordError(err)
		return deduped, err
	}

	srcHash, err := filehash(srcF)
	if err != nil {
		return deduped, err
	}

	if fsStat.Bsize > srcSize { // for btrfs it means file is residing in not deduplicatable metadata
		return deduped, nil
	}

	// The dedupe length must be a multiple of the filesystem block size:
	// round the file size DOWN to the nearest block boundary.
	// (was: (srcSize % Bsize) * Bsize, which is zero for block-aligned files
	// and nonsensical otherwise)
	blockSize := uint64(srcSize - srcSize%fsStat.Bsize)

	span.SetAttributes(attribute.Int64("blocksize", int64(blockSize)))

	rng := unix.FileDedupeRange{
		Src_offset: 0,
		Src_length: blockSize,
		Info:       []unix.FileDedupeRangeInfo{},
	}

	// Open every destination candidate; keep only those whose prefix hash
	// matches the source. Matching files must stay open until after the
	// ioctl (the kernel uses their fds), so they are all closed by a single
	// deferred cleanup instead of a defer per loop iteration (which leaked
	// descriptors until function return and double-closed mismatches).
	destFiles := make([]*os.File, 0, len(paths)-1)
	defer func() {
		for _, f := range destFiles {
			f.Close()
		}
	}()
	for _, dst := range paths[1:] {
		if ctx.Err() != nil {
			return deduped, ctx.Err()
		}

		destF, err := os.OpenFile(dst, os.O_RDWR, os.ModePerm)
		if err != nil {
			return deduped, err
		}

		dstHash, err := filehash(destF)
		if err != nil {
			destF.Close()
			return deduped, err
		}

		if srcHash != dstHash {
			destF.Close()
			continue
		}

		destFiles = append(destFiles, destF)
		rng.Info = append(rng.Info, unix.FileDedupeRangeInfo{
			Dest_fd:     int64(destF.Fd()),
			Dest_offset: 0,
		})
	}

	if len(rng.Info) == 0 {
		return deduped, nil
	}

	log.Info("found same files, deduping", "files", paths, "size", humanize.Bytes(uint64(srcStat.Size())))

	if ctx.Err() != nil {
		return deduped, ctx.Err()
	}

	err = unix.IoctlFileDedupeRange(srcFd, &rng)
	if err != nil {
		return deduped, err
	}

	// Sum the per-destination byte counts the kernel filled in.
	for i := range rng.Info {
		deduped += rng.Info[i].Bytes_deduped
	}

	return deduped, nil
}
const compareBlockSize = 1024 * 128
func filehash(r io.Reader) ([20]byte, error) {
buf := make([]byte, compareBlockSize)
_, err := r.Read(buf)
if err != nil && err != io.EOF {
return [20]byte{}, err
}
return sha1.Sum(buf), nil
}
func ptr[D any](v D) *D {

View file

@ -1,21 +1,27 @@
package service
import (
"bufio"
"context"
"fmt"
"log/slog"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"git.kmsign.ru/royalcat/tstor/src/host/datastorage"
"git.kmsign.ru/royalcat/tstor/src/host/store"
"git.kmsign.ru/royalcat/tstor/src/host/tkv"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
@ -27,6 +33,8 @@ import (
"github.com/royalcat/kv"
)
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/service")
type DirAquire struct {
Name string
Hashes []infohash.T
@ -39,9 +47,11 @@ type Service struct {
torrentLoaded chan struct{}
loadMutex sync.Mutex
// stats *Stats
DefaultPriority types.PiecePriority
Storage datastorage.DataStorage
Storage *datastorage.DataStorage
SourceDir string
dirsAquire kv.Store[string, DirAquire]
@ -50,9 +60,9 @@ type Service struct {
}
func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
storage datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes,
storage *datastorage.DataStorage, excludedFiles *store.FilesMappings, infoBytes *store.InfoBytes,
) (*Service, error) {
dirsAcquire, err := kv.NewBadgerKV[string, DirAquire](path.Join(cfg.MetadataFolder, "dir-acquire"))
dirsAcquire, err := tkv.New[string, DirAquire](cfg.MetadataFolder, "dir-acquire")
if err != nil {
return nil, err
}
@ -66,12 +76,15 @@ func NewService(sourceDir string, cfg config.TorrentClient, c *torrent.Client,
Storage: storage,
SourceDir: sourceDir,
torrentLoaded: make(chan struct{}),
loadMutex: sync.Mutex{},
dirsAquire: dirsAcquire,
// stats: newStats(), // TODO persistent
}
go func() {
err := s.loadTorrentFiles(context.Background())
ctx := context.Background()
err := s.loadTorrentFiles(ctx)
if err != nil {
s.log.Error("initial torrent load failed", "error", err)
}
@ -89,20 +102,32 @@ func (s *Service) Close() error {
return err
}
func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
func (s *Service) LoadTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent, error) {
ctx, span := tracer.Start(ctx, "LoadTorrent")
defer span.End()
log := rlog.FunctionLog(s.log, "LoadTorrent")
defer f.Close(ctx)
stat, err := f.Stat()
stat, err := f.Info()
if err != nil {
return nil, fmt.Errorf("call stat failed: %w", err)
}
mi, err := metainfo.Load(ctxio.IoReader(ctx, f))
span.SetAttributes(attribute.String("filename", stat.Name()))
mi, err := metainfo.Load(bufio.NewReader(ctxio.IoReader(ctx, f)))
if err != nil {
return nil, fmt.Errorf("loading torrent metadata from file %s, error: %w", stat.Name(), err)
}
t, ok := s.c.Torrent(mi.HashInfoBytes())
if !ok {
span.AddEvent("torrent not found, loading from file")
log.InfoContext(ctx, "torrent not found, loading from file")
spec, err := torrent.TorrentSpecFromMetaInfoErr(mi)
if err != nil {
return nil, fmt.Errorf("parse spec from metadata: %w", err)
@ -110,33 +135,18 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
infoBytes := spec.InfoBytes
if !isValidInfoHashBytes(infoBytes) {
log.WarnContext(ctx, "info loaded from spec not valid")
infoBytes = nil
}
if len(infoBytes) == 0 {
log.InfoContext(ctx, "no info loaded from file, try to load from cache")
infoBytes, err = s.infoBytes.GetBytes(spec.InfoHash)
if err != nil && err != store.ErrNotFound {
return nil, fmt.Errorf("get info bytes from database: %w", err)
}
}
var info metainfo.Info
err = bencode.Unmarshal(infoBytes, &info)
if err != nil {
infoBytes = nil
} else {
compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, info)
if err != nil {
return nil, err
}
if !compatable {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
}
t, _ = s.c.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: spec.InfoHash,
Storage: s.Storage,
@ -146,18 +156,33 @@ func (s *Service) AddTorrent(ctx context.Context, f vfs.File) (*torrent.Torrent,
t.AllowDataDownload()
t.AllowDataUpload()
span.AddEvent("torrent added to client")
select {
case <-ctx.Done():
return nil, fmt.Errorf("creating torrent timed out")
return nil, ctx.Err()
case <-t.GotInfo():
err := s.infoBytes.Set(t.InfoHash(), t.Metainfo())
if err != nil {
s.log.Error("error setting info bytes for torrent %s: %s", t.Name(), err.Error())
}
for _, f := range t.Files() {
f.SetPriority(s.DefaultPriority)
}
}
span.AddEvent("got info")
info := t.Info()
if info == nil {
return nil, fmt.Errorf("info is nil")
}
compatable, _, err := s.checkTorrentCompatable(ctx, spec.InfoHash, *info)
if err != nil {
return nil, err
}
if !compatable {
return nil, fmt.Errorf(
"torrent with name '%s' not compatable existing infohash: %s, new: %s",
t.Name(), t.InfoHash().HexString(), spec.InfoHash.HexString(),
)
}
}
@ -271,15 +296,15 @@ func (s *Service) checkTorrentFilesCompatable(aq DirAquire, existingFiles, newFi
return true
}
func (s *Service) getTorrentsByName(name string) []*torrent.Torrent {
out := []*torrent.Torrent{}
for _, t := range s.c.Torrents() {
if t.Name() == name {
out = append(out, t)
}
}
return out
}
// func (s *Service) getTorrentsByName(name string) []*torrent.Torrent {
// out := []*torrent.Torrent{}
// for _, t := range s.c.Torrents() {
// if t.Name() == name {
// out = append(out, t)
// }
// }
// return out
// }
func isValidInfoHashBytes(d []byte) bool {
var info metainfo.Info
@ -290,12 +315,12 @@ func isValidInfoHashBytes(d []byte) bool {
func (s *Service) NewTorrentFs(ctx context.Context, f vfs.File) (vfs.Filesystem, error) {
defer f.Close(ctx)
info, err := f.Stat()
info, err := f.Info()
if err != nil {
return nil, err
}
t, err := s.AddTorrent(ctx, f)
t, err := s.LoadTorrent(ctx, f)
if err != nil {
return nil, err
}
@ -311,7 +336,46 @@ func (s *Service) GetStats() torrent.ConnStats {
return s.c.ConnStats()
}
const loadWorkers = 5
func (s *Service) loadTorrentFiles(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "loadTorrentFiles", trace.WithAttributes(
attribute.Int("workers", loadWorkers),
))
defer span.End()
log := rlog.FunctionLog(s.log, "loadTorrentFiles")
loaderPaths := make(chan string)
wg := sync.WaitGroup{}
defer func() {
close(loaderPaths)
wg.Wait()
}()
loaderWorker := func() {
wg.Add(1)
for path := range loaderPaths {
file, err := vfs.NewLazyOsFile(path)
if err != nil {
log.Error("error opening torrent file", "filename", path, rlog.Err(err))
continue
}
defer file.Close(ctx)
_, err = s.LoadTorrent(ctx, file)
if err != nil {
s.log.Error("failed adding torrent", "error", err)
}
}
wg.Done()
}
for range loadWorkers {
go loaderWorker()
}
return filepath.Walk(s.SourceDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("fs walk error: %w", err)
@ -326,13 +390,7 @@ func (s *Service) loadTorrentFiles(ctx context.Context) error {
}
if strings.HasSuffix(path, ".torrent") {
file := vfs.NewLazyOsFile(path)
defer file.Close(ctx)
_, err = s.AddTorrent(ctx, file)
if err != nil {
s.log.Error("failed adding torrent", "error", err)
}
loaderPaths <- path
}
return nil

21
src/host/tkv/new.go Normal file
View file

@ -0,0 +1,21 @@
package tkv
import (
"path"
"git.kmsign.ru/royalcat/tstor/pkg/kvtrace"
"github.com/royalcat/kv"
"go.opentelemetry.io/otel/attribute"
)
func New[K kv.Bytes, V any](dbdir, name string) (store kv.Store[K, V], err error) {
dir := path.Join(dbdir, name)
store, err = kv.NewBadgerKV[K, V](dir)
if err != nil {
return nil, err
}
store = kvtrace.WrapTracing(store, attribute.String("collection", name), attribute.String("database", "badger"))
return store, err
}

View file

@ -3,40 +3,40 @@ package vfs
import (
"archive/zip"
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/iio"
"github.com/bodgit/sevenzip"
"github.com/nwaples/rardecode/v2"
)
var ArchiveFactories = map[string]FsFactory{
".zip": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
stat, err := f.Info()
if err != nil {
return nil, err
}
return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), ZipLoader)
},
".rar": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
stat, err := f.Info()
if err != nil {
return nil, err
}
return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), RarLoader)
},
".7z": func(ctx context.Context, f File) (Filesystem, error) {
stat, err := f.Stat()
stat, err := f.Info()
if err != nil {
return nil, err
}
return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader), nil
return NewArchive(ctx, stat.Name(), f, stat.Size(), SevenZipLoader)
},
}
@ -47,52 +47,73 @@ var _ Filesystem = &ArchiveFS{}
type ArchiveFS struct {
name string
r ctxio.ReaderAt
size int64
Size int64
files func() (map[string]File, error)
files map[string]File
}
func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) *ArchiveFS {
return &ArchiveFS{
name: name,
r: r,
Size: size,
files: OnceValueWOErr(func() (map[string]File, error) {
zipFiles, err := loader(ctx, r, size)
if err != nil {
return nil, err
}
// TODO make optional
singleDir := true
for k := range zipFiles {
if !strings.HasPrefix(k, "/"+name+"/") {
singleDir = false
break
}
}
// ModTime implements Filesystem.
func (a *ArchiveFS) ModTime() time.Time {
return time.Time{}
}
files := make(map[string]File, len(zipFiles))
for k, v := range zipFiles {
// TODO make optional
if strings.Contains(k, "/__MACOSX/") {
continue
}
// Mode implements Filesystem.
func (a *ArchiveFS) Mode() fs.FileMode {
return fs.ModeDir
}
if singleDir {
k, _ = strings.CutPrefix(k, "/"+name)
}
// Size implements Filesystem.
func (a *ArchiveFS) Size() int64 {
return int64(a.size)
}
files[k] = v
}
// Sys implements Filesystem.
func (a *ArchiveFS) Sys() any {
return nil
}
// FIXME
files["/.forcegallery"] = NewMemoryFile(".forcegallery", []byte{})
// FsName implements Filesystem.
func (a *ArchiveFS) FsName() string {
return "archivefs"
}
return files, nil
}),
func NewArchive(ctx context.Context, name string, r ctxio.ReaderAt, size int64, loader archiveLoader) (*ArchiveFS, error) {
archiveFiles, err := loader(ctx, r, size)
if err != nil {
return nil, err
}
// TODO make optional
singleDir := true
for k := range archiveFiles {
if !strings.HasPrefix(k, "/"+name+"/") {
singleDir = false
break
}
}
files := make(map[string]File, len(archiveFiles))
for k, v := range archiveFiles {
// TODO make optional
if strings.Contains(k, "/__MACOSX/") {
continue
}
if singleDir {
k, _ = strings.CutPrefix(k, "/"+name)
}
files[k] = v
}
// FIXME
files["/.forcegallery"] = NewMemoryFile(".forcegallery", []byte{})
return &ArchiveFS{
name: name,
size: size,
files: files,
}, nil
}
// Unlink implements Filesystem.
@ -101,35 +122,21 @@ func (a *ArchiveFS) Unlink(ctx context.Context, filename string) error {
}
func (a *ArchiveFS) Open(ctx context.Context, filename string) (File, error) {
files, err := a.files()
if err != nil {
return nil, err
}
return getFile(files, filename)
return getFile(a.files, filename)
}
func (fs *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
files, err := fs.files()
if err != nil {
return nil, err
}
return listDirFromFiles(files, path)
func (a *ArchiveFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
return listDirFromFiles(a.files, path)
}
// Stat implements Filesystem.
func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
files, err := afs.files()
if err != nil {
return nil, err
if file, ok := afs.files[filename]; ok {
return file.Info()
}
if file, ok := files[filename]; ok {
return file.Stat()
}
for p, _ := range files {
for p, _ := range afs.files {
if strings.HasPrefix(p, filename) {
return newDirInfo(path.Base(filename)), nil
}
@ -140,11 +147,7 @@ func (afs *ArchiveFS) Stat(ctx context.Context, filename string) (fs.FileInfo, e
// Info implements Filesystem.
func (a *ArchiveFS) Info() (fs.FileInfo, error) {
return &fileInfo{
name: a.name,
size: a.Size,
isDir: true,
}, nil
return a, nil
}
// IsDir implements Filesystem.
@ -162,42 +165,46 @@ func (a *ArchiveFS) Type() fs.FileMode {
return fs.ModeDir
}
var _ File = &archiveFile{}
var _ File = (*archiveFile)(nil)
func NewArchiveFile(name string, readerFunc func() (iio.Reader, error), size int64) *archiveFile {
func NewArchiveFile(name string, size int64, af archiveFileReaderFactory) *archiveFile {
return &archiveFile{
name: name,
readerFunc: readerFunc,
size: size,
name: name,
size: size,
af: af,
buffer: ctxio.NewFileBuffer(nil),
}
}
const readahead = 1024 * 16
type archiveFile struct {
name string
size int64
af archiveFileReaderFactory
readerFunc func() (iio.Reader, error)
reader iio.Reader
size int64
m sync.Mutex
offset int64
readen int64
buffer *ctxio.FileBuffer
}
func (d *archiveFile) Stat() (fs.FileInfo, error) {
// Name implements File.
func (d *archiveFile) Name() string {
return d.name
}
// Type implements File.
func (d *archiveFile) Type() fs.FileMode {
return roMode
}
func (d *archiveFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, d.size), nil
}
func (d *archiveFile) load() error {
if d.reader != nil {
return nil
}
r, err := d.readerFunc()
if err != nil {
return err
}
d.reader = r
return nil
}
func (d *archiveFile) Size() int64 {
return d.size
}
@ -206,31 +213,60 @@ func (d *archiveFile) IsDir() bool {
return false
}
func (d *archiveFile) Close(ctx context.Context) (err error) {
if d.reader != nil {
err = d.reader.Close()
d.reader = nil
func (d *archiveFile) Close(ctx context.Context) error {
return d.buffer.Close(ctx)
}
func (d *archiveFile) loadMore(ctx context.Context, to int64) error {
d.m.Lock()
defer d.m.Unlock()
if to < d.readen {
return nil
}
return
reader, err := d.af(ctx)
if err != nil {
return fmt.Errorf("failed to get file reader: %w", err)
}
_, err = d.buffer.Seek(0, io.SeekStart)
if err != nil {
return fmt.Errorf("failed to seek to start of the file: %w", err)
}
d.readen, err = ctxio.CopyN(ctx, d.buffer, ctxio.WrapIoReader(reader), to+readahead)
if err != nil && err != io.EOF {
return fmt.Errorf("error copying from archive file reader: %w", err)
}
return nil
}
func (d *archiveFile) Read(ctx context.Context, p []byte) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
err = d.loadMore(ctx, d.offset+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
return d.reader.Read(p)
n, err = d.buffer.Read(ctx, p)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
}
func (d *archiveFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
if err := d.load(); err != nil {
return 0, err
err = d.loadMore(ctx, off+int64(len(p)))
if err != nil {
return 0, fmt.Errorf("failed to load more from archive file: %w", err)
}
return d.reader.ReadAt(p, off)
n, err = d.buffer.ReadAt(ctx, p, off)
if err != nil && err != io.EOF {
return n, fmt.Errorf("failed to read from buffer: %w", err)
}
return n, nil
}
type archiveFileReaderFactory func(ctx context.Context) (io.ReadCloser, error)
var _ archiveLoader = ZipLoader
func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[string]*archiveFile, error) {
@ -248,16 +284,24 @@ func ZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
continue
}
rf := func() (iio.Reader, error) {
zr, err := zipFile.Open()
i := i
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := zip.NewReader(reader, size)
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
rc, err := zr.File[i].Open()
if err != nil {
return nil, err
}
return rc, nil
}
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, rf, zipFile.FileInfo().Size())
out[AbsPath(zipFile.Name)] = NewArchiveFile(zipFile.Name, zipFile.FileInfo().Size(), af)
}
return out, nil
@ -274,25 +318,29 @@ func SevenZipLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (
}
out := make(map[string]*archiveFile)
for _, f := range r.File {
for i, f := range r.File {
f := f
if f.FileInfo().IsDir() {
continue
}
rf := func() (iio.Reader, error) {
zr, err := f.Open()
i := i
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReaderAt(ctx, ctxreader)
zr, err := sevenzip.NewReader(reader, size)
if err != nil {
return nil, err
}
return iio.NewDiskTeeReader(zr)
rc, err := zr.File[i].Open()
if err != nil {
return nil, err
}
return rc, nil
}
af := NewArchiveFile(f.Name, rf, f.FileInfo().Size())
n := filepath.Join(string(os.PathSeparator), f.Name)
out[n] = af
out[AbsPath(f.Name)] = NewArchiveFile(f.Name, f.FileInfo().Size(), af)
}
return out, nil
@ -318,15 +366,26 @@ func RarLoader(ctx context.Context, ctxreader ctxio.ReaderAt, size int64) (map[s
return nil, err
}
rf := func() (iio.Reader, error) {
return iio.NewDiskTeeReader(r)
name := header.Name
af := func(ctx context.Context) (io.ReadCloser, error) {
reader := ctxio.IoReadSeekerWrapper(ctx, ctxreader, size)
r, err := rardecode.NewReader(reader)
if err != nil {
return nil, err
}
for header, err := r.Next(); err != io.EOF; header, err = r.Next() {
if err != nil {
return nil, err
}
if header.Name == name {
return io.NopCloser(r), nil
}
}
return nil, fmt.Errorf("file with name '%s' not found", name)
}
n := filepath.Join(string(os.PathSeparator), header.Name)
af := NewArchiveFile(header.Name, rf, header.UnPackedSize)
out[n] = af
out[AbsPath(header.Name)] = NewArchiveFile(header.Name, header.UnPackedSize, af)
}
return out, nil

View file

@ -1,4 +1,4 @@
package vfs
package vfs_test
import (
"archive/zip"
@ -8,9 +8,35 @@ import (
"testing"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/stretchr/testify/require"
)
// TODO
// func TestArchiveFactories(t *testing.T) {
// t.Parallel()
// ctx := context.Background()
// require := require.New(t)
// require.Contains(vfs.ArchiveFactories, ".zip")
// require.Contains(vfs.ArchiveFactories, ".rar")
// require.Contains(vfs.ArchiveFactories, ".7z")
// fs, err := vfs.ArchiveFactories[".zip"](ctx, &vfs.DummyFile{})
// require.NoError(err)
// require.NotNil(fs)
// fs, err = vfs.ArchiveFactories[".rar"](ctx, &vfs.DummyFile{})
// require.NoError(err)
// require.NotNil(fs)
// fs, err = vfs.ArchiveFactories[".7z"](ctx, &vfs.DummyFile{})
// require.NoError(err)
// require.NotNil(fs)
// }
var fileContent []byte = []byte("Hello World")
func TestZipFilesystem(t *testing.T) {
@ -22,7 +48,8 @@ func TestZipFilesystem(t *testing.T) {
ctx := context.Background()
// TODO add single dir collapse test
zfs := NewArchive(ctx, "test", zReader, size, ZipLoader)
zfs, err := vfs.NewArchive(ctx, "test", zReader, size, vfs.ZipLoader)
require.NoError(err)
files, err := zfs.ReadDir(ctx, "/path/to/test/file")
require.NoError(err)

View file

@ -6,39 +6,54 @@ import (
"path"
)
var _ File = &dir{}
var _ File = &dirFile{}
func NewDir(name string) File {
return &dir{
func newDirFile(name string) File {
return &dirFile{
name: path.Base(name),
}
}
type dir struct {
type dirFile struct {
name string
}
// Info implements File.
func (d *dir) Stat() (fs.FileInfo, error) {
return newDirInfo(d.name), nil
}
func (d *dir) Size() int64 {
return 0
}
func (d *dir) IsDir() bool {
return true
}
func (d *dir) Close(ctx context.Context) error {
// Close implements File.
func (d *dirFile) Close(ctx context.Context) error {
return nil
}
func (d *dir) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, nil
// Info implements File.
func (d *dirFile) Info() (fs.FileInfo, error) {
return newDirInfo(d.name), nil
}
func (d *dir) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, nil
// IsDir implements File.
func (d *dirFile) IsDir() bool {
return true
}
// Name implements File.
func (d *dirFile) Name() string {
return d.name
}
// Read implements File.
func (d *dirFile) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, fs.ErrInvalid
}
// ReadAt implements File.
func (d *dirFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, fs.ErrInvalid
}
// Size implements File.
func (d *dirFile) Size() int64 {
return 0
}
// Type implements File.
func (d *dirFile) Type() fs.FileMode {
return roMode | fs.ModeDir
}

125
src/host/vfs/dummy.go Normal file
View file

@ -0,0 +1,125 @@
package vfs
import (
"context"
"io/fs"
"os"
"path"
"time"
)
var _ Filesystem = &DummyFs{}
type DummyFs struct {
name string
}
// ModTime implements Filesystem.
func (d *DummyFs) ModTime() time.Time {
return time.Time{}
}
// Mode implements Filesystem.
func (d *DummyFs) Mode() fs.FileMode {
return fs.ModeDir
}
// Size implements Filesystem.
func (d *DummyFs) Size() int64 {
panic("unimplemented")
}
// Sys implements Filesystem.
func (d *DummyFs) Sys() any {
panic("unimplemented")
}
// FsName implements Filesystem.
func (d *DummyFs) FsName() string {
return "dummyfs"
}
// Stat implements Filesystem.
func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return newFileInfo(path.Base(filename), 0), nil // TODO
}
func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) {
return &DummyFile{}, nil
}
func (d *DummyFs) Unlink(ctx context.Context, filename string) error {
return ErrNotImplemented
}
func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
if path == "/dir/here" {
return []fs.DirEntry{
newFileInfo("file1.txt", 0),
newFileInfo("file2.txt", 0),
}, nil
}
return nil, os.ErrNotExist
}
// Info implements Filesystem.
func (d *DummyFs) Info() (fs.FileInfo, error) {
return newDirInfo(d.name), nil
}
// IsDir implements Filesystem.
func (d *DummyFs) IsDir() bool {
return true
}
// Name implements Filesystem.
func (d *DummyFs) Name() string {
return d.name
}
// Type implements Filesystem.
func (d *DummyFs) Type() fs.FileMode {
return fs.ModeDir
}
var _ File = &DummyFile{}
type DummyFile struct {
name string
}
// Name implements File.
func (d *DummyFile) Name() string {
panic("unimplemented")
}
// Type implements File.
func (d *DummyFile) Type() fs.FileMode {
panic("unimplemented")
}
// Stat implements File.
func (d *DummyFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, 0), nil
}
func (d *DummyFile) Size() int64 {
return 0
}
func (d *DummyFile) IsDir() bool {
return false
}
func (d *DummyFile) Close(ctx context.Context) error {
return nil
}
func (d *DummyFile) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, nil
}
func (d *DummyFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, nil
}

View file

@ -8,12 +8,14 @@ import (
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"go.opentelemetry.io/otel"
)
type File interface {
IsDir() bool
Size() int64
Stat() (fs.FileInfo, error)
fs.DirEntry
ctxio.Reader
ctxio.ReaderAt
@ -22,6 +24,8 @@ type File interface {
var ErrNotImplemented = errors.New("not implemented")
var tracer = otel.Tracer("git.kmsign.ru/royalcat/tstor/src/host/vfs")
type Filesystem interface {
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor has
@ -35,10 +39,12 @@ type Filesystem interface {
Stat(ctx context.Context, filename string) (fs.FileInfo, error)
Unlink(ctx context.Context, filename string) error
// As filesystem mounted to some path, make sense to have the filesystem implement DirEntry
fs.DirEntry
}
const defaultMode = fs.FileMode(0555)
// readonly
const roMode = fs.FileMode(0555)
type fileInfo struct {
name string
@ -87,10 +93,10 @@ func (fi *fileInfo) Size() int64 {
func (fi *fileInfo) Mode() fs.FileMode {
if fi.isDir {
return defaultMode | fs.ModeDir
return roMode | fs.ModeDir
}
return defaultMode
return roMode
}
func (fi *fileInfo) ModTime() time.Time {

View file

@ -37,7 +37,7 @@ func TestDirInfo(t *testing.T) {
require.NotNil(fi.ModTime())
require.NotZero(fi.Type() & fs.ModeDir)
require.NotZero(fi.Mode() & fs.ModeDir)
require.Equal(defaultMode|fs.ModeDir, fi.Mode())
require.Equal(roMode|fs.ModeDir, fi.Mode())
require.Nil(fi.Sys())
}

View file

@ -5,22 +5,62 @@ import (
"io/fs"
"log/slog"
"reflect"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type LogFS struct {
fs Filesystem
log *slog.Logger
timeout time.Duration
readTimeout time.Duration
}
var _ Filesystem = (*LogFS)(nil)
func WrapLogFS(fs Filesystem, log *slog.Logger) *LogFS {
func WrapLogFS(fs Filesystem) *LogFS {
return &LogFS{
fs: fs,
log: log.With("component", "fs"),
fs: fs,
log: rlog.ComponentLog("fs"),
timeout: time.Minute * 3,
readTimeout: time.Minute,
}
}
// ModTime implements Filesystem.
func (lfs *LogFS) ModTime() time.Time {
return lfs.ModTime()
}
// Mode implements Filesystem.
func (lfs *LogFS) Mode() fs.FileMode {
return lfs.Mode()
}
// Size implements Filesystem.
func (lfs *LogFS) Size() int64 {
return lfs.Size()
}
// Sys implements Filesystem.
func (lfs *LogFS) Sys() any {
return lfs.Sys()
}
func (fs *LogFS) FsName() string {
return "logfs"
}
func (fs *LogFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption {
return trace.WithAttributes(append([]attribute.KeyValue{
attribute.String("fs", fs.FsName()),
}, add...)...)
}
// Info implements Filesystem.
func (fs *LogFS) Info() (fs.FileInfo, error) {
return fs.fs.Info()
@ -42,36 +82,84 @@ func (fs *LogFS) Type() fs.FileMode {
}
// Open implements Filesystem.
func (fs *LogFS) Open(ctx context.Context, filename string) (File, error) {
file, err := fs.fs.Open(ctx, filename)
func (fs *LogFS) Open(ctx context.Context, filename string) (file File, err error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Open",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
file, err = fs.fs.Open(ctx, filename)
if err != nil {
fs.log.With("filename", filename).Error("Failed to open file")
}
file = WrapLogFile(file, filename, fs.log)
file = WrapLogFile(file, filename, fs.log, fs.readTimeout)
return file, err
}
// ReadDir implements Filesystem.
func (fs *LogFS) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
file, err := fs.fs.ReadDir(ctx, path)
func (fs *LogFS) ReadDir(ctx context.Context, path string) (entries []fs.DirEntry, err error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "ReadDir",
fs.traceAttrs(attribute.String("path", path)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
entries, err = fs.fs.ReadDir(ctx, path)
if err != nil {
fs.log.ErrorContext(ctx, "Failed to read dir", "path", path, "error", err.Error(), "fs-type", reflect.TypeOf(fs.fs).Name())
}
return file, err
return entries, err
}
// Stat implements Filesystem.
func (fs *LogFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
file, err := fs.fs.Stat(ctx, filename)
func (fs *LogFS) Stat(ctx context.Context, filename string) (info fs.FileInfo, err error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Stat",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
info, err = fs.fs.Stat(ctx, filename)
if err != nil {
fs.log.Error("Failed to stat", "filename", filename, "error", err)
}
return file, err
return info, err
}
// Unlink implements Filesystem.
func (fs *LogFS) Unlink(ctx context.Context, filename string) error {
err := fs.fs.Unlink(ctx, filename)
func (fs *LogFS) Unlink(ctx context.Context, filename string) (err error) {
ctx, cancel := context.WithTimeout(ctx, fs.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Unlink",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
err = fs.fs.Unlink(ctx, filename)
if err != nil {
fs.log.Error("Failed to stat", "filename", filename, "error", err)
}
@ -79,24 +167,51 @@ func (fs *LogFS) Unlink(ctx context.Context, filename string) error {
}
type LogFile struct {
f File
log *slog.Logger
filename string
f File
log *slog.Logger
timeout time.Duration
}
// Name implements File.
func (f *LogFile) Name() string {
return f.f.Name()
}
// Type implements File.
func (f *LogFile) Type() fs.FileMode {
return f.f.Type()
}
var _ File = (*LogFile)(nil)
func WrapLogFile(f File, filename string, log *slog.Logger) *LogFile {
func WrapLogFile(f File, filename string, log *slog.Logger, timeout time.Duration) *LogFile {
return &LogFile{
f: f,
log: log.With("filename", filename),
filename: filename,
f: f,
log: log.With("filename", filename),
timeout: timeout,
}
}
// Close implements File.
func (f *LogFile) Close(ctx context.Context) error {
err := f.f.Close(ctx)
func (f *LogFile) Close(ctx context.Context) (err error) {
ctx, cancel := context.WithTimeout(ctx, f.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Close",
trace.WithAttributes(attribute.String("filename", f.filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
err = f.f.Close(ctx)
if err != nil {
f.log.Error("Failed to close", "error", err)
f.log.ErrorContext(ctx, "Failed to close", "error", err)
}
return err
}
@ -108,6 +223,22 @@ func (f *LogFile) IsDir() bool {
// Read implements File.
func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) {
ctx, cancel := context.WithTimeout(ctx, f.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "Read",
trace.WithAttributes(
attribute.String("filename", f.filename),
attribute.Int("length", len(p)),
),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
if err != nil {
span.RecordError(err)
}
span.End()
}()
n, err = f.f.Read(ctx, p)
if err != nil {
f.log.Error("Failed to read", "error", err)
@ -117,6 +248,22 @@ func (f *LogFile) Read(ctx context.Context, p []byte) (n int, err error) {
// ReadAt implements File.
func (f *LogFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
ctx, cancel := context.WithTimeout(ctx, f.timeout)
defer cancel()
ctx, span := tracer.Start(ctx, "ReadAt",
trace.WithAttributes(
attribute.String("filename", f.filename),
attribute.Int("length", len(p)),
),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
if err != nil {
span.RecordError(err)
}
span.End()
}()
n, err = f.f.ReadAt(ctx, p, off)
if err != nil {
f.log.Error("Failed to read", "offset", off, "error", err)
@ -130,8 +277,8 @@ func (f *LogFile) Size() int64 {
}
// Stat implements File.
func (f *LogFile) Stat() (fs.FileInfo, error) {
info, err := f.f.Stat()
func (f *LogFile) Info() (fs.FileInfo, error) {
info, err := f.f.Info()
if err != nil {
f.log.Error("Failed to read", "error", err)
}

View file

@ -5,15 +5,41 @@ import (
"context"
"io/fs"
"path"
"time"
)
var _ Filesystem = &MemoryFs{}
type MemoryFs struct {
name string
files map[string]*MemoryFile
}
var _ Filesystem = (*MemoryFs)(nil)
// ModTime implements Filesystem.
func (mfs *MemoryFs) ModTime() time.Time {
return time.Time{}
}
// Mode implements Filesystem.
func (mfs *MemoryFs) Mode() fs.FileMode {
return fs.ModeDir
}
// Size implements Filesystem.
func (fs *MemoryFs) Size() int64 {
return 0
}
// Sys implements Filesystem.
func (fs *MemoryFs) Sys() any {
return nil
}
// FsKind implements Filesystem.
func (fs *MemoryFs) FsName() string {
return "memoryfs"
}
// Info implements Filesystem.
func (fs *MemoryFs) Info() (fs.FileInfo, error) {
return newDirInfo(fs.name), nil
@ -77,7 +103,17 @@ func NewMemoryFile(name string, data []byte) *MemoryFile {
}
}
func (d *MemoryFile) Stat() (fs.FileInfo, error) {
// Name implements File.
func (d *MemoryFile) Name() string {
return d.name
}
// Type implements File.
func (d *MemoryFile) Type() fs.FileMode {
return roMode
}
func (d *MemoryFile) Info() (fs.FileInfo, error) {
return newFileInfo(d.name, int64(d.data.Len())), nil
}

View file

@ -12,13 +12,19 @@ type OsFS struct {
hostDir string
}
var _ Filesystem = (*OsFS)(nil)
// Stat implements Filesystem.
func (fs *OsFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
if path.Clean(filename) == Separator {
return newDirInfo(Separator), nil
}
return os.Stat(path.Join(fs.hostDir, filename))
info, err := os.Stat(path.Join(fs.hostDir, filename))
if err != nil {
return nil, err
}
return info, nil
}
// Unlink implements Filesystem.
@ -28,11 +34,11 @@ func (fs *OsFS) Unlink(ctx context.Context, filename string) error {
// Open implements Filesystem.
func (fs *OsFS) Open(ctx context.Context, filename string) (File, error) {
if path.Clean(filename) == Separator {
return NewDir(filename), nil
if isRoot(filename) {
return newDirFile(fs.Name()), nil
}
return NewLazyOsFile(path.Join(fs.hostDir, filename)), nil
return NewLazyOsFile(path.Join(fs.hostDir, filename))
}
// ReadDir implements Filesystem.
@ -42,7 +48,7 @@ func (o *OsFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
// Info implements Filesystem.
func (fs *OsFS) Info() (fs.FileInfo, error) {
return newDirInfo(path.Base(fs.hostDir)), nil
return newDirInfo(fs.Name()), nil
}
// IsDir implements Filesystem.
@ -68,56 +74,16 @@ func NewOsFs(osDir string) *OsFS {
var _ Filesystem = &OsFS{}
type OsFile struct {
f *os.File
}
func NewOsFile(f *os.File) *OsFile {
return &OsFile{f: f}
}
var _ File = &OsFile{}
// Info implements File.
func (f *OsFile) Info() (fs.FileInfo, error) {
return f.f.Stat()
}
// Close implements File.
func (f *OsFile) Close(ctx context.Context) error {
return f.f.Close()
}
// Read implements File.
func (f *OsFile) Read(ctx context.Context, p []byte) (n int, err error) {
return f.f.Read(p)
}
// ReadAt implements File.
func (f *OsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return f.f.ReadAt(p, off)
}
func (f *OsFile) Stat() (fs.FileInfo, error) {
return f.f.Stat()
}
// Size implements File.
func (f *OsFile) Size() int64 {
stat, err := f.Stat()
func NewLazyOsFile(path string) (*LazyOsFile, error) {
info, err := os.Stat(path)
if err != nil {
return 0
return nil, err
}
return stat.Size()
}
// IsDir implements File.
func (f *OsFile) IsDir() bool {
stat, err := f.Stat()
if err != nil {
return false
}
return stat.IsDir()
return &LazyOsFile{
path: path,
info: info,
}, nil
}
type LazyOsFile struct {
@ -125,15 +91,10 @@ type LazyOsFile struct {
path string
file *os.File
// cached field
info fs.FileInfo
}
func NewLazyOsFile(path string) *LazyOsFile {
return &LazyOsFile{path: path}
}
var _ File = &OsFile{}
var _ File = (*LazyOsFile)(nil)
func (f *LazyOsFile) open() error {
f.m.Lock()
@ -151,6 +112,16 @@ func (f *LazyOsFile) open() error {
return nil
}
// Name implements File.
func (f *LazyOsFile) Name() string {
return path.Base(f.path)
}
// Type implements File.
func (f *LazyOsFile) Type() fs.FileMode {
return f.info.Mode()
}
// Close implements File.
func (f *LazyOsFile) Close(ctx context.Context) error {
if f.file == nil {
@ -177,41 +148,17 @@ func (f *LazyOsFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, er
return f.file.ReadAt(p, off)
}
func (f *LazyOsFile) Stat() (fs.FileInfo, error) {
f.m.Lock()
defer f.m.Unlock()
if f.info == nil {
if f.file == nil {
info, err := os.Stat(f.path)
if err != nil {
return nil, err
}
f.info = info
} else {
info, err := f.file.Stat()
if err != nil {
return nil, err
}
f.info = info
}
}
func (f *LazyOsFile) Info() (fs.FileInfo, error) {
return f.info, nil
}
// Size implements File.
func (f *LazyOsFile) Size() int64 {
stat, err := f.Stat()
if err != nil {
return 0
}
return stat.Size()
return f.info.Size()
}
// IsDir implements File.
func (f *LazyOsFile) IsDir() bool {
stat, err := f.Stat()
if err != nil {
return false
}
return stat.IsDir()
return f.info.IsDir()
}

75
src/host/vfs/os_test.go Normal file
View file

@ -0,0 +1,75 @@
package vfs_test
import (
"context"
"os"
"testing"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/stretchr/testify/require"
)
func TestOsFs(t *testing.T) {
const testDir = "/tmp/tstor-test"
require := require.New(t)
ctx := context.Background()
err := os.RemoveAll(testDir)
require.NotErrorIs(err, os.ErrNotExist)
defer func() {
err = os.RemoveAll(testDir)
require.NotErrorIs(err, os.ErrNotExist)
}()
err = os.MkdirAll(testDir, os.ModePerm)
require.NoError(err)
err = os.MkdirAll(testDir+"/dir1", os.ModePerm)
require.NoError(err)
err = os.MkdirAll(testDir+"/dir1/dir2", os.ModePerm)
require.NoError(err)
err = os.MkdirAll(testDir+"/dir1/dir3", os.ModePerm)
require.NoError(err)
osfile, err := os.Create(testDir + "/dir1/dir2/file")
require.NoError(err)
err = osfile.Close()
require.NoError(err)
fs := vfs.NewOsFs(testDir)
dirs := []string{"/", "/.", "/dir1", "/dir1/dir2"}
for _, dir := range dirs {
file, err := fs.Open(ctx, dir)
require.NoError(err)
require.True(file.IsDir())
stat, err := file.Info()
require.NoError(err)
require.True(stat.IsDir())
require.NoError(file.Close(ctx))
info, err := fs.Stat(ctx, dir)
require.NoError(err)
require.True(info.IsDir())
entries, err := fs.ReadDir(ctx, dir)
require.NoError(err)
for _, e := range entries {
switch e.Name() {
case "dir2", "dir1", "dir3":
require.False(e.Type().IsRegular())
require.True(e.Type().IsDir())
require.True(e.IsDir())
case "file":
require.True(e.Type().IsRegular())
require.False(e.Type().IsDir())
require.False(e.IsDir())
}
}
}
file, err := fs.Open(ctx, "/dir1/dir2/file")
require.NoError(err)
require.False(file.IsDir())
}

View file

@ -2,28 +2,84 @@ package vfs
import (
"context"
"errors"
"fmt"
"io/fs"
"log/slog"
"path"
"reflect"
"slices"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/rlog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
)
type ResolverFS struct {
rootFS Filesystem
resolver *resolver
log *slog.Logger
}
func NewResolveFS(rootFs Filesystem, factories map[string]FsFactory) *ResolverFS {
return &ResolverFS{
rootFS: rootFs,
resolver: newResolver(factories),
log: rlog.ComponentLog("fs/resolverfs"),
}
}
// ModTime implements Filesystem.
func (r *ResolverFS) ModTime() time.Time {
return time.Time{}
}
// Mode implements Filesystem.
func (r *ResolverFS) Mode() fs.FileMode {
return fs.ModeDir
}
// Size implements Filesystem.
func (r *ResolverFS) Size() int64 {
return 0
}
// Sys implements Filesystem.
func (r *ResolverFS) Sys() any {
return nil
}
// FsName implements Filesystem.
func (r *ResolverFS) FsName() string {
return "resolverfs"
}
func (fs *ResolverFS) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption {
return trace.WithAttributes(append([]attribute.KeyValue{
attribute.String("fs", fs.FsName()),
}, add...)...)
}
func (r *ResolverFS) ResolvablesExtensions() []string {
return maps.Keys(r.resolver.factories)
}
// Open implements Filesystem.
func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) {
ctx, span := tracer.Start(ctx, "Open",
r.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
if path.Clean(filename) == Separator {
return newDirFile(r.Name()), nil
}
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open)
if err != nil {
return nil, err
@ -37,6 +93,11 @@ func (r *ResolverFS) Open(ctx context.Context, filename string) (File, error) {
// ReadDir implements Filesystem.
func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, error) {
ctx, span := tracer.Start(ctx, "ReadDir",
r.traceAttrs(attribute.String("name", dir)),
)
defer span.End()
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, dir, r.rootFS.Open)
if err != nil {
return nil, err
@ -57,8 +118,14 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
if err != nil {
return nil, err
}
defer file.Close(ctx)
nestedfs, err := r.resolver.nestedFs(ctx, filepath, file)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.log.ErrorContext(ctx, "creating fs timed out", "filename", e.Name())
continue
}
return nil, err
}
@ -72,11 +139,23 @@ func (r *ResolverFS) ReadDir(ctx context.Context, dir string) ([]fs.DirEntry, er
// Stat implements Filesystem.
func (r *ResolverFS) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, span := tracer.Start(ctx, "Stat",
r.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
if isRoot(filename) {
return r, nil
}
fsPath, nestedFs, nestedFsPath, err := r.resolver.resolvePath(ctx, filename, r.rootFS.Open)
if err != nil {
return nil, err
}
span.SetAttributes(attribute.String("fsPath", fsPath), attribute.String("nestedFsPath", nestedFsPath))
if nestedFs != nil {
span.AddEvent("calling nested fs")
return nestedFs.Stat(ctx, nestedFsPath)
}
@ -98,7 +177,7 @@ func (r *ResolverFS) Unlink(ctx context.Context, filename string) error {
// Info implements Filesystem.
func (r *ResolverFS) Info() (fs.FileInfo, error) {
return newDirInfo(r.rootFS.Name()), nil
return r, nil
}
// IsDir implements Filesystem.
@ -108,7 +187,7 @@ func (r *ResolverFS) IsDir() bool {
// Name implements Filesystem.
func (r *ResolverFS) Name() string {
return r.Name()
return r.rootFS.Name()
}
// Type implements Filesystem.
@ -120,8 +199,6 @@ var _ Filesystem = &ResolverFS{}
type FsFactory func(ctx context.Context, f File) (Filesystem, error)
const Separator = "/"
func newResolver(factories map[string]FsFactory) *resolver {
return &resolver{
factories: factories,
@ -171,6 +248,9 @@ func (r *resolver) nestedFs(ctx context.Context, fsPath string, file File) (File
// open requeue raw open, without resolver call
func (r *resolver) resolvePath(ctx context.Context, name string, rawOpen openFile) (fsPath string, nestedFs Filesystem, nestedFsPath string, err error) {
ctx, span := tracer.Start(ctx, "resolvePath")
defer span.End()
name = path.Clean(name)
name = strings.TrimPrefix(name, Separator)
parts := strings.Split(name, Separator)
@ -205,8 +285,12 @@ PARTS_LOOP:
defer r.m.Unlock()
if nestedFs, ok := r.fsmap[fsPath]; ok {
span.AddEvent("fs loaded from cache", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
} else {
ctx, span := tracer.Start(ctx, "CreateFS")
defer span.End()
fsFile, err := rawOpen(ctx, fsPath)
if err != nil {
return "", nil, "", fmt.Errorf("error opening filesystem file: %s with error: %w", fsPath, err)
@ -217,6 +301,8 @@ PARTS_LOOP:
}
r.fsmap[fsPath] = nestedFs
span.AddEvent("fs created", trace.WithAttributes(attribute.String("nestedFs", reflect.TypeOf(nestedFs).Name())))
return fsPath, nestedFs, nestedFsPath, nil
}
@ -226,7 +312,7 @@ var ErrNotExist = fs.ErrNotExist
func getFile[F File](m map[string]F, name string) (File, error) {
if name == Separator {
return NewDir(name), nil
return newDirFile(name), nil
}
f, ok := m[name]
@ -236,7 +322,7 @@ func getFile[F File](m map[string]F, name string) (File, error) {
for p := range m {
if strings.HasPrefix(p, name) {
return NewDir(name), nil
return newDirFile(name), nil
}
}

View file

@ -1,235 +1,237 @@
package vfs
package vfs_test
import (
"archive/zip"
"bytes"
"context"
"io/fs"
"os"
"path"
"testing"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/stretchr/testify/require"
)
type Dummy struct {
name string
}
func createZip(files map[string][]byte) ([]byte, error) {
buf := bytes.NewBuffer(nil)
zw := zip.NewWriter(buf)
// Stat implements File.
func (d *Dummy) Stat() (fs.FileInfo, error) {
return newFileInfo(d.name, 0), nil
}
for name, data := range files {
fw, err := zw.Create(name)
if err != nil {
return nil, err
}
func (d *Dummy) Size() int64 {
return 0
}
func (d *Dummy) IsDir() bool {
return false
}
func (d *Dummy) Close(ctx context.Context) error {
return nil
}
func (d *Dummy) Read(ctx context.Context, p []byte) (n int, err error) {
return 0, nil
}
func (d *Dummy) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
return 0, nil
}
var _ File = &Dummy{}
type DummyFs struct {
name string
}
// Stat implements Filesystem.
func (*DummyFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
return newFileInfo(path.Base(filename), 0), nil // TODO
}
func (d *DummyFs) Open(ctx context.Context, filename string) (File, error) {
return &Dummy{}, nil
}
func (d *DummyFs) Unlink(ctx context.Context, filename string) error {
return ErrNotImplemented
}
func (d *DummyFs) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
if path == "/dir/here" {
return []fs.DirEntry{
newFileInfo("file1.txt", 0),
newFileInfo("file2.txt", 0),
}, nil
_, err = fw.Write(data)
if err != nil {
return nil, err
}
}
err := zw.Flush()
if err != nil {
return nil, err
}
return nil, os.ErrNotExist
err = zw.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Info implements Filesystem.
func (d *DummyFs) Info() (fs.FileInfo, error) {
return newDirInfo(d.name), nil
}
// IsDir implements Filesystem.
func (d *DummyFs) IsDir() bool {
return true
}
// Name implements Filesystem.
func (d *DummyFs) Name() string {
return d.name
}
// Type implements Filesystem.
func (d *DummyFs) Type() fs.FileMode {
return fs.ModeDir
}
var _ Filesystem = &DummyFs{}
func TestResolver(t *testing.T) {
func TestResolverFs(t *testing.T) {
t.Parallel()
resolver := newResolver(ArchiveFactories)
ctx := context.Background()
t.Run("nested fs", func(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/f1.rar/f2.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/f1.rar", path)
return &Dummy{}, nil
})
require.NoError(err)
require.Equal("/f1.rar", fsPath)
require.Equal("/f2.rar", nestedFsPath)
require.IsType(&ArchiveFS{}, nestedFs)
testZip, err := createZip(map[string][]byte{
"123.txt": []byte("123"),
"files/321.txt": []byte("321"),
})
t.Run("root", func(t *testing.T) {
require.NoError(t, err)
fs := vfs.NewResolveFS(vfs.NewMemoryFS("/", map[string]*vfs.MemoryFile{
"/data/123.zip": vfs.NewMemoryFile("123.zip", testZip),
}), vfs.ArchiveFactories)
t.Run("dir", func(t *testing.T) {
t.Parallel()
require := require.New(t)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/", func(_ context.Context, path string) (File, error) {
require.Equal("/", path)
return &Dummy{}, nil
})
require.NoError(err)
require.Nil(nestedFs)
require.Equal("/", fsPath)
require.Equal("", nestedFsPath)
})
dirs := []string{
"/data", "/", "/.",
"/data/123.zip", "/data/123.zip/files", "/data/123.zip/files/.",
}
t.Run("root dirty", func(t *testing.T) {
t.Parallel()
require := require.New(t)
for _, dir := range dirs {
file, err := fs.Open(ctx, dir)
require.NoError(err)
require.True(file.IsDir())
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//", func(_ context.Context, path string) (File, error) {
require.Equal("/", path)
return &Dummy{}, nil
})
require.NoError(err)
require.Nil(nestedFs)
require.Equal("/", fsPath)
require.Equal("", nestedFsPath)
})
t.Run("fs dirty", func(t *testing.T) {
t.Parallel()
require := require.New(t)
stat, err := file.Info()
require.NoError(err)
require.True(stat.IsDir())
}
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//f1.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/f1.rar", path)
return &Dummy{}, nil
})
entries, err := fs.ReadDir(ctx, "/data")
require.NoError(err)
require.Equal("/f1.rar", fsPath)
require.Equal("/", nestedFsPath)
require.IsType(&ArchiveFS{}, nestedFs)
})
t.Run("inside folder", func(t *testing.T) {
t.Parallel()
require := require.New(t)
require.Len(entries, 1)
fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//test1/f1.rar", func(_ context.Context, path string) (File, error) {
require.Equal("/test1/f1.rar", path)
return &Dummy{}, nil
})
for _, e := range entries {
switch e.Name() {
case "123.zip":
require.True(e.IsDir())
require.IsType(&vfs.ArchiveFS{}, e)
}
}
entries, err = fs.ReadDir(ctx, "/data/123.zip/files")
require.NoError(err)
require.IsType(&ArchiveFS{}, nestedFs)
require.Equal("/test1/f1.rar", fsPath)
require.Equal("/", nestedFsPath)
require.Len(entries, 1)
entries, err = fs.ReadDir(ctx, "/data/123.zip")
require.NoError(err)
require.Len(entries, 3)
for _, e := range entries {
switch e.Name() {
case "files":
require.True(e.IsDir())
case "123.txt":
require.False(e.IsDir())
}
}
})
}
func TestArchiveFactories(t *testing.T) {
t.Parallel()
// func TestResolver(t *testing.T) {
// t.Parallel()
// resolver := newResolver(ArchiveFactories)
// ctx := context.Background()
ctx := context.Background()
// t.Run("nested fs", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
require := require.New(t)
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/f1.rar/f2.rar", func(_ context.Context, path string) (File, error) {
// require.Equal("/f1.rar", path)
// return &vfs.Dummy{}, nil
// })
// require.NoError(err)
// require.Equal("/f1.rar", fsPath)
// require.Equal("/f2.rar", nestedFsPath)
// require.IsType(&vfs.ArchiveFS{}, nestedFs)
// })
// t.Run("root", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
require.Contains(ArchiveFactories, ".zip")
require.Contains(ArchiveFactories, ".rar")
require.Contains(ArchiveFactories, ".7z")
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/", func(_ context.Context, path string) (File, error) {
// require.Equal("/", path)
// return &Dummy{}, nil
// })
// require.NoError(err)
// require.Nil(nestedFs)
// require.Equal("/", fsPath)
// require.Equal("", nestedFsPath)
// })
fs, err := ArchiveFactories[".zip"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
// t.Run("root dirty", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
fs, err = ArchiveFactories[".rar"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//", func(_ context.Context, path string) (File, error) {
// require.Equal("/", path)
// return &Dummy{}, nil
// })
// require.NoError(err)
// require.Nil(nestedFs)
// require.Equal("/", fsPath)
// require.Equal("", nestedFsPath)
// })
fs, err = ArchiveFactories[".7z"](ctx, &Dummy{})
require.NoError(err)
require.NotNil(fs)
}
// t.Run("root dirty 2", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
func TestFiles(t *testing.T) {
t.Parallel()
require := require.New(t)
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "/.", func(_ context.Context, path string) (File, error) {
// require.Equal("/", path)
// return &Dummy{}, nil
// })
// require.NoError(err)
// require.Nil(nestedFs)
// require.Equal("/", fsPath)
// require.Equal("", nestedFsPath)
// })
files := map[string]*Dummy{
"/test/file.txt": &Dummy{},
"/test/file2.txt": &Dummy{},
"/test1/file.txt": &Dummy{},
}
{
file, err := getFile(files, "/test")
require.NoError(err)
require.Equal(&dir{name: "test"}, file)
}
{
file, err := getFile(files, "/test/file.txt")
require.NoError(err)
require.Equal(&Dummy{}, file)
}
{
out, err := listDirFromFiles(files, "/test")
require.NoError(err)
require.Len(out, 2)
require.Equal("file.txt", out[0].Name())
require.Equal("file2.txt", out[1].Name())
require.False(out[0].IsDir())
require.False(out[1].IsDir())
}
{
out, err := listDirFromFiles(files, "/test1")
require.NoError(err)
require.Len(out, 1)
require.Equal("file.txt", out[0].Name())
require.False(out[0].IsDir())
}
{
out, err := listDirFromFiles(files, "/")
require.NoError(err)
require.Len(out, 2)
require.Equal("test", out[0].Name())
require.Equal("test1", out[1].Name())
require.True(out[0].IsDir())
require.True(out[1].IsDir())
}
}
// t.Run("fs dirty", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//.//f1.rar", func(_ context.Context, path string) (File, error) {
// require.Equal("/f1.rar", path)
// return &Dummy{}, nil
// })
// require.NoError(err)
// require.Equal("/f1.rar", fsPath)
// require.Equal("/", nestedFsPath)
// require.IsType(&ArchiveFS{}, nestedFs)
// })
// t.Run("inside folder", func(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// fsPath, nestedFs, nestedFsPath, err := resolver.resolvePath(ctx, "//test1/f1.rar", func(_ context.Context, path string) (File, error) {
// require.Equal("/test1/f1.rar", path)
// return &Dummy{}, nil
// })
// require.NoError(err)
// require.IsType(&ArchiveFS{}, nestedFs)
// require.Equal("/test1/f1.rar", fsPath)
// require.Equal("/", nestedFsPath)
// })
// }
// func TestFiles(t *testing.T) {
// t.Parallel()
// require := require.New(t)
// files := map[string]*vfs.DummyFile{
// "/test/file.txt": &vfs.DummyFile{},
// "/test/file2.txt": &vfs.DummyFile{},
// "/test1/file.txt": &vfs.DummyFile{},
// }
// {
// file, err := getFile(files, "/test")
// require.NoError(err)
// require.Equal(&dir{name: "test"}, file)
// }
// {
// file, err := getFile(files, "/test/file.txt")
// require.NoError(err)
// require.Equal(&Dummy{}, file)
// }
// {
// out, err := listDirFromFiles(files, "/test")
// require.NoError(err)
// require.Len(out, 2)
// require.Equal("file.txt", out[0].Name())
// require.Equal("file2.txt", out[1].Name())
// require.False(out[0].IsDir())
// require.False(out[1].IsDir())
// }
// {
// out, err := listDirFromFiles(files, "/test1")
// require.NoError(err)
// require.Len(out, 1)
// require.Equal("file.txt", out[0].Name())
// require.False(out[0].IsDir())
// }
// {
// out, err := listDirFromFiles(files, "/")
// require.NoError(err)
// require.Len(out, 2)
// require.Equal("test", out[0].Name())
// require.Equal("test1", out[1].Name())
// require.True(out[0].IsDir())
// require.True(out[1].IsDir())
// }
// }

View file

@ -8,15 +8,15 @@ import (
"slices"
"strings"
"sync"
"time"
"git.kmsign.ru/royalcat/tstor/pkg/ctxio"
"git.kmsign.ru/royalcat/tstor/src/host/controller"
"github.com/anacrolix/torrent"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
)
var _ Filesystem = &TorrentFs{}
type TorrentFs struct {
name string
@ -28,6 +28,8 @@ type TorrentFs struct {
resolver *resolver
}
var _ Filesystem = (*TorrentFs)(nil)
func NewTorrentFs(name string, c *controller.Torrent) *TorrentFs {
return &TorrentFs{
name: name,
@ -45,7 +47,7 @@ func (tfs *TorrentFs) Name() string {
// Info implements fs.DirEntry.
func (tfs *TorrentFs) Info() (fs.FileInfo, error) {
return newDirInfo(tfs.name), nil
return tfs, nil
}
// IsDir implements fs.DirEntry.
@ -58,6 +60,31 @@ func (tfs *TorrentFs) Type() fs.FileMode {
return fs.ModeDir
}
// ModTime implements fs.FileInfo.
// The torrent root is synthetic, so the zero time is reported.
func (tfs *TorrentFs) ModTime() time.Time {
	return time.Time{}
}
// Mode implements fs.FileInfo.
// The torrent filesystem root always presents itself as a directory.
func (tfs *TorrentFs) Mode() fs.FileMode {
	return fs.ModeDir
}
// Size implements fs.FileInfo.
// The root is a directory; sizes are reported per contained file.
func (tfs *TorrentFs) Size() int64 {
	return 0
}
// Sys implements fs.FileInfo.
// There is no underlying OS-specific data source; always nil.
func (tfs *TorrentFs) Sys() any {
	return nil
}
// FsName implements Filesystem.
// FsName identifies the filesystem implementation (used in trace attributes).
func (tfs *TorrentFs) FsName() string {
	return "torrentfs"
}
func (fs *TorrentFs) files(ctx context.Context) (map[string]File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
@ -66,14 +93,17 @@ func (fs *TorrentFs) files(ctx context.Context) (map[string]File, error) {
return fs.filesCache, nil
}
files, err := fs.Torrent.Files(context.Background())
ctx, span := tracer.Start(ctx, "files", fs.traceAttrs())
defer span.End()
files, err := fs.Torrent.Files(ctx)
if err != nil {
return nil, err
}
fs.filesCache = make(map[string]File)
for _, file := range files {
file.Download()
file.SetPriority(torrent.PiecePriorityNormal)
p := AbsPath(file.Path())
tf, err := openTorrentFile(ctx, path.Base(p), file)
if err != nil {
@ -93,7 +123,7 @@ func (fs *TorrentFs) files(ctx context.Context) (map[string]File, error) {
if nestedFs == nil {
goto DEFAULT_DIR // FIXME
}
fs.filesCache, err = listFilesRecursive(ctx, nestedFs, "/")
fs.filesCache, err = fs.listFilesRecursive(ctx, nestedFs, "/")
if err != nil {
return nil, err
}
@ -141,7 +171,12 @@ DEFAULT_DIR:
// return true
// }
func listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[string]File, error) {
func (fs *TorrentFs) listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[string]File, error) {
ctx, span := tracer.Start(ctx, "listFilesRecursive",
fs.traceAttrs(attribute.String("start", start)),
)
defer span.End()
out := make(map[string]File, 0)
entries, err := vfs.ReadDir(ctx, start)
if err != nil {
@ -150,7 +185,7 @@ func listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[
for _, entry := range entries {
filename := path.Join(start, entry.Name())
if entry.IsDir() {
rec, err := listFilesRecursive(ctx, vfs, filename)
rec, err := fs.listFilesRecursive(ctx, vfs, filename)
if err != nil {
return nil, err
}
@ -167,16 +202,31 @@ func listFilesRecursive(ctx context.Context, vfs Filesystem, start string) (map[
return out, nil
}
func (fs *TorrentFs) rawOpen(ctx context.Context, path string) (File, error) {
func (fs *TorrentFs) rawOpen(ctx context.Context, filename string) (file File, err error) {
ctx, span := tracer.Start(ctx, "rawOpen",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
files, err := fs.files(ctx)
if err != nil {
return nil, err
}
file, err := getFile(files, path)
file, err = getFile(files, filename)
return file, err
}
func (fs *TorrentFs) rawStat(ctx context.Context, filename string) (fs.FileInfo, error) {
ctx, span := tracer.Start(ctx, "rawStat",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
files, err := fs.files(ctx)
if err != nil {
return nil, err
@ -185,13 +235,26 @@ func (fs *TorrentFs) rawStat(ctx context.Context, filename string) (fs.FileInfo,
if err != nil {
return nil, err
}
return file.Stat()
return file.Info()
}
// traceAttrs builds a span-start option with the common attributes
// identifying this filesystem and its torrent (name and infohash),
// plus any extra attributes supplied by the caller.
func (fs *TorrentFs) traceAttrs(add ...attribute.KeyValue) trace.SpanStartOption {
	return trace.WithAttributes(append([]attribute.KeyValue{
		attribute.String("fs", fs.FsName()),
		attribute.String("torrent", fs.Torrent.Name()),
		attribute.String("infohash", fs.Torrent.InfoHash()),
	}, add...)...)
}
// Stat implements Filesystem.
func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, error) {
if filename == Separator {
return newDirInfo(filename), nil
ctx, span := tracer.Start(ctx, "Stat",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
if path.Clean(filename) == Separator {
return fs, nil
}
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
@ -206,6 +269,15 @@ func (fs *TorrentFs) Stat(ctx context.Context, filename string) (fs.FileInfo, er
}
func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) {
ctx, span := tracer.Start(ctx, "Open",
fs.traceAttrs(attribute.String("filename", filename)),
)
defer span.End()
if path.Clean(filename) == Separator {
return newDirFile(fs.name), nil
}
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, filename, fs.rawOpen)
if err != nil {
return nil, err
@ -218,6 +290,11 @@ func (fs *TorrentFs) Open(ctx context.Context, filename string) (File, error) {
}
func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
ctx, span := tracer.Start(ctx, "ReadDir",
fs.traceAttrs(attribute.String("name", name)),
)
defer span.End()
fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(ctx, name, fs.rawOpen)
if err != nil {
return nil, err
@ -234,6 +311,11 @@ func (fs *TorrentFs) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e
}
func (fs *TorrentFs) Unlink(ctx context.Context, name string) error {
ctx, span := tracer.Start(ctx, "Unlink",
fs.traceAttrs(attribute.String("name", name)),
)
defer span.End()
name = AbsPath(name)
fs.mu.Lock()
@ -256,10 +338,10 @@ func (fs *TorrentFs) Unlink(ctx context.Context, name string) error {
return ErrNotImplemented
}
return fs.Torrent.ExcludeFile(context.Background(), tfile.file)
return fs.Torrent.ExcludeFile(ctx, tfile.file)
}
var _ File = &torrentFile{}
var _ File = (*torrentFile)(nil)
type torrentFile struct {
name string
@ -268,20 +350,24 @@ type torrentFile struct {
tr torrent.Reader
lastReadTimeout time.Time
file *torrent.File
}
const secondaryTimeout = time.Hour
func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*torrentFile, error) {
select {
case <-file.Torrent().GotInfo():
break
case <-ctx.Done():
return nil, ctx.Err()
}
// select {
// case <-file.Torrent().GotInfo():
// break
// case <-ctx.Done():
// return nil, ctx.Err()
// }
r := file.NewReader()
r.SetReadahead(4096) // TODO configurable
r.SetResponsive()
r.SetReadahead(1024 * 1024 * 16) // TODO configurable
// r.SetResponsive()
return &torrentFile{
name: name,
@ -290,7 +376,17 @@ func openTorrentFile(ctx context.Context, name string, file *torrent.File) (*tor
}, nil
}
func (tf *torrentFile) Stat() (fs.FileInfo, error) {
// Name implements File.
// Name returns the base name the file was opened with.
func (tf *torrentFile) Name() string {
	return tf.name
}
// Type implements File.
//
// A torrentFile is a regular file inside a torrent — Info reports a
// plain file info with the file's length — so the directory type bit
// must not be set here. Setting fs.ModeDir made DirEntry.IsDir()
// report true (and IsRegular() false) for torrent files, breaking
// directory-listing consumers; report only the read-only mode.
func (tf *torrentFile) Type() fs.FileMode {
	return roMode
}
// Info reports a regular-file FileInfo carrying the file's name and
// its full length within the torrent.
func (tf *torrentFile) Info() (fs.FileInfo, error) {
	return newFileInfo(tf.name, tf.file.Length()), nil
}
@ -311,32 +407,80 @@ func (rw *torrentFile) Close(ctx context.Context) error {
// Read implements ctxio.Reader.
//
// Reads are serialized through tf.mu because the underlying
// torrent.Reader keeps a single seek position. If a previous read on
// this file hit a deadline less than secondaryTimeout ago, the read is
// retried with a very short (1ms) deadline so that stalled files do
// not repeatedly block callers for the full context timeout.
func (tf *torrentFile) Read(ctx context.Context, p []byte) (n int, err error) {
	ctx, span := tracer.Start(ctx, "Read",
		trace.WithAttributes(attribute.Int("length", len(p))),
	)
	defer func() {
		span.SetAttributes(attribute.Int("read", n))
		span.End()
	}()

	tf.mu.Lock()
	defer tf.mu.Unlock()

	if time.Since(tf.lastReadTimeout) < secondaryTimeout { // short timeout for files that already failed a read
		span.SetAttributes(attribute.Bool("short_timeout", true))
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
		defer cancel()
	}
	// Remember the failure so subsequent reads take the short-timeout path.
	// NOTE(review): plain == comparison; if ReadContext can return a wrapped
	// error, errors.Is(err, context.DeadlineExceeded) would be safer — confirm.
	defer func() {
		if err == context.DeadlineExceeded {
			tf.lastReadTimeout = time.Now()
		}
	}()

	return tf.tr.ReadContext(ctx, p)
}
func (yf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
yf.mu.Lock()
defer yf.mu.Unlock()
func (tf *torrentFile) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
ctx, span := tracer.Start(ctx, "ReadAt",
trace.WithAttributes(attribute.Int("length", len(p)), attribute.Int64("offset", off)),
)
defer func() {
span.SetAttributes(attribute.Int("read", n))
span.End()
}()
_, err := yf.tr.Seek(off, io.SeekStart)
tf.mu.Lock()
defer tf.mu.Unlock()
if time.Since(tf.lastReadTimeout) < secondaryTimeout { // make short timeout for already faliled files
span.SetAttributes(attribute.Bool("short_timeout", true))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
defer cancel()
}
defer func() {
if err == context.DeadlineExceeded {
tf.lastReadTimeout = time.Now()
}
}()
_, err = tf.tr.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return readAtLeast(ctx, yf, p, len(p))
// return tf.tr.ReadContext(ctx, p)
n, err = readAtLeast(ctx, tf.tr, p, len(p))
_, err = tf.tr.Seek(0, io.SeekStart)
if err != nil {
return 0, err
}
return n, err
}
func readAtLeast(ctx context.Context, r ctxio.Reader, buf []byte, min int) (n int, err error) {
func readAtLeast(ctx context.Context, r torrent.Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(ctx, buf[n:])
nn, err = r.ReadContext(ctx, buf[n:])
n += nn
}
if n >= min {

View file

@ -1,13 +1,10 @@
package vfs
import (
"context"
"os"
"testing"
"github.com/anacrolix/torrent"
"github.com/stretchr/testify/require"
)
const testMagnet = "magnet:?xt=urn:btih:a88fda5954e89178c372716a6a78b8180ed4dad3&dn=The+WIRED+CD+-+Rip.+Sample.+Mash.+Share&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fwired-cd.torrent"
@ -85,61 +82,61 @@ func TestMain(m *testing.M) {
// require.NoError(f.Close())
// }
func TestReadAtTorrent(t *testing.T) {
t.Parallel()
// func TestReadAtTorrent(t *testing.T) {
// t.Parallel()
ctx := context.Background()
// ctx := context.Background()
require := require.New(t)
// require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
// to, err := Cli.AddMagnet(testMagnet)
// require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
// <-to.GotInfo()
// torrFile := to.Files()[0]
tf := torrentFile{
file: torrFile,
}
// tf, err := openTorrentFile(ctx, "torr", torrFile)
// require.NoError(err)
defer tf.Close(ctx)
// defer tf.Close(ctx)
toRead := make([]byte, 5)
n, err := tf.ReadAt(ctx, toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
// toRead := make([]byte, 5)
// n, err := tf.ReadAt(ctx, toRead, 6)
// require.NoError(err)
// require.Equal(5, n)
// require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = tf.ReadAt(ctx, toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}
// n, err = tf.ReadAt(ctx, toRead, 0)
// require.NoError(err)
// require.Equal(5, n)
// require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
// }
func TestReadAtWrapper(t *testing.T) {
t.Parallel()
// func TestReadAtWrapper(t *testing.T) {
// t.Parallel()
ctx := context.Background()
// ctx := context.Background()
require := require.New(t)
// require := require.New(t)
to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)
// to, err := Cli.AddMagnet(testMagnet)
// require.NoError(err)
<-to.GotInfo()
torrFile := to.Files()[0]
// <-to.GotInfo()
// torrFile := to.Files()[0]
r, err := openTorrentFile(ctx, "file", torrFile)
defer r.Close(ctx)
// r, err := openTorrentFile(ctx, "file", torrFile)
// require.NoError(err)
// defer r.Close(ctx)
toRead := make([]byte, 5)
n, err := r.ReadAt(ctx, toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
// toRead := make([]byte, 5)
// n, err := r.ReadAt(ctx, toRead, 6)
// require.NoError(err)
// require.Equal(5, n)
// require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)
n, err = r.ReadAt(ctx, toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}
// n, err = r.ReadAt(ctx, toRead, 0)
// require.NoError(err)
// require.Equal(5, n)
// require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
// }

View file

@ -1,10 +1,17 @@
package vfs
import (
"path"
"strings"
"sync"
)
// Separator is the path separator used by the virtual filesystem.
const Separator = "/"

// isRoot reports whether filename refers to the filesystem root, i.e.
// whether it cleans to "/" (so "//.//" and "/." count as root too).
func isRoot(filename string) bool {
	cleaned := path.Clean(filename)
	return cleaned == Separator
}
// trimRelPath strips the prefix t from p (when present) and removes any
// leading/trailing slashes, yielding a slash-free-edged relative path.
func trimRelPath(p, t string) string {
	rel := strings.TrimPrefix(p, t)
	return strings.Trim(rel, "/")
}

View file

@ -5,22 +5,20 @@ import (
"log"
"log/slog"
nfs "github.com/willscott/go-nfs"
nfs "git.kmsign.ru/royalcat/tstor/pkg/go-nfs"
)
var _ nfs.Logger = (*NFSLog)(nil)
type NFSLog struct {
level nfs.LogLevel
// r *slog.Logger
l *slog.Logger
l *slog.Logger
}
func NewNFSLog(r *slog.Logger) nfs.Logger {
func NewNFSLog(r *slog.Logger) *NFSLog {
return &NFSLog{
level: nfs.DebugLevel,
// l: r.Level(zerolog.DebugLevel),
l: r,
l: r,
}
}