2
0
mirror of synced 2025-02-24 14:48:27 +00:00

dht: Provide Announce, that combines get_peers and announce_peer

This commit is contained in:
Matt Joiner 2015-01-29 14:20:21 +11:00
parent 04e5880d09
commit b3380f1cee
3 changed files with 77 additions and 92 deletions

View File

@ -1855,7 +1855,7 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
for cl.waitWantPeers(t) { for cl.waitWantPeers(t) {
log.Printf("getting peers for %q from DHT", t) log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.GetPeers(string(t.InfoHash[:])) ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil { if err != nil {
log.Printf("error getting peers from dht: %s", err) log.Printf("error getting peers from dht: %s", err)
return return
@ -1900,22 +1900,6 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
ps.Close() ps.Close()
log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs)) log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
// After a GetPeers, we can announce on the best nodes that gave us an
// announce token.
port := cl.incomingPeerPort()
// If port is zero, then we're not listening, and there's nothing to
// announce.
if port != 0 {
// We can't allow the port to be implied as long as the UTP and
// DHT ports are different.
err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
if err != nil {
log.Printf("error announcing torrent to DHT: %s", err)
} else {
log.Printf("announced %q to DHT", t)
}
}
} }
} }

View File

@ -254,12 +254,23 @@ func (m Msg) ID() string {
return m[m["y"].(string)].(map[string]interface{})["id"].(string) return m[m["y"].(string)].(map[string]interface{})["id"].(string)
} }
func (m Msg) Nodes() []NodeInfo { // Suggested nodes in a response.
var r findNodeResponse func (m Msg) Nodes() (nodes []NodeInfo) {
if err := r.UnmarshalKRPCMsg(m); err != nil { b := func() string {
return nil defer func() {
recover()
}()
return m["r"].(map[string]interface{})["nodes"].(string)
}()
for i := 0; i < len(b); i += 26 {
var n NodeInfo
err := n.UnmarshalCompact([]byte(b[i : i+26]))
if err != nil {
continue
}
nodes = append(nodes, n)
} }
return r.Nodes return
} }
type KRPCError struct { type KRPCError struct {
@ -292,9 +303,10 @@ func (m Msg) Error() (ret *KRPCError) {
// Returns the token given in response to a get_peers request for future // Returns the token given in response to a get_peers request for future
// announce_peer requests to that node. // announce_peer requests to that node.
func (m Msg) AnnounceToken() string { func (m Msg) AnnounceToken() (token string, ok bool) {
defer func() { recover() }() defer func() { recover() }()
return m["r"].(map[string]interface{})["token"].(string) token, ok = m["r"].(map[string]interface{})["token"].(string)
return
} }
type transaction struct { type transaction struct {
@ -795,6 +807,9 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err
} }
func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) error { func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) error {
if port == 0 && !impliedPort {
return errors.New("nothing to announce")
}
t, err := s.query(node, "announce_peer", map[string]interface{}{ t, err := s.query(node, "announce_peer", map[string]interface{}{
"implied_port": func() int { "implied_port": func() int {
if impliedPort { if impliedPort {
@ -817,38 +832,6 @@ func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token str
return err return err
} }
type findNodeResponse struct {
Nodes []NodeInfo
}
func getResponseNodes(m Msg) (s string, err error) {
defer func() {
r := recover()
if r == nil {
return
}
err = fmt.Errorf("couldn't get response nodes: %s: %#v", r, m)
}()
s = m["r"].(map[string]interface{})["nodes"].(string)
return
}
func (me *findNodeResponse) UnmarshalKRPCMsg(m Msg) error {
b, err := getResponseNodes(m)
if err != nil {
return err
}
for i := 0; i < len(b); i += 26 {
var n NodeInfo
err := n.UnmarshalCompact([]byte(b[i : i+26]))
if err != nil {
return err
}
me.Nodes = append(me.Nodes, n)
}
return nil
}
func (t *transaction) setOnResponse(f func(m Msg)) { func (t *transaction) setOnResponse(f func(m Msg)) {
if t.onResponse != nil { if t.onResponse != nil {
panic(t.onResponse) panic(t.onResponse)
@ -861,23 +844,16 @@ func (s *Server) liftNodes(d Msg) {
if d["y"] != "r" { if d["y"] != "r" {
return return
} }
var r findNodeResponse for _, cni := range d.Nodes() {
err := r.UnmarshalKRPCMsg(d) if util.AddrPort(cni.Addr) == 0 {
if err != nil { // TODO: Why would people even do this?
// log.Print(err) continue
} else {
for _, cni := range r.Nodes {
if util.AddrPort(cni.Addr) == 0 {
// TODO: Why would people even do this?
continue
}
if s.ipBlocked(util.AddrIP(cni.Addr)) {
continue
}
n := s.getNode(cni.Addr)
n.SetIDFromBytes(cni.ID[:])
} }
// log.Printf("lifted %d nodes", len(r.Nodes)) if s.ipBlocked(util.AddrIP(cni.Addr)) {
continue
}
n := s.getNode(cni.Addr)
n.SetIDFromBytes(cni.ID[:])
} }
} }
@ -895,7 +871,9 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err er
return return
} }
func extractValues(m Msg) (vs []util.CompactPeer) { // In a get_peers response, the addresses of torrent clients involved with the
// queried info-hash.
func (m Msg) Values() (vs []util.CompactPeer) {
r, ok := m["r"] r, ok := m["r"]
if !ok { if !ok {
return return
@ -941,7 +919,10 @@ func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err er
} }
t.setOnResponse(func(m Msg) { t.setOnResponse(func(m Msg) {
s.liftNodes(m) s.liftNodes(m)
s.getNode(addr).announceToken = m.AnnounceToken() at, ok := m.AnnounceToken()
if ok {
s.getNode(addr).announceToken = at
}
}) })
return return
} }

View File

@ -6,6 +6,8 @@ import (
"log" "log"
"time" "time"
"bitbucket.org/anacrolix/go.torrent/logonce"
"bitbucket.org/anacrolix/go.torrent/util" "bitbucket.org/anacrolix/go.torrent/util"
"bitbucket.org/anacrolix/sync" "bitbucket.org/anacrolix/sync"
"github.com/willf/bloom" "github.com/willf/bloom"
@ -13,13 +15,22 @@ import (
type peerDiscovery struct { type peerDiscovery struct {
*peerStream *peerStream
triedAddrs *bloom.BloomFilter triedAddrs *bloom.BloomFilter
pending int pending int
server *Server server *Server
infoHash string infoHash string
numContacted int
announcePort int
announcePortImplied bool
} }
func (s *Server) GetPeers(infoHash string) (*peerStream, error) { func (pd *peerDiscovery) NumContacted() int {
pd.mu.Lock()
defer pd.mu.Unlock()
return pd.numContacted
}
func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) {
s.mu.Lock() s.mu.Lock()
startAddrs := func() (ret []dHTAddr) { startAddrs := func() (ret []dHTAddr) {
for _, n := range s.closestGoodNodes(160, infoHash) { for _, n := range s.closestGoodNodes(160, infoHash) {
@ -39,13 +50,15 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
} }
disc := &peerDiscovery{ disc := &peerDiscovery{
peerStream: &peerStream{ peerStream: &peerStream{
Values: make(chan peerStreamValue), Values: make(chan peerStreamValue, 100),
stop: make(chan struct{}), stop: make(chan struct{}),
values: make(chan peerStreamValue), values: make(chan peerStreamValue),
}, },
triedAddrs: bloom.NewWithEstimates(1000, 0.5), triedAddrs: bloom.NewWithEstimates(1000, 0.5),
server: s, server: s,
infoHash: infoHash, infoHash: infoHash,
announcePort: port,
announcePortImplied: impliedPort,
} }
// Function ferries from values to Values until discovery is halted. // Function ferries from values to Values until discovery is halted.
go func() { go func() {
@ -71,7 +84,7 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
disc.contact(addr) disc.contact(addr)
disc.mu.Unlock() disc.mu.Unlock()
} }
return disc.peerStream, nil return disc, nil
} }
func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) { func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
@ -89,6 +102,7 @@ func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
} }
func (me *peerDiscovery) contact(addr dHTAddr) { func (me *peerDiscovery) contact(addr dHTAddr) {
me.numContacted++
me.triedAddrs.Add([]byte(addr.String())) me.triedAddrs.Add([]byte(addr.String()))
if err := me.getPeers(addr); err != nil { if err := me.getPeers(addr); err != nil {
log.Printf("error sending get_peers request to %s: %s", addr, err) log.Printf("error sending get_peers request to %s: %s", addr, err)
@ -113,6 +127,13 @@ func (me *peerDiscovery) closingCh() chan struct{} {
return me.peerStream.stop return me.peerStream.stop
} }
func (me *peerDiscovery) announcePeer(to dHTAddr, token string) {
err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
if err != nil {
logonce.Stderr.Printf("error announcing peer: %s", err)
}
}
func (me *peerDiscovery) getPeers(addr dHTAddr) error { func (me *peerDiscovery) getPeers(addr dHTAddr) error {
me.server.mu.Lock() me.server.mu.Lock()
defer me.server.mu.Unlock() defer me.server.mu.Unlock()
@ -129,17 +150,12 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error {
me.responseNode(n) me.responseNode(n)
} }
me.mu.Unlock() me.mu.Unlock()
if vs := extractValues(m); vs != nil {
if vs := m.Values(); vs != nil {
nodeInfo := NodeInfo{ nodeInfo := NodeInfo{
Addr: t.remoteAddr, Addr: t.remoteAddr,
} }
id := func() string { copy(nodeInfo.ID[:], m.ID())
defer func() {
recover()
}()
return m["r"].(map[string]interface{})["id"].(string)
}()
copy(nodeInfo.ID[:], id)
select { select {
case me.peerStream.values <- peerStreamValue{ case me.peerStream.values <- peerStreamValue{
Peers: vs, Peers: vs,
@ -148,6 +164,10 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error {
case <-me.peerStream.stop: case <-me.peerStream.stop:
} }
} }
if at, ok := m.AnnounceToken(); ok {
me.announcePeer(addr, at)
}
case <-me.closingCh(): case <-me.closingCh():
} }
t.Close() t.Close()