2022-09-29 19:49:55 -04:00
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
2022-02-14 01:51:28 +01:00
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## Node Discovery Protocol v5
##
## Node discovery protocol implementation as per specification:
## https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md
##
## This node discovery protocol implementation uses the same underlying
## implementation of routing table as is also used for the discovery v4
## implementation, which is the same or similar as the one described in the
## original Kademlia paper:
## https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
##
## This might not be the most optimal implementation for the node discovery
## protocol v5. Why?
##
## The Kademlia paper describes an implementation that starts off from one
## k-bucket, and keeps splitting the bucket as more nodes are discovered and
## added. The bucket splits only on the part of the binary tree where our own
## node's id belongs to (same prefix). Resulting eventually in a k-bucket per
## logarithmic distance (log base2 distance). Well, not really, as nodes with
## ids in the closer distance ranges will never be found. And because of this an
## optimisation is done where buckets will also split sometimes even if the
## nodes own id does not have the same prefix (this is to avoid creating highly
## unbalanced branches which would require longer lookups).
##
## Now, some implementations take a more simplified approach. They just create
## directly a bucket for each possible logarithmic distance (e.g. here 1->256).
## Some implementations also don't create buckets with logarithmic distance
## lower than a certain value (e.g. only 1/15th of the highest buckets),
## because the closer to the node (the lower the distance), the less chance
## there is to still find nodes.
##
## The discovery protocol v4's `FindNode` call will request the k closest
## nodes. As does original Kademlia. This effectively puts the work at the node
## that gets the request. This node will have to check its buckets and gather
## the closest. Some implementations go over all the nodes in all the buckets
## for this (e.g. go-ethereum discovery v4). However, in our bucket splitting
## approach, this search is improved.
##
## In the discovery protocol v5 the `FindNode` call is changed and now the
## logarithmic distance is passed as parameter instead of the NodeId. And only
## nodes that match that logarithmic distance are allowed to be returned.
## This change was made to not put the trust at the requested node for selecting
## the closest nodes. To counter a possible (mistaken) difference in
## implementation, but more importantly for security reasons. See also:
## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-rationale.md#115-guard-against-kademlia-implementation-flaws
##
## The result is that in an implementation which just stores buckets per
## logarithmic distance, it simply needs to return the right bucket. In our
## split-bucket implementation, this cannot be done as such and thus the closest
2022-11-02 10:21:05 -06:00
## neighbors search is still done. And to do this, a reverse calculation of an
2022-02-14 01:51:28 +01:00
## id at given logarithmic distance is needed (which is why there is the
## `idAtDistance` proc). Next, nodes with invalid distances need to be filtered
## out to be compliant to the specification. This can most likely get further
2022-11-02 10:21:05 -06:00
## optimized, but it is likely better to switch away from the split-bucket
2022-02-14 01:51:28 +01:00
## approach. I believe that the main benefit it has is improved lookups
## (due to no unbalanced branches), and it looks like this will be negated by
## limiting the returned nodes to only the ones of the requested logarithmic
## distance for the `FindNode` call.
## This `FindNode` change in discovery v5 will also have an effect on the
## efficiency of the network. Work will be moved from the receiver of
## `FindNodes` to the requester. But this also means more network traffic,
## as less nodes will potentially be passed around per `FindNode` call, and thus
## more requests will be needed for a lookup (adding bandwidth and latency).
## This might be a concern for mobile devices.
{. push raises : [ Defect ] . }
import
2022-09-13 12:01:36 -06:00
std / [ tables , sets , options , math , sequtils , algorithm , strutils ] ,
2022-10-01 12:05:03 -04:00
stew / shims / net as stewNet ,
json_serialization / std / net ,
stew / [ base64 , endians2 , results ] ,
pkg / [ chronicles , chronicles / chronos_tools ] ,
pkg / chronos ,
pkg / stint ,
2022-11-02 10:21:05 -06:00
pkg / bearssl / rand ,
2022-10-01 12:05:03 -04:00
pkg / metrics
import " . " / [
messages ,
messages_encoding ,
node ,
routing_table ,
spr ,
random2 ,
ip_vote ,
nodes_verification ,
providers ,
transport ]
2022-02-14 01:51:28 +01:00
import nimcrypto except toHex
2022-10-01 12:05:03 -04:00
export options , results , node , spr , providers
2022-02-14 01:51:28 +01:00
declareCounter discovery_message_requests_outgoing ,
" Discovery protocol outgoing message requests " , labels = [ " response " ]
declareCounter discovery_message_requests_incoming ,
" Discovery protocol incoming message requests " , labels = [ " response " ]
declareCounter discovery_unsolicited_messages ,
" Discovery protocol unsolicited or timed-out messages "
declareCounter discovery_enr_auto_update ,
2022-03-07 01:19:49 +01:00
" Amount of discovery IP:port address SPR auto updates "
2022-02-14 01:51:28 +01:00
logScope :
topics = " discv5 "
const
2022-09-12 15:45:49 -06:00
Alpha = 3 ## Kademlia concurrency factor
LookupRequestLimit = 3 ## Amount of distances requested in a single Findnode
2022-02-14 01:51:28 +01:00
## message for a lookup or query
2022-09-12 15:45:49 -06:00
FindNodeResultLimit = 16 ## Maximum amount of SPRs in the total Nodes messages
2022-02-14 01:51:28 +01:00
## that will be processed
2022-09-12 15:45:49 -06:00
MaxNodesPerMessage = 3 ## Maximum amount of SPRs per individual Nodes message
RefreshInterval = 5 . minutes ## Interval of launching a random query to
2022-02-14 01:51:28 +01:00
## refresh the routing table.
2022-09-12 15:45:49 -06:00
RevalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
2022-02-14 01:51:28 +01:00
## value in milliseconds
2022-09-12 15:45:49 -06:00
IpMajorityInterval = 5 . minutes ## Interval for checking the latest IP:Port
2022-03-07 01:19:49 +01:00
## majority and updating this when SPR auto update is set.
2022-09-12 15:45:49 -06:00
InitialLookups = 1 ## Amount of lookups done when populating the routing table
ResponseTimeout * = 4 . seconds ## timeout for the response of a request-response
MaxProvidersEntries * = 1_000_000 # one million records
MaxProvidersPerEntry * = 20 # providers per entry
2022-02-14 01:51:28 +01:00
## call
2022-09-13 12:01:36 -06:00
func shortLog*(record: SignedPeerRecord): string =
  ## Returns compact string representation of ``SignedPeerRecord``.
  ##
  let addrs = record.data.addresses.mapIt($it.address).join(", ")
  "peerId: " & record.data.peerId.shortLog & ", " &
  "seqNo: " & $record.data.seqNo & ", " &
  "addresses: " & addrs

func shortLog*(records: seq[SignedPeerRecord]): string =
  ## Returns compact string representation of a sequence of
  ## ``SignedPeerRecord``.
  ##
  for r in records:
    result &= "provider: " & r.shortLog

chronicles.formatIt(SignedPeerRecord): shortLog(it)
chronicles.formatIt(seq[SignedPeerRecord]): shortLog(it)
2022-02-14 01:51:28 +01:00
type
  DiscoveryConfig* = object
    ## Tunables for routing table construction.
    tableIpLimits*: TableIpLimits # per-bucket / per-table IP address limits
    bitsPerHop*: int              # bucket-split granularity

  Protocol* = ref object
    ## State of a running discovery v5 node.
    localNode*: Node
    privateKey: PrivateKey
    transport*: Transport[Protocol] # exported for tests
    routingTable*: RoutingTable
    # Outstanding single-response requests, keyed by (peer id, request id);
    # completed by `handleMessage` or timed out by `waitMessage`.
    awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
    # awaitedNodesMessages: Table[(NodeId, RequestId), (Future[DiscResult[seq[SignedPeerRecord]]], int, seq[SignedPeerRecord])] # for some reason DiscResult is not compiling here, needs to be expanded
    # Outstanding multi-message Nodes responses:
    # (future, Nodes messages received so far, SPRs accumulated so far).
    awaitedNodesMessages: Table[(NodeId, RequestId), (Future[Result[seq[SignedPeerRecord], cstring]], uint32, seq[SignedPeerRecord])]
    refreshLoop: Future[void]
    revalidateLoop: Future[void]
    ipMajorityLoop: Future[void]
    lastLookup: chronos.Moment
    bootstrapRecords*: seq[SignedPeerRecord]
    ipVote: IpVote
    enrAutoUpdate: bool
    talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
    rng*: ref HmacDrbgContext
    providers: ProvidersManager
    valueStore: Table[NodeId, seq[byte]] # simple in-memory key/value store

  TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
    {.gcsafe, raises: [Defect].}

  TalkProtocol* = ref object of RootObj
    protocolHandler*: TalkProtocolHandler

  DiscResult*[T] = Result[T, cstring]

const
  defaultDiscoveryConfig* = DiscoveryConfig(
    tableIpLimits: DefaultTableIpLimits,
    bitsPerHop: DefaultBitsPerHop)
proc addNode*(d: Protocol, node: Node): bool =
  ## Add `Node` to discovery routing table.
  ##
  ## Returns true only when `Node` was added as a new entry to a bucket in the
  ## routing table.
  d.routingTable.addNode(node) == Added
2022-03-07 01:19:49 +01:00
proc addNode*(d: Protocol, r: SignedPeerRecord): bool =
  ## Add `Node` from a `SignedPeerRecord` to discovery routing table.
  ##
  ## Returns false only if no valid `Node` can be created from the
  ## `SignedPeerRecord` or on the conditions of `addNode` from a `Node`.
  let res = newNode(r)
  if res.isErr():
    return false
  d.addNode(res[])
2022-03-07 01:19:49 +01:00
proc addNode*(d: Protocol, spr: SprUri): bool =
  ## Add `Node` from a SPR URI to discovery routing table.
  ##
  ## Returns false if no valid SPR URI, or on the conditions of `addNode` from
  ## a `SignedPeerRecord`.
  try:
    var record: SignedPeerRecord
    if record.fromURI(spr):
      return d.addNode(record)
  except Base64Error as e:
    error "Base64 error decoding SPR URI", error = e.msg
  false
2022-02-14 01:51:28 +01:00
proc getNode*(d: Protocol, id: NodeId): Option[Node] =
  ## Get the node with `id` from the routing table, if present.
  return d.routingTable.getNode(id)

proc randomNodes*(d: Protocol, maxAmount: int): seq[Node] =
  ## Get up to `maxAmount` random nodes from the local routing table.
  return d.routingTable.randomNodes(maxAmount)

proc randomNodes*(d: Protocol, maxAmount: int,
    pred: proc(x: Node): bool {.gcsafe, noSideEffect.}): seq[Node] =
  ## Get up to `maxAmount` random nodes from the local routing table,
  ## filtered by the `pred` predicate.
  return d.routingTable.randomNodes(maxAmount, pred)

proc randomNodes*(d: Protocol, maxAmount: int,
    enrField: (string, seq[byte])): seq[Node] =
  ## Get up to `maxAmount` random nodes whose record contains `enrField`.
  return d.randomNodes(
    maxAmount, proc(x: Node): bool = x.record.contains(enrField))

proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
    seenOnly = false): seq[Node] =
  ## Return up to k neighbours (closest node ids) of the given node id.
  return d.routingTable.neighbours(id, k, seenOnly)

proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
    k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
  ## Return up to k neighbours (closest node ids) at the given distances.
  return d.routingTable.neighboursAtDistances(distances, k, seenOnly)

proc nodesDiscovered*(d: Protocol): int =
  ## Total number of nodes currently in the routing table.
  return d.routingTable.len
2022-03-25 15:00:57 +11:00
func privKey*(d: Protocol): lent PrivateKey =
  ## Borrowed (non-owning, `lent`) view of the local node's private key.
  d.privateKey

func getRecord*(d: Protocol): SignedPeerRecord =
  ## Get the SPR of the local node.
  d.localNode.record
2022-09-12 16:22:56 -06:00
proc updateRecord*(
    d: Protocol,
    spr: Option[SignedPeerRecord] = SignedPeerRecord.none): DiscResult[void] =
  ## Update the ENR of the local node with provided `enrFields` k:v pairs.
  ##
  ## When `spr` is given it replaces the local record, but the local sequence
  ## number is preserved and then incremented (record re-signed with our key).

  if spr.isSome:
    let
      newSpr = spr.get()
      seqNo = d.localNode.record.seqNum # keep our own sequence number

    info "Updated discovery SPR", uri = newSpr.toURI()

    d.localNode.record = newSpr
    d.localNode.record.data.seqNo = seqNo
    # Bump seq number and re-sign; `?` propagates a signing error to caller.
    ? d.localNode.record.incSeqNo(d.privateKey)

  # TODO: Would it make sense to actively ping ("broadcast") to all the peers
  # we stored a handshake with in order to get that ENR updated?

  ok()
2022-02-23 21:11:43 +01:00
proc sendResponse(d: Protocol, dstId: NodeId, dstAddr: Address,
    message: SomeMessage, reqId: RequestId) =
  ## Send `message` as a response, reusing the specified request id so the
  ## peer can match it to its outstanding request.
  d.transport.sendMessage(dstId, dstAddr, encodeMessage(message, reqId))
2022-02-14 01:51:28 +01:00
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
    nodes: openArray[Node]) =
  ## Send `nodes` as one or more Nodes messages, chunked to at most
  ## `MaxNodesPerMessage` SPRs per message. An empty `nodes` still produces
  ## one (empty) reply.
  proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
      message: NodesMessage, reqId: RequestId) {.nimcall.} =
    # Inner helper: send a single, already-chunked Nodes message.
    trace "Respond message packet", dstId = toId, address = toAddr,
      kind = MessageKind.nodes
    d.sendResponse(toId, toAddr, message, reqId)

  if nodes.len == 0:
    # In case of 0 nodes, a reply is still needed
    d.sendNodes(toId, toAddr, NodesMessage(total: 1, sprs: @[]), reqId)
    return

  var message: NodesMessage
  # TODO: Do the total calculation based on the max UDP packet size we want to
  # send and the SPR size of all (max 16) nodes.
  # Which UDP packet size to take? 1280? 576?
  message.total = ceil(nodes.len / MaxNodesPerMessage).uint32

  for i in 0 ..< nodes.len:
    message.sprs.add(nodes[i].record)
    if message.sprs.len == MaxNodesPerMessage:
      # Chunk is full: send it and start the next one.
      d.sendNodes(toId, toAddr, message, reqId)
      message.sprs.setLen(0)

  # Flush the final, partially filled chunk (if any).
  if message.sprs.len != 0:
    d.sendNodes(toId, toAddr, message, reqId)
proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
    ping: PingMessage, reqId: RequestId) =
  ## Answer a ping with a pong carrying our record sequence number and the
  ## sender's observed address/port.
  let reply = PongMessage(
    sprSeq: d.localNode.record.seqNum,
    ip: fromAddr.ip,
    port: fromAddr.port.uint16)
  trace "Respond message packet", dstId = fromId, address = fromAddr,
    kind = MessageKind.pong
  d.sendResponse(fromId, fromAddr, reply, reqId)
2022-02-14 01:51:28 +01:00
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
    fn: FindNodeMessage, reqId: RequestId) =
  ## Serve a FindNode request: reply with seen nodes at the requested
  ## logarithmic distances, or with our own record for distance 0.
  if fn.distances.len == 0:
    d.sendNodes(fromId, fromAddr, reqId, [])
    return

  if 0'u16 in fn.distances:
    # A request for our own record.
    # It would be a weird request if there are more distances next to 0
    # requested, so in this case lets just pass only our own. TODO: OK?
    d.sendNodes(fromId, fromAddr, reqId, [d.localNode])
    return

  # TODO: Still deduplicate also?
  if fn.distances.allIt(it <= 256):
    d.sendNodes(fromId, fromAddr, reqId,
      d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true))
  else:
    # At least one invalid distance, but the polite node we are, still respond
    # with empty nodes.
    d.sendNodes(fromId, fromAddr, reqId, [])
2022-03-17 01:29:01 +01:00
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
    fnf: FindNodeFastMessage, reqId: RequestId) =
  ## Serve a target-based (v4-style) FindNode: reply with the closest seen
  ## neighbours of the requested target id.
  let closest = d.routingTable.neighbours(fnf.target, seenOnly = true)
  d.sendNodes(fromId, fromAddr, reqId, closest)
  # TODO: if known, maybe we should add exact target even if not yet "seen"
2022-02-14 01:51:28 +01:00
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
    talkreq: TalkReqMessage, reqId: RequestId) =
  ## Dispatch a talk request to the registered protocol handler. Unknown
  ## protocols get an empty response, as per specification.
  let handler = d.talkProtocols.getOrDefault(talkreq.protocol)

  var payload: seq[byte]
  if not (handler.isNil() or handler.protocolHandler.isNil()):
    payload = handler.protocolHandler(handler, talkreq.request, fromId, fromAddr)

  trace "Respond message packet", dstId = fromId, address = fromAddr,
    kind = MessageKind.talkresp
  d.sendResponse(fromId, fromAddr, TalkRespMessage(response: payload), reqId)
2022-02-14 01:51:28 +01:00
2022-09-29 19:49:55 -04:00
proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} =
  ## Record `prov` as a provider for content id `cId` in the local providers
  ## manager; failures are only traced.
  trace "adding provider to local db", n = p.localNode, cId, prov
  let res = await p.providers.add(cId, prov)
  if res.isErr:
    trace "Unable to add provider", cid = cId, peerId = prov.data.peerId
2022-03-10 17:02:24 +11:00
2022-09-12 15:45:49 -06:00
proc handleAddProvider(
    d: Protocol,
    fromId: NodeId,
    fromAddr: Address,
    addProvider: AddProviderMessage,
    reqId: RequestId) =
  ## Handle an AddProvider message: store the provider locally.
  ## Fire-and-forget — AddProvider gets no response.
  asyncSpawn d.addProviderLocal(addProvider.cId, addProvider.prov)
2022-03-10 17:02:24 +11:00
2022-09-13 12:01:36 -06:00
proc handleGetProviders(
    d: Protocol,
    fromId: NodeId,
    fromAddr: Address,
    getProviders: GetProvidersMessage,
    reqId: RequestId) {.async.} =
  ## Look up the providers stored for the requested content id and reply
  ## with a Providers message. On lookup failure no response is sent.
  #TODO: add checks, add signed version
  let provs = await d.providers.get(getProviders.cId)
  if provs.isErr:
    trace "Unable to get providers", cid = getProviders.cId, err = provs.error.msg
    return

  ##TODO: handle multiple messages
  d.sendResponse(
    fromId, fromAddr, ProvidersMessage(total: 1, provs: provs.get), reqId)
2022-03-10 17:02:24 +11:00
2023-06-06 11:05:09 +02:00
proc addValueLocal(p: Protocol, cId: NodeId, value: seq[byte]) {.async.} =
  ## Store `value` under key `cId` in the local in-memory value store,
  ## overwriting any previous value.
  trace "adding value to local db", n = p.localNode, cId, value
  p.valueStore[cId] = value
proc handleAddValue(
    d: Protocol,
    fromId: NodeId,
    fromAddr: Address,
    addValue: AddValueMessage,
    reqId: RequestId) =
  ## Handle an AddValue message: store the value locally.
  ## Fire-and-forget — no response is sent.
  asyncSpawn d.addValueLocal(addValue.cId, addValue.value)
proc handleGetValue(
    d: Protocol,
    fromId: NodeId,
    fromAddr: Address,
    getValue: GetValueMessage,
    reqId: RequestId) {.async.} =
  ## Serve a GetValue request from the in-memory value store.
  ##
  ## Replies with a `ValueMessage` when the key is present; currently sends
  ## no response at all when it is missing.
  # Use `withValue` instead of `try`/`except KeyError`: avoids
  # exception-based control flow and does a single table lookup.
  d.valueStore.withValue(getValue.cId, stored):
    trace "retrieved value from local db", n = d.localNode, cID = getValue.cId,
      value = stored[]
    ##TODO: handle multiple messages?
    let response = ValueMessage(value: stored[])
    d.sendResponse(fromId, fromAddr, response, reqId)
  do:
    # should we respond here? I would say so
    trace "no value in local db", n = d.localNode, cID = getValue.cId
    # TODO: add noValue response
2022-02-14 01:51:28 +01:00
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
    message: Message) =
  ## Central dispatcher for every decoded incoming message: request kinds go
  ## to their handler procs, response kinds complete the matching awaited
  ## future (or are counted as unsolicited).
  case message.kind
  of ping:
    discovery_message_requests_incoming.inc()
    d.handlePing(srcId, fromAddr, message.ping, message.reqId)
  of findNode:
    discovery_message_requests_incoming.inc()
    d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
  of findNodeFast:
    discovery_message_requests_incoming.inc()
    d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId)
  of talkReq:
    discovery_message_requests_incoming.inc()
    d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
  of addProvider:
    discovery_message_requests_incoming.inc()
    # AddProvider never gets a response, hence the extra "no_response" count.
    discovery_message_requests_incoming.inc(labelValues = ["no_response"])
    d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
  of getProviders:
    discovery_message_requests_incoming.inc()
    asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
  of addValue:
    discovery_message_requests_incoming.inc()
    #discovery_message_requests_incoming.inc(labelValues = ["no_response"])
    d.handleAddValue(srcId, fromAddr, message.addValue, message.reqId)
  of getValue:
    discovery_message_requests_incoming.inc()
    asyncSpawn d.handleGetValue(srcId, fromAddr, message.getValue, message.reqId)
  of regTopic, topicQuery:
    discovery_message_requests_incoming.inc()
    discovery_message_requests_incoming.inc(labelValues = ["no_response"])
    trace "Received unimplemented message kind", kind = message.kind,
      origin = fromAddr
  of nodes:
    # Part of a (possibly multi-message) Nodes response: accumulate SPRs
    # until `total` messages arrived, then complete the waiting future.
    trace "node-response message received"
    var sprs = message.nodes.sprs
    let total = message.nodes.total
    trace "waiting for more nodes messages", me = d.localNode, srcId, total
    try:
      var (waiter, cnt, s) = d.awaitedNodesMessages[(srcId, message.reqId)]
      cnt += 1
      s.add(sprs)
      # Write the updated (count, accumulated SPRs) back into the table.
      d.awaitedNodesMessages[(srcId, message.reqId)] = (waiter, cnt, s)
      trace "nodes collected", me = d.localNode, srcId, cnt, s
      if cnt == total:
        d.awaitedNodesMessages.del((srcId, message.reqId))
        trace "all nodes responses received", me = d.localNode, srcId
        waiter.complete(DiscResult[seq[SignedPeerRecord]].ok(s))
    except KeyError:
      # No outstanding request for this (srcId, reqId) pair.
      discovery_unsolicited_messages.inc()
      warn "Timed out or unrequested message", kind = message.kind,
        origin = fromAddr
  else:
    # All remaining response kinds complete a single-message waiter.
    var waiter: Future[Option[Message]]
    if d.awaitedMessages.take((srcId, message.reqId), waiter):
      waiter.complete(some(message))
    else:
      discovery_unsolicited_messages.inc()
      trace "Timed out or unrequested message", kind = message.kind,
        origin = fromAddr
proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
    protocol: TalkProtocol): DiscResult[void] =
  ## Register a handler for `protocolId`.
  # Currently allow only for one handler per talk protocol.
  if d.talkProtocols.hasKeyOrPut(protocolId, protocol):
    return err("Protocol identifier already registered")
  ok()
proc replaceNode(d: Protocol, n: Node) =
  ## Drop `n` from the routing table (letting a replacement candidate take
  ## its place), unless it is one of our bootstrap nodes.
  if n.record in d.bootstrapRecords:
    # For now we never remove bootstrap nodes. It might make sense to actually
    # do so and to retry them only in case we drop to a really low amount of
    # peers in the routing table.
    debug "Message request to bootstrap node failed", src = d.localNode, dst = n
  else:
    d.routingTable.replaceNode(n)
2022-02-14 01:51:28 +01:00
2023-05-10 21:38:57 +02:00
proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T,
    reqId: RequestId) =
  ## Encode `m` under `reqId` and send it to the given node id/address.
  trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T)
  discovery_message_requests_outgoing.inc()
  d.transport.sendMessage(toId, toAddr, encodeMessage(m, reqId))

proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T,
    reqId: RequestId) =
  ## Encode `m` under `reqId` and send it to `toNode`, which must carry an
  ## address.
  doAssert(toNode.address.isSome())
  trace "Send message packet", dstId = toNode.id,
    address = toNode.address, kind = messageKind(T)
  discovery_message_requests_outgoing.inc()
  d.transport.sendMessage(toNode, encodeMessage(m, reqId))
2023-05-10 21:31:18 +02:00
proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T):
    Future[Option[Message]] =
  ## Send `msg` to `node` under a fresh request id and return a future for
  ## the single response message (none on timeout).
  # NOTE(review): this calls `waitMessage`, which is declared later in this
  # file — confirm the declaration order compiles in this module.
  let reqId = RequestId.init(d.rng[])
  result = d.waitMessage(node, reqId)
  sendRequest(d, node, msg, reqId)
2023-05-17 08:08:12 +02:00
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout):
    Future[Option[Message]] =
  ## Register a future that `handleMessage` completes when a response from
  ## `fromNode` carrying `reqId` arrives; completes with `none` on timeout.
  result = newFuture[Option[Message]]("waitMessage")
  let res = result
  let key = (fromNode.id, reqId)
  sleepAsync(timeout).addCallback() do (data: pointer):
    # Timeout path: unregister and, if the response did not arrive in the
    # meantime, complete with `none`.
    d.awaitedMessages.del(key)
    if not res.finished:
      res.complete(none(Message))
  d.awaitedMessages[key] = result
2023-05-10 21:31:18 +02:00
proc waitNodeResponses*[T: SomeMessage](d: Protocol, node: Node, msg: T):
    Future[DiscResult[seq[SignedPeerRecord]]] =
  ## Send `msg` to `node` under a fresh request id and return a future for
  ## the full (possibly multi-message) Nodes response.
  # NOTE(review): this calls `waitNodes`, which is declared later in this
  # file — confirm the declaration order compiles in this module.
  let reqId = RequestId.init(d.rng[])
  result = d.waitNodes(node, reqId)
  sendRequest(d, node, msg, reqId)
2023-06-02 22:17:04 +02:00
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout):
    Future[DiscResult[seq[SignedPeerRecord]]] =
  ## Wait for one or more nodes replies.
  ##
  ## The first reply will hold the total number of replies expected, and based
  ## on that, more replies will be awaited.
  ## If one reply is lost here (timed out), others are ignored too.
  ## Same counts for out of order receival.
  ## TODO: these are VERY optimistic assumptions here. We need a short timeout if we collect
  result = newFuture[DiscResult[seq[SignedPeerRecord]]]("waitNodesMessages")
  let res = result
  let key = (fromNode.id, reqId)
  sleepAsync(timeout).addCallback() do (data: pointer):
    # Timeout path: discard any partially collected responses and fail.
    d.awaitedNodesMessages.del(key)
    if not res.finished:
      res.complete(DiscResult[seq[SignedPeerRecord]].err("waitNodeMessages timed out"))
  # (future, Nodes messages received so far, SPRs accumulated so far)
  d.awaitedNodesMessages[key] = (result, 0.uint32, newSeq[SignedPeerRecord]())
2022-02-14 01:51:28 +01:00
proc ping*(d: Protocol, toNode: Node):
    Future[DiscResult[PongMessage]] {.async.} =
  ## Send a discovery ping message.
  ##
  ## Returns the received pong message or an error.
  let
    msg = PingMessage(sprSeq: d.localNode.record.seqNum)
    resp = await d.waitResponse(toNode, msg)

  if resp.isNone():
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
    return err("Pong message not received in time")

  let message = resp.get()
  if message.kind != pong:
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
    return err("Invalid response to ping message")

  d.routingTable.setJustSeen(toNode)
  return ok(message.pong)
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
    Future[DiscResult[seq[Node]]] {.async.} =
  ## Send a getNeighbours message.
  ##
  ## Returns the received nodes or an error.
  ## Received SPRs are already validated and converted to `Node`.
  let
    msg = FindNodeMessage(distances: distances)
    nodes = await d.waitNodeResponses(toNode, msg)

  if nodes.isOk:
    # Drop records over the limit or at distances we did not request.
    let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances)
    d.routingTable.setJustSeen(toNode)
    return ok(res)
  else:
    # No (complete) response: demote the peer in the routing table.
    d.replaceNode(toNode)
    return err(nodes.error)
2022-03-17 01:29:01 +01:00
proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId):
    Future[DiscResult[seq[Node]]] {.async.} =
  ## Send a findNode message (target-id based, v4-style).
  ##
  ## Returns the received nodes or an error.
  ## Received SPRs are already validated and converted to `Node`.
  let
    msg = FindNodeFastMessage(target: target)
    nodes = await d.waitNodeResponses(toNode, msg)

  if nodes.isOk:
    # No distance filter here — only the record count limit applies.
    let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit)
    d.routingTable.setJustSeen(toNode)
    return ok(res)
  else:
    # No (complete) response: demote the peer in the routing table.
    d.replaceNode(toNode)
    return err(nodes.error)
2022-02-14 01:51:28 +01:00
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
    Future[DiscResult[seq[byte]]] {.async.} =
  ## Send a discovery talkreq message.
  ##
  ## Returns the received talkresp message or an error.
  let
    msg = TalkReqMessage(protocol: protocol, request: request)
    resp = await d.waitResponse(toNode, msg)

  if resp.isNone():
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
    return err("Talk response message not received in time")

  let message = resp.get()
  if message.kind != talkResp:
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
    return err("Invalid response to talk request message")

  d.routingTable.setJustSeen(toNode)
  return ok(message.talkResp.response)
proc lookupDistances*(target, dest: NodeId): seq[uint16] =
  ## Build the list of up to `LookupRequestLimit` logarithmic distances to
  ## request from `dest` when looking up `target`: the exact log-distance
  ## first, then alternating neighbouring distances on either side.
  let td = logDistance(target, dest)
  let tdAsInt = int(td)
  result.add(td)
  var i = 1
  while result.len < LookupRequestLimit:
    # Nothing left to add on either side: stop instead of looping forever
    # (only reachable for limits larger than the number of valid distances).
    if tdAsInt + i >= 256 and tdAsInt - i <= 0:
      break
    if tdAsInt + i < 256:
      result.add(td + uint16(i))
    # Re-check the limit before the second add of this iteration so the
    # result can never exceed `LookupRequestLimit` (the loop-head check
    # alone could overshoot by one for even limits).
    if result.len < LookupRequestLimit and tdAsInt - i > 0:
      result.add(td - uint16(i))
    inc i
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
    Future[seq[Node]] {.async.} =
  ## Query `destNode` for nodes at the distances relevant to `target` and
  ## feed every discovered node into the routing table.
  let dists = lookupDistances(target, destNode.id)

  # Instead of doing max `LookupRequestLimit` findNode requests, make use
  # of the discv5.1 functionality to request nodes for multiple distances.
  let reply = await d.findNode(destNode, dists)
  if reply.isOk:
    result.add(reply[])

  # Attempt to add all nodes discovered
  for n in result:
    discard d.addNode(n)
2022-03-17 01:29:01 +01:00
proc lookupWorkerFast(d: Protocol, destNode: Node, target: NodeId):
    Future[seq[Node]] {.async.} =
  ## Query `destNode` with the target-NodeId-based find_node and feed every
  ## discovered node into the routing table.
  let reply = await d.findNodeFast(destNode, target)
  if reply.isOk:
    result.add(reply[])

  # Attempt to add all nodes discovered
  for n in result:
    discard d.addNode(n)
proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] {.async.} =
  ## Perform a lookup for the given target, return the closest n nodes to the
  ## target. Maximum value for n is `BUCKET_SIZE`.
  # `closestNodes` holds the k closest nodes to target found, sorted by distance
  # Unvalidated nodes are used for requests as a form of validation.
  var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
    seenOnly = false)

  var asked, seen = initHashSet[NodeId]()
  asked.incl(d.localNode.id) # No need to ask our own node
  seen.incl(d.localNode.id) # No need to discover our own node
  for node in closestNodes:
    seen.incl(node.id)

  var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha)

  while true:
    var i = 0
    # Doing `Alpha` amount of requests at once as long as closer non queried
    # nodes are discovered.
    while i < closestNodes.len and pendingQueries.len < Alpha:
      let n = closestNodes[i]
      if not asked.containsOrIncl(n.id):
        # `fast` selects the target-id query over the distance-based one.
        if fast:
          pendingQueries.add(d.lookupWorkerFast(n, target))
        else:
          pendingQueries.add(d.lookupWorker(n, target))
      inc i

    trace "discv5 pending queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    # Wait for whichever outstanding query finishes first.
    let query = await one(pendingQueries)
    trace "Got discv5 lookup query response"

    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
    else:
      error "Resulting query should have been in the pending queries"

    let nodes = query.read
    # TODO: Remove node on timed-out query?
    for n in nodes:
      if not seen.containsOrIncl(n.id):
        # If it wasn't seen before, insert node while remaining sorted
        closestNodes.insert(n, closestNodes.lowerBound(n,
          proc(x: Node, n: Node): int =
            cmp(distance(x.id, target), distance(n.id, target))
        ))

        if closestNodes.len > BUCKET_SIZE:
          closestNodes.del(closestNodes.high())

  d.lastLookup = now(chronos.Moment)
  return closestNodes
2022-03-10 17:02:24 +11:00
proc addProvider*(
    d: Protocol,
    cId: NodeId,
    pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
  ## Advertise `pr` as a provider for `cId` on the nodes closest to the id,
  ## storing it locally when this node is among the closest.
  ## Returns the set of nodes the advert was sent to.
  var res = await d.lookup(cId)
  trace "lookup returned:", res
  # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
  if res.len == 0:
    res.add(d.localNode)
  for toNode in res:
    if toNode == d.localNode:
      asyncSpawn d.addProviderLocal(cId, pr)
    else:
      let reqId = RequestId.init(d.rng[])
      d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr), reqId)

  return res
proc sendGetProviders(d: Protocol, toNode: Node,
                      cId: NodeId): Future[DiscResult[ProvidersMessage]]
                     {.async.} =
  ## Send a GetProviders request for `cId` to `toNode` and await its reply.
  let msg = GetProvidersMessage(cId: cId)
  trace "sendGetProviders", toNode, msg

  let resp = await d.waitResponse(toNode, msg)

  if resp.isNone():
    # TODO: do we need to do something when there is no response?
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
    return err("GetProviders response message not received in time")

  if resp.get().kind == MessageKind.providers:
    d.routingTable.setJustSeen(toNode)
    return ok(resp.get().provs)
  else:
    # TODO: do we need to do something when there is an invalid response?
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
    return err("Invalid response to GetProviders message")
proc getProvidersLocal*(
    d: Protocol,
    cId: NodeId,
    maxitems: int = 5, # TODO: currently unused; result is not truncated
): Future[seq[SignedPeerRecord]] {.async.} =
  ## Return the providers for `cId` known by the local providers store.
  ## Returns an empty seq when the store lookup fails.
  let provs = await d.providers.get(cId)
  if provs.isErr:
    trace "Unable to get local providers", cId, err = provs.error.msg
    # BUG fix: the original fell through to `provs.get` after logging the
    # error, and calling `get` on an error Result raises. Report no
    # providers instead.
    return @[]

  return provs.get
proc removeProvidersLocal*(
    d: Protocol,
    peerId: PeerId) {.async.} =
  ## Remove all local provider entries registered for `peerId`.
  trace "Removing local provider", peerId
  let res = await d.providers.remove(peerId)
  if res.isErr:
    # Best effort: a failed removal is only logged.
    trace "Error removing provider", err = res.error.msg
proc getProviders*(
    d: Protocol,
    cId: NodeId,
    maxitems: int = 5,
    timeout: Duration = 5000.milliseconds # TODO: not used?
): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
  ## Collect providers for `cId`: locally known ones plus those reported by
  ## the nodes closest to the id. Duplicates are removed.
  # What providers do we know about?
  var res = await d.getProvidersLocal(cId, maxitems)
  trace "local providers:", prov = res.mapIt(it)

  let nodesNearby = await d.lookup(cId)
  trace "nearby:", nodesNearby
  var providersFut: seq[Future[DiscResult[ProvidersMessage]]]
  for n in nodesNearby:
    if n != d.localNode:
      providersFut.add(d.sendGetProviders(n, cId))

  while providersFut.len > 0:
    let providersMsg = await one(providersFut)
    # trace "Got providers response", providersMsg

    let index = providersFut.find(providersMsg)
    if index != -1:
      providersFut.del(index)

    # Fix: dropped the original's unused `providersMsg2 = await providersMsg`;
    # `one()` only returns completed futures, so `read` is safe directly.
    let providersMsgRes = providersMsg.read
    if providersMsgRes.isOk:
      let providers = providersMsgRes.get.provs
      res = res.concat(providers).deduplicate
    else:
      error "Sending of GetProviders message failed", error = providersMsgRes.error
      # TODO: should we consider this as an error result if all GetProviders
      # requests fail??
  trace "getProviders collected:", res = res.mapIt(it.data)

  return ok res
proc addValue*(
    d: Protocol,
    cId: NodeId,
    value: seq[byte]): Future[seq[Node]] {.async.} =
  ## Store `value` under `cId` on the nodes closest to the id, storing it
  ## locally when this node is among the closest.
  ## Returns the set of nodes the value was sent to.
  var res = await d.lookup(cId)
  trace "lookup returned:", res
  # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
  if res.len == 0:
    res.add(d.localNode)
  for toNode in res:
    if toNode == d.localNode:
      asyncSpawn d.addValueLocal(cId, value)
    else:
      let reqId = RequestId.init(d.rng[])
      d.sendRequest(toNode, AddValueMessage(cId: cId, value: value), reqId)

  return res
proc sendGetValue(d: Protocol, toNode: Node,
                  cId: NodeId): Future[DiscResult[ValueMessage]]
                 {.async.} =
  ## Send a GetValue request for `cId` to `toNode` and await its reply.
  let msg = GetValueMessage(cId: cId)
  trace "sendGetValue", toNode, msg

  let resp = await d.waitResponse(toNode, msg)

  if resp.isNone():
    # TODO: do we need to do something when there is no response?
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
    return err("GetValue response message not received in time")

  if resp.get().kind == MessageKind.respValue:
    d.routingTable.setJustSeen(toNode)
    return ok(resp.get().value)
  else:
    # TODO: do we need to do something when there is an invalid response?
    d.replaceNode(toNode)
    discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
    return err("Invalid response to GetValue message")
proc getValue*(
    d: Protocol,
    cId: NodeId,
    timeout: Duration = 5000.milliseconds # TODO: not used?
): Future[DiscResult[seq[byte]]] {.async.} =
  ## Retrieve the value stored under `cId` by querying the nodes closest to
  ## the id. Returns the first value received; fails when no node answers
  ## with a value.
  let nodesNearby = await d.lookup(cId)
  trace "nearby:", nodesNearby
  var providersFut: seq[Future[DiscResult[ValueMessage]]]
  for n in nodesNearby:
    if n != d.localNode:
      providersFut.add(d.sendGetValue(n, cId))

  while providersFut.len > 0:
    let providersMsg = await one(providersFut)
    # trace "Got providers response", providersMsg

    let index = providersFut.find(providersMsg)
    if index != -1:
      providersFut.del(index)

    # Fix: dropped the original's unused `providersMsg2 = await providersMsg`;
    # `one()` only returns completed futures, so `read` is safe directly.
    let providersMsgRes = providersMsg.read
    if providersMsgRes.isOk:
      let value = providersMsgRes.get.value
      # TODO: validate result before accepting as the right one
      # TODO: cancel pending futures!
      return ok value
    else:
      debug "Sending of GetValue message failed", error = providersMsgRes.error
      # TODO: should we consider this as an error result if all GetProviders
      # requests fail??
  trace "getValue returned no result", cId
  return err "getValue failed"
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
    {.async.} =
  ## Query k nodes for the given target, returns all nodes found, including the
  ## nodes queried.
  ##
  ## This will take k nodes from the routing table closest to target and
  ## query them for nodes closest to target. If there are less than k nodes in
  ## the routing table, nodes returned by the first queries will be used.
  var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)

  var asked, seen = initHashSet[NodeId]()
  asked.incl(d.localNode.id) # No need to ask our own node
  seen.incl(d.localNode.id)  # No need to discover our own node
  for node in queryBuffer:
    seen.incl(node.id)

  var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha)

  while true:
    # Keep at most `Alpha` worker requests outstanding at any time.
    var i = 0
    while i < min(queryBuffer.len, k) and pendingQueries.len < Alpha:
      let candidate = queryBuffer[i]
      if not asked.containsOrIncl(candidate.id):
        pendingQueries.add(d.lookupWorker(candidate, target))
      inc i

    trace "discv5 pending queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    let query = await one(pendingQueries)
    trace "Got discv5 lookup query response"

    let idx = pendingQueries.find(query)
    if idx != -1:
      pendingQueries.del(idx)
    else:
      error "Resulting query should have been in the pending queries"

    let nodes = query.read
    # TODO: Remove node on timed-out query?
    for n in nodes:
      if not seen.containsOrIncl(n.id):
        queryBuffer.add(n)

  d.lastLookup = now(chronos.Moment)
  return queryBuffer
proc queryRandom*(d: Protocol): Future[seq[Node]] =
  ## Perform a query for a random target, return all nodes discovered.
  d.query(NodeId.random(d.rng[]))
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
    Future[seq[Node]] {.async.} =
  ## Perform a query for a random target, return all nodes discovered which
  ## contain enrField.
  let discovered = await d.queryRandom()
  return discovered.filterIt(it.record.contains(enrField))
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} =
  ## Resolve a `Node` based on provided `NodeId`.
  ##
  ## This will first look in the own routing table. If the node is known, it
  ## will try to contact it for newer information. If node is not known or it
  ## does not reply, a lookup is done to see if it can find a (newer) record of
  ## the node on the network.
  if id == d.localNode.id:
    return some(d.localNode)

  let known = d.getNode(id)
  if known.isSome():
    # Distance 0 asks the node for its own (latest) record.
    let request = await d.findNode(known.get(), @[0'u16])

    # TODO: Handle failures better. E.g. stop on different failures than timeout
    if request.isOk() and request[].len > 0:
      return some(request[][0])

  let discovered = await d.lookup(id)
  for n in discovered:
    if n.id == id:
      # Prefer whichever record carries the higher sequence number.
      if known.isSome() and known.get().record.seqNum >= n.record.seqNum:
        return known
      else:
        return some(n)

  return known
proc seedTable*(d: Protocol) =
  ## Seed the routing table with the configured bootstrap nodes.
  for record in d.bootstrapRecords:
    if d.addNode(record):
      debug "Added bootstrap node", uri = toURI(record)
    else:
      debug "Bootstrap node could not be added", uri = toURI(record)

  # TODO:
  # Persistent stored nodes could be added to seed from here
  # See: https://github.com/status-im/nim-eth/issues/189
proc populateTable*(d: Protocol) {.async.} =
  ## Do a set of initial lookups to quickly populate the table.
  # Start with a self target query (finds neighbour nodes).
  let selfQuery = await d.query(d.localNode.id)
  trace "Discovered nodes in self target query", nodes = selfQuery.len

  # Follow up with `InitialLookups` random target queries.
  for i in 0 ..< InitialLookups:
    let randomQuery = await d.queryRandom()
    trace "Discovered nodes in random target query", nodes = randomQuery.len

  debug "Total nodes in routing table after populate",
    total = d.routingTable.len()
proc revalidateNode*(d: Protocol, n: Node) {.async.} =
  ## Ping `n`; on a reply, refresh its record when outdated and register the
  ## peer's view of our external address as an IP vote.
  let pong = await d.ping(n)

  if pong.isOk():
    let res = pong.get()
    if res.sprSeq > n.record.seqNum:
      # The peer advertises a newer SPR than the one we hold: request it
      # (distance 0 returns the node's own record).
      let nodes = await d.findNode(n, @[0'u16])
      if nodes.isOk() and nodes[].len > 0:
        discard d.addNode(nodes[][0])

    # Get IP and port from pong message and add it to the ip votes
    let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port))
    d.ipVote.insert(n.id, a)
proc revalidateLoop(d: Protocol) {.async.} =
  ## Loop which revalidates the nodes in the routing table by sending the ping
  ## message.
  try:
    while true:
      # Random sleep in [RevalidateMax/2, RevalidateMax) to spread out pings.
      await sleepAsync(milliseconds(
        RevalidateMax div 2 + d.rng[].rand(RevalidateMax div 2)))
      let n = d.routingTable.nodeToRevalidate()
      if not n.isNil:
        traceAsyncErrors d.revalidateNode(n)
  except CancelledError:
    trace "revalidateLoop canceled"
proc refreshLoop(d: Protocol) {.async.} =
  ## Loop that refreshes the routing table by starting a random query in case
  ## no queries were done since `RefreshInterval` or more.
  ## It also refreshes the majority address voted for via pong responses.
  try:
    await d.populateTable()

    while true:
      let currentTime = now(chronos.Moment)
      if currentTime > (d.lastLookup + RefreshInterval):
        let randomQuery = await d.queryRandom()
        trace "Discovered nodes in random target query", nodes = randomQuery.len
        debug "Total nodes in discv5 routing table", total = d.routingTable.len()
      await sleepAsync(RefreshInterval)
  except CancelledError:
    trace "refreshLoop canceled"
proc ipMajorityLoop(d: Protocol) {.async.} =
  # TODO: this should be handled by libp2p, not the DHT
  ## When `enrAutoUpdate` is enabled, the IP:port combination returned
  ## by the majority will be used to update the local SPR.
  ## This should be safe as long as the routing table is not overwhelmed by
  ## malicious nodes trying to provide invalid addresses.
  ## Why is that?
  ## - Only one vote per NodeId is counted, and they are removed over time.
  ## - IP:port values are provided through the pong message. The local node
  ## initiates this by first sending a ping message. Unsolicited pong messages
  ## are ignored.
  ## - At interval pings are send to the least recently contacted node (tail of
  ## bucket) from a random bucket from the routing table.
  ## - Only messages that our node initiates (ping, findnode, talkreq) and that
  ## successfully get a response move a node to the head of the bucket.
  ## Additionally, findNode requests have typically a randomness to it, as they
  ## usually come from a query for random NodeId.
  ## - Currently, when a peer fails the respond, it gets replaced. It doesn't
  ## remain at the tail of the bucket.
  ## - There are IP limits on the buckets and the whole routing table.
  try:
    while true:
      let majority = d.ipVote.majority()
      if majority.isSome():
        if d.localNode.address != majority:
          let address = majority.get()
          let previous = d.localNode.address
          if d.enrAutoUpdate:
            let res = d.localNode.update(d.privateKey,
              ip = some(address.ip), udpPort = some(address.port))
            if res.isErr:
              warn "Failed updating SPR with newly discovered external address",
                majority, previous, error = res.error
            else:
              discovery_enr_auto_update.inc()
              info "Updated SPR with newly discovered external address",
                majority, previous, uri = toURI(d.localNode.record)
          else:
            warn "Discovered new external address but SPR auto update is off",
              majority, previous
        else:
          debug "Discovered external address matches current address", majority,
            current = d.localNode.address

      await sleepAsync(IpMajorityInterval)
  except CancelledError:
    trace "ipMajorityLoop canceled"
func init*(
    T: type DiscoveryConfig,
    tableIpLimit: uint,
    bucketIpLimit: uint,
    bitsPerHop: int): T =
  ## Build a `DiscoveryConfig` from individual table/bucket IP limits and the
  ## routing-table `bitsPerHop` setting.
  DiscoveryConfig(
    tableIpLimits: TableIpLimits(
      tableIpLimit: tableIpLimit,
      bucketIpLimit: bucketIpLimit),
    bitsPerHop: bitsPerHop)
proc newProtocol*(
    privKey: PrivateKey,
    enrIp: Option[ValidIpAddress],
    enrTcpPort, enrUdpPort: Option[Port],
    localEnrFields: openArray[(string, seq[byte])] = [],
    bootstrapRecords: openArray[SignedPeerRecord] = [],
    previousRecord = none[SignedPeerRecord](),
    bindPort: Port,
    bindIp = IPv4_loopback(),
    enrAutoUpdate = false,
    config = defaultDiscoveryConfig,
    rng = newRng(),
    providers = ProvidersManager.new(
      SQLiteDatastore.new(Memory)
      .expect("Should not fail!"))):
    Protocol =
  ## Create a discovery `Protocol`, building (or updating, when
  ## `previousRecord` is given) the local `SignedPeerRecord` from the SPR
  ## parameters.
  # TODO: Tried adding bindPort = udpPort as parameter but that gave
  # "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
  # Anyhow, nim-beacon-chain would also require some changes to support port
  # remapping through NAT and this API is also subject to change once we
  # introduce support for ipv4 + ipv6 binding/listening.

  # TODO: Implement SignedPeerRecord custom fields?
  # let extraFields = mapIt(localEnrFields, toFieldPair(it[0], it[1]))

  # TODO:
  # - Defect as is now or return a result for spr errors?
  # - In case incorrect key, allow for new spr based on new key (new node id)?
  var record: SignedPeerRecord
  if previousRecord.isSome():
    record = previousRecord.get()
    record.update(privKey, enrIp, enrTcpPort, enrUdpPort)
      .expect("SignedPeerRecord within size limits and correct key")
  else:
    record = SignedPeerRecord.init(1, privKey, enrIp, enrTcpPort, enrUdpPort)
      .expect("SignedPeerRecord within size limits")

  info "SPR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort,
    seqNum = record.seqNum, uri = toURI(record)

  if enrIp.isNone():
    if enrAutoUpdate:
      notice "No external IP provided for the SPR, this node will not be " &
        "discoverable until the SPR is updated with the discovered external IP address"
    else:
      warn "No external IP provided for the SPR, this node will not be discoverable"

  let node = newNode(record).expect("Properly initialized record")

  # TODO Consider whether this should be a Defect
  doAssert rng != nil, "RNG initialization failed"

  let routingTable = RoutingTable.init(
    node,
    config.bitsPerHop,
    config.tableIpLimits,
    rng)

  result = Protocol(
    privateKey: privKey,
    localNode: node,
    bootstrapRecords: @bootstrapRecords,
    ipVote: IpVote.init(),
    enrAutoUpdate: enrAutoUpdate,
    routingTable: routingTable,
    rng: rng,
    providers: providers)

  result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
proc newProtocol*(
    privKey: PrivateKey,
    bindPort: Port,
    record: SignedPeerRecord,
    bootstrapRecords: openArray[SignedPeerRecord] = [],
    bindIp = IPv4_loopback(),
    config = defaultDiscoveryConfig,
    rng = newRng(),
    providers = ProvidersManager.new(SQLiteDatastore.new(Memory)
      .expect("Should not fail!"))): Protocol =
  ## Initialize DHT protocol
  ##
  info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record)

  let node = newNode(
    bindIp,
    bindPort,
    privKey.getPublicKey.expect("Should get public key"),
    record).expect("Properly initialized record")

  # TODO Consider whether this should be a Defect
  doAssert rng != nil, "RNG initialization failed"

  result = Protocol(
    privateKey: privKey,
    localNode: node,
    bootstrapRecords: @bootstrapRecords,
    ipVote: IpVote.init(),
    enrAutoUpdate: false, # TODO: this should be removed from nim-libp2p-dht
    routingTable: RoutingTable.init(
      node, config.bitsPerHop, config.tableIpLimits, rng),
    rng: rng,
    providers: providers)

  result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
  ## Open the underlying transport and seed the routing table with the
  ## bootstrap records.
  info "Starting discovery node", node = d.localNode

  d.transport.open()
  d.seedTable()
proc start*(d: Protocol) {.async.} =
  ## Launch the background maintenance loops and the providers manager.
  d.refreshLoop = refreshLoop(d)
  d.revalidateLoop = revalidateLoop(d)
  d.ipMajorityLoop = ipMajorityLoop(d)

  await d.providers.start()
proc closeWait*(d: Protocol) {.async.} =
  ## Cancel the background loops and close the transport.
  ## Must not be called on an already-closed node.
  doAssert(not d.transport.closed)

  debug "Closing discovery node", node = d.localNode
  if not d.revalidateLoop.isNil:
    await d.revalidateLoop.cancelAndWait()
  if not d.refreshLoop.isNil:
    await d.refreshLoop.cancelAndWait()
  if not d.ipMajorityLoop.isNil:
    await d.ipMajorityLoop.cancelAndWait()

  await d.transport.closeWait()