WIP: change seqNo to ttl

regarding the dht, in the meeting swapping seqNo for ttl was discussed. After attempting to implement this change, the tests fail due to the lack of granularity, eg NodeA is instantiated, then NodeB is instantiated with a different ip/port and with NodeA as its previous record. The seqNo of both will be the same as more than a second hasn't elapsed, meaning that the record for NodeB would never been seen as newer.

Maybe the cheapest solution here would be to retain the seqNo field and add a ttl field that can be checked separately for expiration? Or if we only want one field, make it nanosecond resolution?
This commit is contained in:
Eric Mastro 2022-03-23 18:17:54 +11:00
parent d785aa4642
commit 7c0fb7aef4
No known key found for this signature in database
GPG Key ID: 141E3048D95A4E63
5 changed files with 125 additions and 107 deletions

View File

@ -221,12 +221,12 @@ func getRecord*(d: Protocol): SignedPeerRecord =
## Get the SPR of the local node.
d.localNode.record
proc updateRecord*(d: Protocol): DiscResult[void] =
proc updateRecord*(d: Protocol, ttl = TTL_DEFAULT): DiscResult[void] =
## Update the ENR of the local node with provided `enrFields` k:v pairs.
# TODO: Do we need this proc? This simply serves so that seqNo will be
# incremented to satisfy the tests...
d.localNode.record.incSeqNo(d.privateKey)
d.localNode.record.incTtl(d.privateKey)
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
# we stored a handshake with in order to get that ENR updated?
@ -907,16 +907,16 @@ func init*(
proc newProtocol*(
privKey: keys.PrivateKey,
enrIp: Option[ValidIpAddress],
enrTcpPort, enrUdpPort: Option[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
sprIp: Option[ValidIpAddress],
sprTcpPort, sprUdpPort: Option[Port],
bootstrapRecords: openArray[SignedPeerRecord] = [],
previousRecord = none[SignedPeerRecord](),
bindPort: Port,
bindIp = IPv4_any(),
enrAutoUpdate = false,
config = defaultDiscoveryConfig,
rng = keys.newRng()):
rng = keys.newRng(),
sprTtl = TTL_DEFAULT):
Protocol =
# TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
@ -933,15 +933,18 @@ proc newProtocol*(
var record: SignedPeerRecord
if previousRecord.isSome():
record = previousRecord.get()
record.update(privKey, enrIp, enrTcpPort, enrUdpPort)
debugEcho ">>> [protocol.newProtocol] previousRecord seqNum BEFORE: ", record.seqNum
record.update(privKey, sprIp, sprTcpPort, sprUdpPort, sprTtl)
.expect("SignedPeerRecord within size limits and correct key")
debugEcho ">>> [protocol.newProtocol] record seqNum AFTER: ", record.seqNum
debugEcho ">>> [protocol.newProtocol] previousRecord seqNum AFTER: ", previousRecord.get().seqNum
else:
record = SignedPeerRecord.init(1, privKey, enrIp, enrTcpPort, enrUdpPort)
record = SignedPeerRecord.init(privKey, sprIp, sprTcpPort, sprUdpPort)
.expect("SignedPeerRecord within size limits")
info "SPR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort,
info "SPR initialized", ip = sprIp, tcp = sprTcpPort, udp = sprUdpPort,
seqNum = record.seqNum, uri = toURI(record)
if enrIp.isNone():
if sprIp.isNone():
if enrAutoUpdate:
notice "No external IP provided for the SPR, this node will not be " &
"discoverable until the SPR is updated with the discovered external IP address"

View File

@ -6,7 +6,7 @@
#
import
chronicles,
std/[options, sequtils, strutils, sugar],
std/[options, sequtils, strutils, sugar, times],
pkg/stew/[results, byteutils, arrayops],
stew/endians2,
stew/shims/net,
@ -29,8 +29,24 @@ type
RecordResult*[T] = Result[T, cstring]
const
TTL_DEFAULT* = initDuration(minutes = 30)
TTL_MAX* = initDuration(minutes = 30)
proc seqNum*(r: SignedPeerRecord): uint64 =
r.data.seqNo
r.data.seqNo
proc nowUnix*(): uint64 = now().utc.toTime.toUnix.uint64
proc afterNow*(ttl: Duration): uint64 =
debugEcho ">>> [spr.afterNow] nowUnix(): ", nowUnix()
debugEcho ">>> [spr.afterNow] ttl.inSeconds.uint64: ", ttl.inSeconds.uint64
let res = nowUnix() + ttl.inSeconds.uint64
debugEcho ">>> [spr.afterNow] res: ", res
return res
proc isExpired*(r: SignedPeerRecord): bool =
r.seqNum < nowUnix()
#proc encode
proc append*(rlpWriter: var RlpWriter, value: SignedPeerRecord) =
@ -88,13 +104,14 @@ proc get*(r: SignedPeerRecord, T: type keys.PublicKey): Option[T] =
pk = r.envelope.publicKey
pkToPk(pk)
proc incSeqNo*(
proc incTtl*(
r: var SignedPeerRecord,
pk: keys.PrivateKey): RecordResult[void] =
pk: keys.PrivateKey,
ttl: Duration = TTL_DEFAULT): RecordResult[void] =
let cryptoPk = pk.pkToPk.get() # TODO: remove when eth/keys removed
r.data.seqNo.inc()
r.data.seqNo = ttl.afterNow
r = ? SignedPeerRecord.init(cryptoPk, r.data).mapErr(
(e: CryptoError) =>
("Error initialising SignedPeerRecord with incremented seqNo: " &
@ -104,9 +121,10 @@ proc incSeqNo*(
proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port] = none[Port]()):
RecordResult[void] =
ip: Option[ValidIpAddress],
tcpPort,
udpPort: Option[Port] = Port.none,
ttl: Duration = TTL_DEFAULT): RecordResult[void] =
## Update a `SignedPeerRecord` with given ip address, tcp port, udp port and optional
## custom k:v pairs.
##
@ -194,12 +212,17 @@ proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
transProtoPort = udpPort.get
updated = MultiAddress.init(ipAddr, transProto, transProtoPort)
debugEcho ">>> [spr.update] existing ma: ", $existing
debugEcho ">>> [spr.update] updated ma: ", $updated
changed = existing != updated
r.data.addresses[0].address = updated
# increase the sequence number only if we've updated the multiaddress
if changed: r.data.seqNo.inc()
# increase the sequence number (ttl) only if we've updated the multiaddress
debugEcho ">>> [spr.update] changed: ", changed
debugEcho ">>> [spr.update] changed seqNo BEFORE: ", r.data.seqNo
if changed: r.data.seqNo = ttl.afterNow()
debugEcho ">>> [spr.update] changed seqNo AFTER: ", r.data.seqNo
r = ? SignedPeerRecord.init(pk, r.data)
.mapErr((e: CryptoError) =>
@ -210,10 +233,11 @@ proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
proc update*(r: var SignedPeerRecord, pk: keys.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port] = none[Port]()):
tcpPort, udpPort: Option[Port] = none[Port](),
ttl = TTL_DEFAULT):
RecordResult[void] =
let cPk = pkToPk(pk).get
r.update(cPk, ip, tcpPort, udpPort)
r.update(cPk, ip, tcpPort, udpPort, ttl)
proc toTypedRecord*(r: SignedPeerRecord) : RecordResult[SignedPeerRecord] = ok(r)
@ -274,11 +298,12 @@ proc toBase64*(r: SignedPeerRecord): string =
proc toURI*(r: SignedPeerRecord): string = "spr:" & r.toBase64
proc init*(T: type SignedPeerRecord, seqNum: uint64,
pk: crypto.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port]):
RecordResult[T] =
proc init*(T: type SignedPeerRecord,
pk: crypto.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port],
ttl: Duration = TTL_DEFAULT): RecordResult[T] =
## Initialize a `SignedPeerRecord` with given sequence number, private key, optional
## ip address, tcp port, udp port, and optional custom k:v pairs.
##
@ -286,65 +311,42 @@ proc init*(T: type SignedPeerRecord, seqNum: uint64,
let peerId = PeerId.init(pk).get
if tcpPort.isSome() and udpPort.isSome:
if tcpPort.isSome and udpPort.isSome:
warn "Both tcp and udp ports specified, using udp in multiaddress",
tcpPort, udpPort
var
ipAddr = try: ValidIpAddress.init("127.0.0.1")
except ValueError as e:
return err ("Existing address contains invalid address: " & $e.msg).cstring
proto: IpTransportProtocol
protoPort: Port
if ip.isSome():
let
fallbackIp = ValidIpAddress.init(IPv4_loopback())
ipAddr = ip.get(fallbackIp)
ipAddr = ip.get
if tcpPort.isSome():
proto = IpTransportProtocol.tcpProtocol
protoPort = tcpPort.get()
if udpPort.isSome():
proto = IpTransportProtocol.udpProtocol
protoPort = udpPort.get()
if tcpPort.isSome():
proto = IpTransportProtocol.tcpProtocol
protoPort = tcpPort.get()
if udpPort.isSome():
proto = IpTransportProtocol.udpProtocol
protoPort = udpPort.get()
else:
if tcpPort.isSome():
proto = IpTransportProtocol.tcpProtocol
protoPort = tcpPort.get()
if udpPort.isSome():
proto = IpTransportProtocol.udpProtocol
protoPort = udpPort.get()
let
ma = MultiAddress.init(ipAddr, proto, protoPort)
pr = PeerRecord.init(peerId, @[ma], ttl.afterNow)
SignedPeerRecord.init(pk, pr)
.mapErr(
(e: CryptoError) => ("Failed to init SignedPeerRecord: " & $e).cstring
)
let ma = MultiAddress.init(ipAddr, proto, protoPort)
# if ip.isSome:
# let
# ipAddr = ip.get
# proto = ipAddr.family
# address = if proto == IPv4: ipAddr.address_v4
# else: ipAddr.address_v6
# u and udpPort.isSome
# # let ta = initTAddress(ip.get, udpPort.get)
# # echo ta
# # ma = MultiAddress.init(ta).get
# #let ma1 = MultiAddress.init("/ip4/127.0.0.1").get() #TODO
# #let ma2 = MultiAddress.init(multiCodec("udp"), udpPort.get.int).get
# #ma = ma1 & ma2
# ma = MultiAddress.init("/ip4/127.0.0.1/udp/" & $udpPort.get.int).get #TODO
# else:
# ma = MultiAddress.init()
# # echo "not implemented"
proc init*(T: type SignedPeerRecord,
pk: keys.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port],
ttl: Duration = TTL_DEFAULT): RecordResult[T] =
let pr = PeerRecord.init(peerId, @[ma], seqNum)
SignedPeerRecord.init(pk, pr).mapErr((e: CryptoError) => ("Failed to init SignedPeerRecord: " & $e).cstring)
proc init*(T: type SignedPeerRecord, seqNum: uint64,
pk: keys.PrivateKey,
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port]):
RecordResult[T] =
let kPk = pkToPk(pk).get
SignedPeerRecord.init(seqNum, kPk, ip, tcpPort, udpPort)
SignedPeerRecord.init(kPk, ip, tcpPort, udpPort, ttl)
proc contains*(r: SignedPeerRecord, fp: (string, seq[byte])): bool =
# TODO: use FieldPair for this, but that is a bit cumbersome. Perhaps the

View File

@ -16,7 +16,6 @@ proc initDiscoveryNode*(
privKey: keys.PrivateKey,
address: Address,
bootstrapRecords: openArray[SignedPeerRecord] = [],
localEnrFields: openArray[(string, seq[byte])] = [],
previousRecord = none[SignedPeerRecord]()):
discv5_protocol.Protocol =
# set bucketIpLimit to allow bucket split
@ -28,7 +27,6 @@ proc initDiscoveryNode*(
some(address.port), some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord,
config = config,
rng = rng)
@ -45,7 +43,7 @@ proc generateNode*(privKey: keys.PrivateKey, port: int = 20302,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node =
let
port = Port(port)
spr = SignedPeerRecord.init(1, privKey, some(ip), some(port), some(port))
spr = SignedPeerRecord.init(privKey, some(ip), some(port), some(port))
.expect("Properly intialized private key")
result = newNode(spr).expect("Properly initialized node")

View File

@ -1,7 +1,7 @@
{.used.}
import
std/tables,
std/[tables, times],
chronos, chronicles, stint, asynctest, stew/shims/net,
stew/byteutils, bearssl,
eth/keys,
@ -318,6 +318,7 @@ suite "Discovery v5 Tests":
targetAddress = localAddress(20303)
targetNode = initDiscoveryNode(rng, targetKey, targetAddress)
targetId = targetNode.localNode.id
ttl = initDuration(seconds = 1)
var targetSeqNum = targetNode.localNode.record.seqNum
@ -347,22 +348,22 @@ suite "Discovery v5 Tests":
nodes[].len == 1
mainNode.addNode(nodes[][0])
targetSeqNum.inc()
# need to add something to get the spr sequence number incremented
let update = targetNode.updateRecord()
let update = targetNode.updateRecord(ttl)
check update.isOk()
var n = mainNode.getNode(targetId)
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum - 1
n.get().record.seqNum == targetSeqNum
n = await mainNode.resolve(targetId)
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum
n.get().record.seqNum - ttl.inSeconds.uint64 > targetSeqNum
not n.get().record.isExpired
# Add the updated version
discard mainNode.addNode(n.get())
@ -370,8 +371,8 @@ suite "Discovery v5 Tests":
# Update seqNum in SPR again, ping lookupNode to be added in routing table,
# close targetNode, resolve should lookup, check if we get updated SPR.
block:
targetSeqNum.inc()
let update = targetNode.updateRecord()
targetSeqNum = nowUnix()
let update = targetNode.updateRecord(ttl)
check update.isOk()
# ping node so that its SPR gets added
@ -386,7 +387,8 @@ suite "Discovery v5 Tests":
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum
n.get().record.seqNum - ttl.inSeconds.uint64 > targetSeqNum
not n.get().record.isExpired
await mainNode.closeWait()
await lookupNode.closeWait()
@ -417,21 +419,34 @@ suite "Discovery v5 Tests":
privKey = keys.PrivateKey.random(rng[])
ip = some(ValidIpAddress.init("127.0.0.1"))
port = Port(20301)
now = nowUnix()
ttl1 = initDuration(minutes = 10)
ttl2 = initDuration(minutes = 20)
node = newProtocol(privKey, ip, some(port), some(port), bindPort = port,
rng = rng)
noUpdatesNode = newProtocol(privKey, ip, some(port), some(port),
bindPort = port, rng = rng, previousRecord = some(node.getRecord()))
updatesNode = newProtocol(privKey, ip, some(port), some(Port(20302)),
bindPort = port, rng = rng,
previousRecord = some(noUpdatesNode.getRecord()))
moreUpdatesNode = newProtocol(privKey, ip, some(port), some(port),
bindPort = port, rng = rng, localEnrFields = {"addfield": @[byte 0]},
previousRecord = some(updatesNode.getRecord()))
previousRecord = some(noUpdatesNode.getRecord()),
sprTtl = ttl1)
moreUpdatesNode = newProtocol(privKey,
some(ValidIpAddress.init("127.0.0.2")), some(port), some(Port(20302)),
bindPort = port, rng = rng,
previousRecord = some(updatesNode.getRecord()),
sprTtl = ttl2)
check:
node.getRecord().seqNum == 1
noUpdatesNode.getRecord().seqNum == 1
updatesNode.getRecord().seqNum == 2
moreUpdatesNode.getRecord().seqNum == 3
node.getRecord().seqNum == now + TTL_DEFAULT.inSeconds.uint64
not node.getRecord().isExpired
noUpdatesNode.getRecord().seqNum == node.getRecord().seqNum
not noUpdatesNode.getRecord().isExpired
updatesNode.getRecord().seqNum == now + ttl1.inSeconds.uint64
not updatesNode.getRecord().isExpired
moreUpdatesNode.getRecord().seqNum == now + ttl2.inSeconds.uint64
not moreUpdatesNode.getRecord().isExpired
# Defect (for now?) on incorrect key use
expect ResultDefect:
@ -509,7 +524,7 @@ suite "Discovery v5 Tests":
test "Verify records of nodes message":
let
port = Port(9000)
fromNoderecord = SignedPeerRecord.init(1, keys.PrivateKey.random(rng[]),
fromNoderecord = SignedPeerRecord.init(keys.PrivateKey.random(rng[]),
some(ValidIpAddress.init("11.12.13.14")),
some(port), some(port))[]
fromNode = newNode(fromNoderecord)[]
@ -520,7 +535,7 @@ suite "Discovery v5 Tests":
block: # Duplicates
let
record = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("12.13.14.15")),
pk, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
# Exact duplicates
@ -530,7 +545,7 @@ suite "Discovery v5 Tests":
# Node id duplicates
let recordSameId = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("212.13.14.15")),
pk, some(ValidIpAddress.init("212.13.14.15")),
some(port), some(port))[]
records.add(recordSameId)
nodes = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -539,7 +554,7 @@ suite "Discovery v5 Tests":
block: # No address
let
recordNoAddress = SignedPeerRecord.init(
1, pk, none(ValidIpAddress), some(port), some(port))[]
pk, none(ValidIpAddress), some(port), some(port))[]
records = [recordNoAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
check test.len == 0
@ -547,7 +562,7 @@ suite "Discovery v5 Tests":
block: # Invalid address - site local
let
recordInvalidAddress = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("10.1.2.3")),
pk, some(ValidIpAddress.init("10.1.2.3")),
some(port), some(port))[]
records = [recordInvalidAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -556,7 +571,7 @@ suite "Discovery v5 Tests":
block: # Invalid address - loopback
let
recordInvalidAddress = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("127.0.0.1")),
pk, some(ValidIpAddress.init("127.0.0.1")),
some(port), some(port))[]
records = [recordInvalidAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -565,7 +580,7 @@ suite "Discovery v5 Tests":
block: # Invalid distance
let
recordInvalidDistance = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("12.13.14.15")),
pk, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
records = [recordInvalidDistance]
test = verifyNodesRecords(records, fromNode, limit, @[0'u16])
@ -574,7 +589,7 @@ suite "Discovery v5 Tests":
block: # Invalid distance but distance validation is disabled
let
recordInvalidDistance = SignedPeerRecord.init(
1, pk, some(ValidIpAddress.init("12.13.14.15")),
pk, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
records = [recordInvalidDistance]
test = verifyNodesRecords(records, fromNode, limit)
@ -600,7 +615,7 @@ suite "Discovery v5 Tests":
for i in 0 ..< 5:
let
privKey = keys.PrivateKey.random(rng[])
enrRec = SignedPeerRecord.init(1, privKey,
enrRec = SignedPeerRecord.init(privKey,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")
@ -629,7 +644,7 @@ suite "Discovery v5 Tests":
# and "receive" them on receiveNode
let
privKey = keys.PrivateKey.random(rng[])
enrRec = SignedPeerRecord.init(1, privKey,
enrRec = SignedPeerRecord.init(privKey,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")
@ -660,7 +675,7 @@ suite "Discovery v5 Tests":
let
a = localAddress(20303)
privKey = keys.PrivateKey.random(rng[])
enrRec = SignedPeerRecord.init(1, privKey,
enrRec = SignedPeerRecord.init(privKey,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")

View File

@ -262,11 +262,11 @@ suite "Discovery v5.1 Packet Encodings Test Vectors":
privKeyB = keys.PrivateKey.fromHex(nodeBKey)[] # receive -> decode
let
enrRecA = SignedPeerRecord.init(1, privKeyA,
enrRecA = SignedPeerRecord.init(privKeyA,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
enrRecB = SignedPeerRecord.init(1, privKeyB,
enrRecB = SignedPeerRecord.init(privKeyB,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
@ -493,11 +493,11 @@ suite "Discovery v5.1 Additional Encode/Decode":
privKeyB = keys.PrivateKey.random(rng[]) # receiver -> decode
let
enrRecA = SignedPeerRecord.init(1, privKeyA,
enrRecA = SignedPeerRecord.init(privKeyA,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")
enrRecB = SignedPeerRecord.init(1, privKeyB,
enrRecB = SignedPeerRecord.init(privKeyB,
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly intialized private key")