2
0
mirror of synced 2025-02-24 14:48:27 +00:00

dht: Cap nodes to 10k, drop nodes that timeout if we're out of space

This commit is contained in:
Matt Joiner 2014-12-08 19:09:11 -06:00
parent 4be7b11a3b
commit 78eb25535d

View File

@ -23,6 +23,8 @@ import (
"github.com/anacrolix/libtorgo/bencode"
)
const maxNodes = 10000
// Uniquely identifies a transaction to us.
type transactionKey struct {
RemoteAddr string // host:port
@ -68,7 +70,7 @@ func (s *Server) Stats() (ss serverStats) {
s.mu.Lock()
defer s.mu.Unlock()
for _, n := range s.nodes {
if n.Good() {
if n.DefinitelyGood() {
ss.NumGoodNodes++
}
}
@ -150,7 +152,7 @@ func (n *Node) NodeInfo() (ret NodeInfo) {
return
}
func (n *Node) Good() bool {
func (n *Node) DefinitelyGood() bool {
if len(n.id) != 20 {
return false
}
@ -162,9 +164,6 @@ func (n *Node) Good() bool {
if n.lastSentQuery.Before(n.lastGotResponse) {
return true
}
if time.Now().Sub(n.lastSentQuery) >= 2*time.Minute {
return false
}
return true
}
@ -280,6 +279,11 @@ func (t *transaction) sendQuery() error {
}
func (t *transaction) timeout() {
go func() {
t.s.mu.Lock()
defer t.s.mu.Unlock()
t.s.nodeTimedOut(t.remoteAddr)
}()
t.close()
}
@ -287,6 +291,7 @@ func (t *transaction) close() {
if t.closing() {
return
}
t.queryPacket = nil
close(t.Response)
close(t.done)
t.timer.Stop()
@ -323,6 +328,7 @@ func (t *transaction) handleResponse(m Msg) {
if t.onResponse != nil {
t.onResponse(m)
}
t.queryPacket = nil
select {
case t.Response <- m:
default:
@ -388,7 +394,7 @@ func (s *Server) processPacket(b []byte, addr dHTAddr) {
return
}
}
log.Printf("%s: received bad krpc message: %s: %q", s, err, b)
log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b)
}()
return
}
@ -544,10 +550,25 @@ func (s *Server) getNode(addr dHTAddr) (n *Node) {
n = &Node{
addr: addr,
}
s.nodes[addr.String()] = n
if len(s.nodes) < maxNodes {
s.nodes[addr.String()] = n
}
}
return
}
func (s *Server) nodeTimedOut(addr dHTAddr) {
node, ok := s.nodes[addr.String()]
if !ok {
return
}
if node.DefinitelyGood() {
return
}
if len(s.nodes) < maxNodes {
return
}
delete(s.nodes, addr.String())
}
func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
if list := s.ipBlockList; list != nil {
@ -682,7 +703,7 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err
s.mu.Lock()
defer s.mu.Unlock()
for _, node := range s.closestNodes(160, infoHash, func(n *Node) bool {
return n.Good() && n.announceToken != ""
return n.announceToken != ""
}) {
err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
if err != nil {
@ -950,7 +971,7 @@ func (s *Server) bootstrap() (err error) {
func (s *Server) numGoodNodes() (num int) {
for _, n := range s.nodes {
if n.Good() {
if n.DefinitelyGood() {
num++
}
}
@ -1107,7 +1128,7 @@ func idDistance(a, b string) (ret bigIntDistance) {
// }
func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
return s.closestNodes(k, targetID, func(n *Node) bool { return n.DefinitelyGood() })
}
func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {