p2p/discv5: fix multiple discovery issues (#16036)

* p2p/discv5: add query delay, fix node address update logic, retry refresh if empty

* p2p/discv5: remove unnecessary ping before topic query

* p2p/discv5: do not filter local address from topicNodes

* p2p/discv5: remove canQuery()

* p2p/discv5: gofmt
This commit is contained in:
Felföldi Zsolt 2018-02-08 18:06:31 +01:00 committed by Péter Szilágyi
parent 2b4c7e9b37
commit c4712bf96b
3 changed files with 37 additions and 27 deletions

View File

@ -565,11 +565,8 @@ loop:
if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil { if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
lookupChn <- net.ticketStore.radius[res.target.topic].converged lookupChn <- net.ticketStore.radius[res.target.topic].converged
} }
net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte { net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node, topic Topic) []byte {
net.ping(n, n.addr()) if n.state != nil && n.state.canQuery {
return n.pingEcho
}, func(n *Node, topic Topic) []byte {
if n.state == known {
return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
} else { } else {
if n.state == unknown { if n.state == unknown {
@ -633,15 +630,20 @@ loop:
} }
net.refreshResp <- refreshDone net.refreshResp <- refreshDone
case <-refreshDone: case <-refreshDone:
log.Trace("<-net.refreshDone") log.Trace("<-net.refreshDone", "table size", net.tab.count)
refreshDone = nil if net.tab.count != 0 {
list := searchReqWhenRefreshDone refreshDone = nil
searchReqWhenRefreshDone = nil list := searchReqWhenRefreshDone
go func() { searchReqWhenRefreshDone = nil
for _, req := range list { go func() {
net.topicSearchReq <- req for _, req := range list {
} net.topicSearchReq <- req
}() }
}()
} else {
refreshDone = make(chan struct{})
net.refresh(refreshDone)
}
} }
} }
log.Trace("loop stopped") log.Trace("loop stopped")
@ -751,7 +753,15 @@ func (net *Network) internNodeFromNeighbours(sender *net.UDPAddr, rn rpcNode) (n
return n, err return n, err
} }
if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP { if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP {
err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n) if n.state == known {
// reject address change if node is known by us
err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n)
} else {
// accept otherwise; this will be handled nicer with signed ENRs
n.IP = rn.IP
n.UDP = rn.UDP
n.TCP = rn.TCP
}
} }
return n, err return n, err
} }

View File

@ -494,13 +494,13 @@ func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping
} }
} }
func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte, query func(n *Node, topic Topic) []byte) { func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
now := mclock.Now() now := mclock.Now()
for i, n := range nodes { for i, n := range nodes {
if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius { if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
if lookup.radiusLookup { if lookup.radiusLookup {
if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC { if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now} s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
} }
} // else { } // else {
if s.canQueryTopic(n, lookup.topic) { if s.canQueryTopic(n, lookup.topic) {

View File

@ -49,7 +49,7 @@ var (
// Timeouts // Timeouts
const ( const (
respTimeout = 500 * time.Millisecond respTimeout = 500 * time.Millisecond
sendTimeout = 500 * time.Millisecond queryDelay = 1000 * time.Millisecond
expiration = 20 * time.Second expiration = 20 * time.Second
ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP
@ -318,20 +318,20 @@ func (t *udp) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []by
func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) { func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
p := topicNodes{Echo: queryHash} p := topicNodes{Echo: queryHash}
if len(nodes) == 0 { var sent bool
t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p) for _, result := range nodes {
return if result.IP.Equal(t.net.tab.self.IP) || netutil.CheckRelayIP(remote.IP, result.IP) == nil {
} p.Nodes = append(p.Nodes, nodeToRPC(result))
for i, result := range nodes {
if netutil.CheckRelayIP(remote.IP, result.IP) != nil {
continue
} }
p.Nodes = append(p.Nodes, nodeToRPC(result)) if len(p.Nodes) == maxTopicNodes {
if len(p.Nodes) == maxTopicNodes || i == len(nodes)-1 {
t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p) t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
p.Nodes = p.Nodes[:0] p.Nodes = p.Nodes[:0]
sent = true
} }
} }
if !sent || len(p.Nodes) > 0 {
t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p)
}
} }
func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) { func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {