tstor/torrent/server.go
Antonio Navarro Perez e90e6eaf1f
Improve server. (#103)
2021-12-09 17:45:25 +01:00

255 lines
4.7 KiB
Go

package torrent
import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/distribyted/distribyted/config"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type ServerState int
const (
UNKNOWN ServerState = iota
SEEDING
READING
UPDATING
STOPPED
ERROR
)
func (ss ServerState) String() string {
return [...]string{"Unknown", "Seeding", "Reading", "Updating", "Stopped", "Error"}[ss]
}
type ServerInfo struct {
Magnet string `json:"magnetUri"`
UpdatedAt int64 `json:"updatedAt"`
Name string `json:"name"`
Folder string `json:"folder"`
State string `json:"state"`
Peers int `json:"peers"`
Seeds int `json:"seeds"`
}
type Server struct {
cfg *config.Server
log zerolog.Logger
fw *fsnotify.Watcher
eventsCount uint64
c *torrent.Client
pc storage.PieceCompletion
mu sync.RWMutex
t *torrent.Torrent
si ServerInfo
}
func NewServer(c *torrent.Client, pc storage.PieceCompletion, cfg *config.Server) *Server {
l := log.Logger.With().Str("component", "server").Str("name", cfg.Name).Logger()
return &Server{
cfg: cfg,
log: l,
c: c,
pc: pc,
}
}
func (s *Server) Start() error {
s.log.Info().Msg("starting new server folder")
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
if err := os.MkdirAll(s.cfg.Path, 0744); err != nil {
return fmt.Errorf("error creating server folder: %s. Error: %w", s.cfg.Path, err)
}
if err := filepath.Walk(s.cfg.Path,
func(path string, info os.FileInfo, err error) error {
if info.Mode().IsDir() {
s.log.Debug().Str("folder", path).Msg("adding new folder")
return w.Add(path)
}
return nil
}); err != nil {
return err
}
s.fw = w
go func() {
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet on start")
}
s.watch()
}()
go func() {
for {
select {
case event, ok := <-w.Events:
if !ok {
return
}
s.log.Info().Str("file", event.Name).Str("op", event.Op.String()).Msg("file changed inside server folder")
atomic.AddUint64(&s.eventsCount, 1)
case err, ok := <-w.Errors:
if !ok {
return
}
s.updateState(STOPPED)
s.log.Error().Err(err).Msg("error watching server folder")
}
}
}()
s.log.Info().Msg("server folder started")
return nil
}
func (s *Server) watch() {
s.log.Info().Msg("starting watcher")
for range time.Tick(time.Second * 5) {
if s.eventsCount == 0 {
continue
}
ec := s.eventsCount
if err := s.makeMagnet(); err != nil {
s.updateState(ERROR)
s.log.Error().Err(err).Msg("error generating magnet")
}
atomic.AddUint64(&s.eventsCount, -ec)
}
}
func (s *Server) makeMagnet() error {
s.log.Info().Msg("starting serving new torrent")
info := metainfo.Info{
PieceLength: 2 << 18,
}
s.updateState(READING)
if err := info.BuildFromFilePath(s.cfg.Path); err != nil {
return err
}
s.updateState(UPDATING)
if len(info.Files) == 0 {
s.mu.Lock()
s.si.Magnet = ""
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.log.Info().Msg("not creating magnet from empty folder")
s.updateState(STOPPED)
return nil
}
mi := metainfo.MetaInfo{
InfoBytes: bencode.MustMarshal(info),
}
ih := mi.HashInfoBytes()
to, _ := s.c.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: ih,
Storage: storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: s.cfg.Path,
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
return filepath.Join(opts.File.Path...)
},
TorrentDirMaker: nil,
PieceCompletion: s.pc,
}),
})
tks := s.trackers()
err := to.MergeSpec(&torrent.TorrentSpec{
InfoBytes: mi.InfoBytes,
Trackers: [][]string{tks},
})
if err != nil {
return err
}
m := metainfo.Magnet{
InfoHash: ih,
DisplayName: s.cfg.Name,
Trackers: tks,
}
s.mu.Lock()
s.t = to
s.si.Magnet = m.String()
s.si.Folder = s.cfg.Path
s.si.Name = s.cfg.Name
s.si.UpdatedAt = time.Now().Unix()
s.mu.Unlock()
s.updateState(SEEDING)
s.log.Info().Str("hash", ih.HexString()).Msg("new torrent is ready")
return nil
}
func (s *Server) updateState(ss ServerState) {
s.mu.Lock()
s.si.State = ss.String()
s.mu.Unlock()
}
func (s *Server) trackers() []string {
// TODO load trackers from URL too
return s.cfg.Trackers
}
func (s *Server) Close() error {
if s.fw == nil {
return nil
}
return s.fw.Close()
}
func (s *Server) Info() *ServerInfo {
s.mu.RLock()
defer s.mu.RUnlock()
if s.t != nil {
st := s.t.Stats()
s.si.Peers = st.TotalPeers
s.si.Seeds = st.ConnectedSeeders
}
return &s.si
}