oprimized, working

This commit is contained in:
royalcat 2023-12-22 02:15:39 +03:00
parent 2b39afca3b
commit 0350ecba9a
38 changed files with 1809 additions and 826 deletions
cmd/tstor

View file

@ -1,27 +1,30 @@
package main
import (
"bufio"
"fmt"
"net"
nethttp "net/http"
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"
"git.kmsign.ru/royalcat/tstor/src/config"
"git.kmsign.ru/royalcat/tstor/src/host"
"git.kmsign.ru/royalcat/tstor/src/host/torrent"
"github.com/anacrolix/torrent/storage"
"github.com/gin-gonic/gin"
"git.kmsign.ru/royalcat/tstor/src/host/vfs"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
wnfs "github.com/willscott/go-nfs"
"git.kmsign.ru/royalcat/tstor/src/http"
dlog "git.kmsign.ru/royalcat/tstor/src/log"
"git.kmsign.ru/royalcat/tstor/src/mounts/fuse"
"git.kmsign.ru/royalcat/tstor/src/mounts/httpfs"
"git.kmsign.ru/royalcat/tstor/src/mounts/nfs"
"git.kmsign.ru/royalcat/tstor/src/mounts/webdav"
)
@ -44,16 +47,7 @@ func main() {
},
Action: func(c *cli.Context) error {
err := load(c.String(configFlag))
// stop program execution on errors to avoid flashing consoles
if err != nil && runtime.GOOS == "windows" {
log.Error().Err(err).Msg("problem starting application")
fmt.Print("Press 'Enter' to continue...")
bufio.NewReader(os.Stdin).ReadBytes('\n')
}
return err
return run(c.String(configFlag))
},
HideHelpCommand: true,
@ -64,50 +58,8 @@ func main() {
}
}
func setupStorage(tcfg config.TorrentClient) (storage.ClientImplCloser, storage.PieceCompletion, error) {
pcp := filepath.Join(tcfg.DataFolder, "piece-completion")
if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
}
pc, err := storage.NewBoltPieceCompletion(pcp)
if err != nil {
return nil, nil, fmt.Errorf("error creating servers piece completion: %w", err)
}
func run(configPath string) error {
// TODO implement cache/storage switching
// cacheDir := filepath.Join(tcfg.DataFolder, "cache")
// if err := os.MkdirAll(cacheDir, 0744); err != nil {
// return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
// }
// fc, err := filecache.NewCache(cacheDir)
// if err != nil {
// return nil, nil, fmt.Errorf("error creating cache: %w", err)
// }
// log.Info().Msg(fmt.Sprintf("setting cache size to %d MB", 1024))
// fc.SetCapacity(1024 * 1024 * 1024)
// rp := storage.NewResourcePieces(fc.AsResourceProvider())
// st := &stc{rp}
filesDir := filepath.Join(tcfg.DataFolder, "files")
if err := os.MkdirAll(pcp, 0744); err != nil {
return nil, nil, fmt.Errorf("error creating piece completion folder: %w", err)
}
st := storage.NewFileWithCompletion(filesDir, pc)
return st, pc, nil
}
type stc struct {
storage.ClientImpl
}
func (s *stc) Close() error {
return nil
}
func load(configPath string) error {
conf, err := config.Load(configPath)
if err != nil {
return fmt.Errorf("error loading configuration: %w", err)
@ -115,6 +67,11 @@ func load(configPath string) error {
dlog.Load(&conf.Log)
err = syscall.Setpriority(syscall.PRIO_PGRP, 0, 19)
if err != nil {
log.Err(err).Msg("set priority failed")
}
if err := os.MkdirAll(conf.TorrentClient.MetadataFolder, 0744); err != nil {
return fmt.Errorf("error creating metadata folder: %w", err)
}
@ -123,22 +80,25 @@ func load(configPath string) error {
if err != nil {
return fmt.Errorf("error starting item store: %w", err)
}
defer fis.Close()
id, err := torrent.GetOrCreatePeerID(filepath.Join(conf.TorrentClient.MetadataFolder, "ID"))
if err != nil {
return fmt.Errorf("error creating node ID: %w", err)
}
st, _, err := setupStorage(conf.TorrentClient)
st, _, err := torrent.SetupStorage(conf.TorrentClient)
if err != nil {
return err
}
defer st.Close()
c, err := torrent.NewClient(st, fis, &conf.TorrentClient, id)
if err != nil {
return fmt.Errorf("error starting torrent client: %w", err)
}
c.AddDhtNodes(conf.TorrentClient.DHTNodes)
defer c.Close()
ts := torrent.NewService(c, conf.TorrentClient.AddTimeout, conf.TorrentClient.ReadTimeout)
@ -147,46 +107,15 @@ func load(configPath string) error {
}
cfs := host.NewStorage(conf.DataFolder, ts)
var mh *fuse.Handler
if conf.Mounts.Fuse.Enabled {
mh = fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path)
mh := fuse.NewHandler(conf.Mounts.Fuse.AllowOther, conf.Mounts.Fuse.Path)
err := mh.Mount(cfs)
if err != nil {
return fmt.Errorf("mount fuse error: %w", err)
}
defer mh.Unmount()
}
sigChan := make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Info().Msg("closing servers...")
// for _, s := range servers {
// if err := s.Close(); err != nil {
// log.Warn().Err(err).Msg("problem closing server")
// }
// }
log.Info().Msg("closing items database...")
fis.Close()
log.Info().Msg("closing torrent client...")
c.Close()
if mh != nil {
log.Info().Msg("unmounting fuse filesystem...")
mh.Unmount()
}
log.Info().Msg("exiting")
os.Exit(1)
}()
go func() {
if mh == nil {
return
}
if err := mh.Mount(cfs); err != nil {
log.Info().Err(err).Msg("error mounting filesystems")
}
}()
if conf.Mounts.WebDAV.Enabled {
go func() {
if err := webdav.NewWebDAVServer(cfs, conf.Mounts.WebDAV.Port, conf.Mounts.WebDAV.User, conf.Mounts.WebDAV.Pass); err != nil {
@ -199,24 +128,63 @@ func load(configPath string) error {
if conf.Mounts.HttpFs.Enabled {
go func() {
httpfs := httpfs.NewHTTPFS(cfs)
r := gin.New()
r.GET("*filepath", func(c *gin.Context) {
path := c.Param("filepath")
c.FileFromFS(path, httpfs)
})
log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port)).Msg("starting HTTPFS")
if err := r.Run(fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port)); err != nil {
err = nethttp.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port), nethttp.FileServer(httpfs))
if err != nil {
log.Error().Err(err).Msg("error starting HTTPFS")
}
// r := gin.New()
// r.GET("*filepath", func(c *gin.Context) {
// path := c.Param("filepath")
// c.FileFromFS(path, httpfs)
// })
// log.Info().Str("host", fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port)).Msg("starting HTTPFS")
// if err := r.Run(fmt.Sprintf("0.0.0.0:%d", conf.Mounts.HttpFs.Port)); err != nil {
// log.Error().Err(err).Msg("error starting HTTPFS")
// }
}()
}
logFilename := filepath.Join(conf.Log.Path, dlog.FileName)
if conf.Mounts.NFS.Enabled {
go func() {
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", conf.Mounts.NFS.Port))
panicOnErr(err, "starting TCP listener")
log.Info().Str("host", listener.Addr().String()).Msg("starting NFS server")
handler, err := nfs.NewNFSv3Handler(cfs)
panicOnErr(err, "creating NFS handler")
panicOnErr(wnfs.Serve(listener, handler), "serving nfs")
}()
}
err = http.New(nil, nil, ts, logFilename, conf)
log.Error().Err(err).Msg("error initializing HTTP server")
return err
dataFS := vfs.NewOsFs(conf.DataFolder)
go func() {
if err := webdav.NewWebDAVServer(dataFS, 36912, conf.Mounts.WebDAV.User, conf.Mounts.WebDAV.Pass); err != nil {
log.Error().Err(err).Msg("error starting webDAV")
}
log.Warn().Msg("webDAV configuration not found!")
}()
go func() {
logFilename := filepath.Join(conf.Log.Path, dlog.FileName)
err = http.New(nil, nil, ts, logFilename, conf)
log.Error().Err(err).Msg("error initializing HTTP server")
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
return nil
}
func panicOnErr(err error, desc string) {
if err == nil {
return
}
log.Err(err).Msg(desc)
log.Panic()
}