package vfs import ( "context" "io" "io/fs" "path" "slices" "strings" "sync" "time" "git.kmsign.ru/royalcat/tstor/src/host/controller" "git.kmsign.ru/royalcat/tstor/src/iio" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent" "golang.org/x/exp/maps" ) var _ Filesystem = &TorrentFs{} type TorrentFs struct { name string mu sync.Mutex Torrent *controller.Torrent readTimeout int filesCache map[string]File resolver *resolver } func NewTorrentFs(name string, c *controller.Torrent, readTimeout int) *TorrentFs { return &TorrentFs{ name: name, Torrent: c, readTimeout: readTimeout, resolver: newResolver(ArchiveFactories), } } var _ fs.DirEntry = (*TorrentFs)(nil) // Name implements fs.DirEntry. func (tfs *TorrentFs) Name() string { return tfs.name } // Info implements fs.DirEntry. func (tfs *TorrentFs) Info() (fs.FileInfo, error) { return newDirInfo(tfs.name), nil } // IsDir implements fs.DirEntry. func (tfs *TorrentFs) IsDir() bool { return true } // Type implements fs.DirEntry. func (tfs *TorrentFs) Type() fs.FileMode { return fs.ModeDir } func (fs *TorrentFs) files() (map[string]File, error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.filesCache != nil { return fs.filesCache, nil } files, err := fs.Torrent.Files(context.Background()) if err != nil { return nil, err } fs.filesCache = make(map[string]File) for _, file := range files { file.Download() p := AbsPath(file.Path()) fs.filesCache[p] = &torrentFile{ name: path.Base(p), timeout: fs.readTimeout, file: file, } } // TODO optional if len(fs.filesCache) == 1 && fs.resolver.isNestedFs(fs.Torrent.Name()) { filepath := "/" + fs.Torrent.Name() if file, ok := fs.filesCache[filepath]; ok { nestedFs, err := fs.resolver.nestedFs(filepath, file) if err != nil { return nil, err } if nestedFs == nil { goto DEFAULT_DIR // FIXME } fs.filesCache, err = listFilesRecursive(nestedFs, "/") if err != nil { return nil, err } return fs.filesCache, nil } } DEFAULT_DIR: rootDir := "/" + fs.Torrent.Name() + "/" singleDir := true for k, _ := range fs.filesCache { if !strings.HasPrefix(k, rootDir) { singleDir = false } } if singleDir { for k, f := range fs.filesCache { delete(fs.filesCache, k) k, _ = strings.CutPrefix(k, rootDir) k = AbsPath(k) fs.filesCache[k] = f } } return fs.filesCache, nil } func anyPeerHasFiles(file *torrent.File) bool { for _, conn := range file.Torrent().PeerConns() { if bitmapHaveFile(conn.PeerPieces(), file) { return true } } return false } func bitmapHaveFile(bitmap *roaring.Bitmap, file *torrent.File) bool { for i := file.BeginPieceIndex(); i < file.EndPieceIndex(); i++ { if !bitmap.ContainsInt(i) { return false } } return true } func listFilesRecursive(vfs Filesystem, start string) (map[string]File, error) { out := make(map[string]File, 0) entries, err := vfs.ReadDir(start) if err != nil { return nil, err } for _, entry := range entries { filename := path.Join(start, entry.Name()) if entry.IsDir() { rec, err := listFilesRecursive(vfs, filename) if err != nil { return nil, err } maps.Copy(out, rec) } else { file, err := vfs.Open(filename) if err != nil { return nil, err } out[filename] = file } } return out, nil } func (fs *TorrentFs) rawOpen(path string) (File, error) { files, err := fs.files() if err != nil { return nil, err } file, err := getFile(files, path) return file, err } func (fs *TorrentFs) rawStat(filename string) (fs.FileInfo, error) { files, err := fs.files() if err != nil { return nil, err } file, err := getFile(files, filename) if err != nil { return nil, err } return file.Stat() } // Stat implements Filesystem. func (fs *TorrentFs) Stat(filename string) (fs.FileInfo, error) { if filename == Separator { return newDirInfo(filename), nil } fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.Stat(nestedFsPath) } return fs.rawStat(fsPath) } func (fs *TorrentFs) Open(filename string) (File, error) { fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(filename, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.Open(nestedFsPath) } return fs.rawOpen(fsPath) } func (fs *TorrentFs) ReadDir(name string) ([]fs.DirEntry, error) { fsPath, nestedFs, nestedFsPath, err := fs.resolver.resolvePath(name, fs.rawOpen) if err != nil { return nil, err } if nestedFs != nil { return nestedFs.ReadDir(nestedFsPath) } files, err := fs.files() if err != nil { return nil, err } return listDirFromFiles(files, fsPath) } func (fs *TorrentFs) Unlink(name string) error { name = AbsPath(name) fs.mu.Lock() defer fs.mu.Unlock() files, err := fs.files() if err != nil { return err } if !slices.Contains(maps.Keys(files), name) { return ErrNotExist } file := files[name] delete(fs.filesCache, name) tfile, ok := file.(*torrentFile) if !ok { return ErrNotImplemented } return fs.Torrent.ExcludeFile(context.Background(), tfile.file) } type reader interface { iio.Reader missinggo.ReadContexter } type readAtWrapper struct { timeout int mu sync.Mutex torrent.Reader io.ReaderAt io.Closer } func newReadAtWrapper(r torrent.Reader, timeout int) reader { w := &readAtWrapper{Reader: r, timeout: timeout} w.SetResponsive() return w } func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) { rw.mu.Lock() defer rw.mu.Unlock() _, err := rw.Seek(off, io.SeekStart) if err != nil { return 0, err } return readAtLeast(rw, rw.timeout, p, len(p)) } func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) { if len(buf) < min { return 0, io.ErrShortBuffer } for n < min && err == nil { var nn int ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) defer cancel() nn, err = r.ReadContext(ctx, buf[n:]) n += nn } if n >= min { err = nil } else if n > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return } func (rw *readAtWrapper) Close() error { rw.mu.Lock() defer rw.mu.Unlock() return rw.Reader.Close() } var _ File = &torrentFile{} type torrentFile struct { name string reader reader timeout int file *torrent.File } func (d *torrentFile) Stat() (fs.FileInfo, error) { return newFileInfo(d.name, d.file.Length()), nil } func (d *torrentFile) load() { if d.reader != nil { return } d.reader = newReadAtWrapper(d.file.NewReader(), d.timeout) } func (d *torrentFile) Size() int64 { return d.file.Length() } func (d *torrentFile) IsDir() bool { return false } func (d *torrentFile) Close() error { var err error if d.reader != nil { err = d.reader.Close() } d.reader = nil return err } func (d *torrentFile) Read(p []byte) (n int, err error) { d.load() ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Second) defer cancel() return d.reader.ReadContext(ctx, p) } func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) { d.load() return d.reader.ReadAt(p, off) }