Server implementation. (#90)
* Server implementation. - Share the content of a folder as a magnet file. - Web interface with all data needed for sharing data. - New configuration to add several servers - Every time the content of the server folder is changed, the magnet file will be generated again. Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com> * Update dependencies Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com> * Use boltdb piece completion storage. Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
This commit is contained in:
parent
5d4e48f0f9
commit
ddda39b22a
15 changed files with 541 additions and 67 deletions
torrent
253
torrent/server.go
Normal file
253
torrent/server.go
Normal file
|
@ -0,0 +1,253 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/distribyted/distribyted/config"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type ServerState int
|
||||
|
||||
const (
|
||||
UNKNOWN ServerState = iota
|
||||
SEEDING
|
||||
READING
|
||||
UPDATING
|
||||
STOPPED
|
||||
ERROR
|
||||
)
|
||||
|
||||
func (ss ServerState) String() string {
|
||||
return [...]string{"Unknown", "Seeding", "Reading", "Updating", "Stopped", "Error"}[ss]
|
||||
}
|
||||
|
||||
type ServerInfo struct {
|
||||
Magnet string `json:"magnetUri"`
|
||||
UpdatedAt int64 `json:"updatedAt"`
|
||||
Name string `json:"name"`
|
||||
Folder string `json:"folder"`
|
||||
State string `json:"state"`
|
||||
Peers int `json:"peers"`
|
||||
Seeds int `json:"seeds"`
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
cfg *config.Server
|
||||
log zerolog.Logger
|
||||
|
||||
fw *fsnotify.Watcher
|
||||
|
||||
eventsCount uint64
|
||||
|
||||
c *torrent.Client
|
||||
pc storage.PieceCompletion
|
||||
|
||||
mu sync.RWMutex
|
||||
t *torrent.Torrent
|
||||
si ServerInfo
|
||||
}
|
||||
|
||||
func NewServer(c *torrent.Client, pc storage.PieceCompletion, cfg *config.Server) *Server {
|
||||
l := log.Logger.With().Str("component", "server").Str("name", cfg.Name).Logger()
|
||||
|
||||
return &Server{
|
||||
cfg: cfg,
|
||||
log: l,
|
||||
c: c,
|
||||
pc: pc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
s.log.Info().Msg("starting new server folder")
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(s.cfg.Path, 0744); err != nil {
|
||||
return fmt.Errorf("error creating server folder: %s. Error: %w", s.cfg.Path, err)
|
||||
}
|
||||
|
||||
if err := filepath.Walk(s.cfg.Path,
|
||||
func(path string, info os.FileInfo, err error) error {
|
||||
if info.Mode().IsDir() {
|
||||
s.log.Debug().Str("folder", path).Msg("adding new folder")
|
||||
return w.Add(path)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.fw = w
|
||||
|
||||
if err := s.makeMagnet(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.watch()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-w.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
s.log.Info().Str("file", event.Name).Str("op", event.Op.String()).Msg("file changed inside server folder")
|
||||
atomic.AddUint64(&s.eventsCount, 1)
|
||||
case err, ok := <-w.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
s.updateState(STOPPED)
|
||||
s.log.Error().Err(err).Msg("error watching server folder")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.log.Info().Msg("server folder started")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) watch() {
|
||||
s.log.Info().Msg("starting watcher")
|
||||
for range time.Tick(time.Second * 5) {
|
||||
if s.eventsCount == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
ec := s.eventsCount
|
||||
if err := s.makeMagnet(); err != nil {
|
||||
s.updateState(ERROR)
|
||||
s.log.Error().Err(err).Msg("error generating magnet")
|
||||
}
|
||||
|
||||
atomic.AddUint64(&s.eventsCount, -ec)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) makeMagnet() error {
|
||||
|
||||
s.log.Info().Msg("starting serving new torrent")
|
||||
|
||||
info := metainfo.Info{
|
||||
PieceLength: 1 << 8,
|
||||
}
|
||||
|
||||
s.updateState(READING)
|
||||
|
||||
if err := info.BuildFromFilePath(s.cfg.Path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.updateState(UPDATING)
|
||||
|
||||
if len(info.Files) == 0 {
|
||||
s.mu.Lock()
|
||||
s.si.Magnet = ""
|
||||
s.si.Folder = s.cfg.Path
|
||||
s.si.Name = s.cfg.Name
|
||||
s.si.UpdatedAt = time.Now().Unix()
|
||||
s.mu.Unlock()
|
||||
s.log.Info().Msg("not creating magnet from empty folder")
|
||||
|
||||
s.updateState(STOPPED)
|
||||
return nil
|
||||
}
|
||||
|
||||
mi := metainfo.MetaInfo{
|
||||
InfoBytes: bencode.MustMarshal(info),
|
||||
}
|
||||
|
||||
ih := mi.HashInfoBytes()
|
||||
|
||||
to, _ := s.c.AddTorrentOpt(torrent.AddTorrentOpts{
|
||||
InfoHash: ih,
|
||||
Storage: storage.NewFileOpts(storage.NewFileClientOpts{
|
||||
ClientBaseDir: s.cfg.Path,
|
||||
FilePathMaker: func(opts storage.FilePathMakerOpts) string {
|
||||
return filepath.Join(opts.File.Path...)
|
||||
},
|
||||
TorrentDirMaker: nil,
|
||||
PieceCompletion: s.pc,
|
||||
}),
|
||||
})
|
||||
|
||||
tks := s.trackers()
|
||||
|
||||
err := to.MergeSpec(&torrent.TorrentSpec{
|
||||
InfoBytes: mi.InfoBytes,
|
||||
|
||||
Trackers: [][]string{tks},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m := metainfo.Magnet{
|
||||
InfoHash: ih,
|
||||
DisplayName: s.cfg.Name,
|
||||
Trackers: tks,
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.t = to
|
||||
s.si.Magnet = m.String()
|
||||
s.si.Folder = s.cfg.Path
|
||||
s.si.Name = s.cfg.Name
|
||||
s.si.UpdatedAt = time.Now().Unix()
|
||||
s.mu.Unlock()
|
||||
s.updateState(SEEDING)
|
||||
|
||||
s.log.Info().Str("hash", ih.HexString()).Msg("new torrent is ready")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) updateState(ss ServerState) {
|
||||
s.mu.Lock()
|
||||
s.si.State = ss.String()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) trackers() []string {
|
||||
// TODO load trackers from URL too
|
||||
return s.cfg.Trackers
|
||||
}
|
||||
|
||||
func (s *Server) Close() error {
|
||||
if s.fw == nil {
|
||||
return nil
|
||||
}
|
||||
return s.fw.Close()
|
||||
}
|
||||
|
||||
func (s *Server) Info() *ServerInfo {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if s.t != nil {
|
||||
st := s.t.Stats()
|
||||
s.si.Peers = st.TotalPeers
|
||||
s.si.Seeds = st.ConnectedSeeders
|
||||
}
|
||||
|
||||
return &s.si
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue