255 lines
4.7 KiB
Go
255 lines
4.7 KiB
Go
package torrent
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/anacrolix/torrent"
|
|
"github.com/anacrolix/torrent/bencode"
|
|
"github.com/anacrolix/torrent/metainfo"
|
|
"github.com/anacrolix/torrent/storage"
|
|
"github.com/distribyted/distribyted/config"
|
|
"github.com/fsnotify/fsnotify"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type ServerState int
|
|
|
|
const (
|
|
UNKNOWN ServerState = iota
|
|
SEEDING
|
|
READING
|
|
UPDATING
|
|
STOPPED
|
|
ERROR
|
|
)
|
|
|
|
func (ss ServerState) String() string {
|
|
return [...]string{"Unknown", "Seeding", "Reading", "Updating", "Stopped", "Error"}[ss]
|
|
}
|
|
|
|
type ServerInfo struct {
|
|
Magnet string `json:"magnetUri"`
|
|
UpdatedAt int64 `json:"updatedAt"`
|
|
Name string `json:"name"`
|
|
Folder string `json:"folder"`
|
|
State string `json:"state"`
|
|
Peers int `json:"peers"`
|
|
Seeds int `json:"seeds"`
|
|
}
|
|
|
|
type Server struct {
|
|
cfg *config.Server
|
|
log zerolog.Logger
|
|
|
|
fw *fsnotify.Watcher
|
|
|
|
eventsCount uint64
|
|
|
|
c *torrent.Client
|
|
pc storage.PieceCompletion
|
|
|
|
mu sync.RWMutex
|
|
t *torrent.Torrent
|
|
si ServerInfo
|
|
}
|
|
|
|
func NewServer(c *torrent.Client, pc storage.PieceCompletion, cfg *config.Server) *Server {
|
|
l := log.Logger.With().Str("component", "server").Str("name", cfg.Name).Logger()
|
|
|
|
return &Server{
|
|
cfg: cfg,
|
|
log: l,
|
|
c: c,
|
|
pc: pc,
|
|
}
|
|
}
|
|
|
|
func (s *Server) Start() error {
|
|
s.log.Info().Msg("starting new server folder")
|
|
w, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := os.MkdirAll(s.cfg.Path, 0744); err != nil {
|
|
return fmt.Errorf("error creating server folder: %s. Error: %w", s.cfg.Path, err)
|
|
}
|
|
|
|
if err := filepath.Walk(s.cfg.Path,
|
|
func(path string, info os.FileInfo, err error) error {
|
|
if info.Mode().IsDir() {
|
|
s.log.Debug().Str("folder", path).Msg("adding new folder")
|
|
return w.Add(path)
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.fw = w
|
|
go func() {
|
|
if err := s.makeMagnet(); err != nil {
|
|
s.updateState(ERROR)
|
|
s.log.Error().Err(err).Msg("error generating magnet on start")
|
|
}
|
|
|
|
s.watch()
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case event, ok := <-w.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
s.log.Info().Str("file", event.Name).Str("op", event.Op.String()).Msg("file changed inside server folder")
|
|
atomic.AddUint64(&s.eventsCount, 1)
|
|
case err, ok := <-w.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
s.updateState(STOPPED)
|
|
s.log.Error().Err(err).Msg("error watching server folder")
|
|
}
|
|
}
|
|
}()
|
|
|
|
s.log.Info().Msg("server folder started")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) watch() {
|
|
s.log.Info().Msg("starting watcher")
|
|
for range time.Tick(time.Second * 5) {
|
|
if s.eventsCount == 0 {
|
|
continue
|
|
}
|
|
|
|
ec := s.eventsCount
|
|
if err := s.makeMagnet(); err != nil {
|
|
s.updateState(ERROR)
|
|
s.log.Error().Err(err).Msg("error generating magnet")
|
|
}
|
|
|
|
atomic.AddUint64(&s.eventsCount, -ec)
|
|
}
|
|
}
|
|
|
|
func (s *Server) makeMagnet() error {
|
|
|
|
s.log.Info().Msg("starting serving new torrent")
|
|
|
|
info := metainfo.Info{
|
|
PieceLength: 2 << 18,
|
|
}
|
|
|
|
s.updateState(READING)
|
|
|
|
if err := info.BuildFromFilePath(s.cfg.Path); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.updateState(UPDATING)
|
|
|
|
if len(info.Files) == 0 {
|
|
s.mu.Lock()
|
|
s.si.Magnet = ""
|
|
s.si.Folder = s.cfg.Path
|
|
s.si.Name = s.cfg.Name
|
|
s.si.UpdatedAt = time.Now().Unix()
|
|
s.mu.Unlock()
|
|
s.log.Info().Msg("not creating magnet from empty folder")
|
|
|
|
s.updateState(STOPPED)
|
|
return nil
|
|
}
|
|
|
|
mi := metainfo.MetaInfo{
|
|
InfoBytes: bencode.MustMarshal(info),
|
|
}
|
|
|
|
ih := mi.HashInfoBytes()
|
|
|
|
to, _ := s.c.AddTorrentOpt(torrent.AddTorrentOpts{
|
|
InfoHash: ih,
|
|
Storage: storage.NewFileOpts(storage.NewFileClientOpts{
|
|
ClientBaseDir: s.cfg.Path,
|
|
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
|
|
return filepath.Join(opts.File.Path...)
|
|
},
|
|
TorrentDirMaker: nil,
|
|
PieceCompletion: s.pc,
|
|
}),
|
|
})
|
|
|
|
tks := s.trackers()
|
|
|
|
err := to.MergeSpec(&torrent.TorrentSpec{
|
|
InfoBytes: mi.InfoBytes,
|
|
|
|
Trackers: [][]string{tks},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m := metainfo.Magnet{
|
|
InfoHash: ih,
|
|
DisplayName: s.cfg.Name,
|
|
Trackers: tks,
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.t = to
|
|
s.si.Magnet = m.String()
|
|
s.si.Folder = s.cfg.Path
|
|
s.si.Name = s.cfg.Name
|
|
s.si.UpdatedAt = time.Now().Unix()
|
|
s.mu.Unlock()
|
|
s.updateState(SEEDING)
|
|
|
|
s.log.Info().Str("hash", ih.HexString()).Msg("new torrent is ready")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) updateState(ss ServerState) {
|
|
s.mu.Lock()
|
|
s.si.State = ss.String()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *Server) trackers() []string {
|
|
// TODO load trackers from URL too
|
|
return s.cfg.Trackers
|
|
}
|
|
|
|
func (s *Server) Close() error {
|
|
if s.fw == nil {
|
|
return nil
|
|
}
|
|
return s.fw.Close()
|
|
}
|
|
|
|
func (s *Server) Info() *ServerInfo {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
if s.t != nil {
|
|
st := s.t.Stats()
|
|
s.si.Peers = st.TotalPeers
|
|
s.si.Seeds = st.ConnectedSeeders
|
|
}
|
|
|
|
return &s.si
|
|
}
|