Add dht AnnouncePeer

This commit is contained in:
Matt Joiner 2014-11-16 13:08:33 -06:00
parent 0b8ddd8720
commit 1460e835c2
1 changed files with 81 additions and 5 deletions

View File

@ -13,6 +13,8 @@ import (
"sync"
"time"
"bitbucket.org/anacrolix/go.torrent/logonce"
"bitbucket.org/anacrolix/go.torrent/util"
"github.com/nsf/libtorgo/bencode"
)
@ -25,6 +27,8 @@ type Server struct {
nodes map[string]*Node // Keyed by *net.UDPAddr.String().
mu sync.Mutex
closed chan struct{}
NumConfirmedAnnounces int
}
type ServerConfig struct {
@ -86,6 +90,7 @@ type Node struct {
id string
lastHeardFrom time.Time
lastSentTo time.Time
announceToken string
}
func (n *Node) NodeInfo() (ret NodeInfo) {
@ -123,6 +128,32 @@ func (m Msg) T() (t string) {
return
}
type KRPCError struct {
Code int
Msg string
}
func (me KRPCError) Error() string {
return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
}
var _ error = KRPCError{}
func (m Msg) Error() *KRPCError {
if m["y"] != "e" {
return nil
}
l := m["e"].([]interface{})
return &KRPCError{int(l[0].(int64)), l[1].(string)}
}
// Returns the token given in response to a get_peers request for future
// announce_peer requests to that node.
func (m Msg) AnnounceToken() string {
defer func() { recover() }()
return m["r"].(map[string]interface{})["token"].(string)
}
type transaction struct {
remoteAddr net.Addr
t string
@ -413,10 +444,10 @@ func (s *Server) timeoutTransaction(t *transaction) {
s.removeTransaction(t)
}
func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) {
func (s *Server) query(node *net.UDPAddr, q string, a map[string]interface{}) (t *transaction, err error) {
tid := s.nextTransactionID()
if a == nil {
a = make(map[string]string, 1)
a = make(map[string]interface{}, 1)
}
a["id"] = s.IDString()
d := map[string]interface{}{
@ -488,6 +519,46 @@ func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) {
return s.query(node, "ping", nil)
}
// Announce a local peer. This can only be done to nodes that gave us an
// announce token, which is received in responses during GetPeers. It's
// recommended then that GetPeers is called before this method.
func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
for _, node := range s.closestNodes(10000, infoHash, func(n *Node) bool {
return n.Good() && n.announceToken != ""
}) {
err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
if err != nil {
break
}
}
return
}
func (s *Server) announcePeer(node *net.UDPAddr, infoHash string, port int, token string, impliedPort bool) error {
t, err := s.query(node, "announce_peer", map[string]interface{}{
"implied_port": func() int {
if impliedPort {
return 1
} else {
return 0
}
}(),
"info_hash": infoHash,
"port": port,
"token": token,
})
t.setOnResponse(func(m Msg) {
if err := m.Error(); err != nil {
logonce.Stderr.Printf("announce_peer response: %s", err)
return
}
s.NumConfirmedAnnounces++
})
return err
}
type findNodeResponse struct {
Nodes []NodeInfo
}
@ -593,7 +664,7 @@ func (s *Server) liftNodes(d Msg) {
// Sends a find_node query to addr. targetID is the node we're looking for.
func (s *Server) findNode(addr *net.UDPAddr, targetID string) (t *transaction, err error) {
t, err = s.query(addr, "find_node", map[string]string{"target": targetID})
t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID})
if err != nil {
return
}
@ -704,12 +775,13 @@ func (s *Server) getPeers(addr *net.UDPAddr, infoHash string) (t *transaction, e
err = fmt.Errorf("infohash has bad length")
return
}
t, err = s.query(addr, "get_peers", map[string]string{"info_hash": infoHash})
t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash})
if err != nil {
return
}
t.setOnResponse(func(m Msg) {
s.liftNodes(m)
s.getNode(addr).announceToken = m.AnnounceToken()
})
return
}
@ -831,10 +903,14 @@ func idDistance(a, b string) (ret int) {
}
func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
}
func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {
sel := newKClosestNodesSelector(k, targetID)
idNodes := make(map[string]*Node, len(s.nodes))
for _, node := range s.nodes {
if !node.Good() {
if !filter(node) {
continue
}
sel.Push(node.id)