Merge pull request #38 from status-im/fixes-for-integration
Fixes for dagger integration
This commit is contained in:
commit
e801660b44
|
@ -222,16 +222,29 @@ func getRecord*(d: Protocol): SignedPeerRecord =
|
||||||
## Get the SPR of the local node.
|
## Get the SPR of the local node.
|
||||||
d.localNode.record
|
d.localNode.record
|
||||||
|
|
||||||
proc updateRecord*(d: Protocol): DiscResult[void] =
|
proc updateRecord*(
|
||||||
|
d: Protocol,
|
||||||
|
spr: Option[SignedPeerRecord] = SignedPeerRecord.none): DiscResult[void] =
|
||||||
## Update the ENR of the local node with provided `enrFields` k:v pairs.
|
## Update the ENR of the local node with provided `enrFields` k:v pairs.
|
||||||
|
##
|
||||||
|
|
||||||
# TODO: Do we need this proc? This simply serves so that seqNo will be
|
if spr.isSome:
|
||||||
# incremented to satisfy the tests...
|
let
|
||||||
d.localNode.record.incSeqNo(d.privateKey)
|
newSpr = spr.get()
|
||||||
|
seqNo = d.localNode.record.seqNum
|
||||||
|
|
||||||
|
info "Updated discovery SPR", uri = toURI(newSpr)
|
||||||
|
|
||||||
|
d.localNode.record = newSpr
|
||||||
|
d.localNode.record.data.seqNo = seqNo
|
||||||
|
|
||||||
|
? d.localNode.record.incSeqNo(d.privateKey)
|
||||||
|
|
||||||
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
||||||
# we stored a handshake with in order to get that ENR updated?
|
# we stored a handshake with in order to get that ENR updated?
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
proc sendResponse(d: Protocol, dstId: NodeId, dstAddr: Address,
|
proc sendResponse(d: Protocol, dstId: NodeId, dstAddr: Address,
|
||||||
message: SomeMessage, reqId: RequestId) =
|
message: SomeMessage, reqId: RequestId) =
|
||||||
## send Response using the specifid reqId
|
## send Response using the specifid reqId
|
||||||
|
@ -316,7 +329,7 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
d.sendResponse(fromId, fromAddr, talkresp, reqId)
|
d.sendResponse(fromId, fromAddr, talkresp, reqId)
|
||||||
|
|
||||||
proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) =
|
proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) =
|
||||||
trace "adding provider to local db", n=p.localNode, cId, prov
|
trace "adding provider to local db", cid=cId, spr=prov.data
|
||||||
p.providers.mgetOrPut(cId, @[]).add(prov)
|
p.providers.mgetOrPut(cId, @[]).add(prov)
|
||||||
|
|
||||||
proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
|
@ -328,7 +341,7 @@ proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
|
|
||||||
#TODO: add checks, add signed version
|
#TODO: add checks, add signed version
|
||||||
let provs = d.providers.getOrDefault(getProviders.cId)
|
let provs = d.providers.getOrDefault(getProviders.cId)
|
||||||
trace "providers:", provs
|
trace "providers:", prov=provs.mapIt(it.data)
|
||||||
|
|
||||||
##TODO: handle multiple messages
|
##TODO: handle multiple messages
|
||||||
let response = ProvidersMessage(total: 1, provs: provs)
|
let response = ProvidersMessage(total: 1, provs: provs)
|
||||||
|
@ -692,7 +705,7 @@ proc getProviders*(
|
||||||
|
|
||||||
# What providers do we know about?
|
# What providers do we know about?
|
||||||
var res = d.getProvidersLocal(cId, maxitems)
|
var res = d.getProvidersLocal(cId, maxitems)
|
||||||
trace "local providers:", res
|
trace "local providers:", prov=res.mapIt(it.data)
|
||||||
|
|
||||||
let nodesNearby = await d.lookup(cId)
|
let nodesNearby = await d.lookup(cId)
|
||||||
trace "nearby:", nodesNearby
|
trace "nearby:", nodesNearby
|
||||||
|
@ -710,7 +723,6 @@ proc getProviders*(
|
||||||
providersFut.del(index)
|
providersFut.del(index)
|
||||||
|
|
||||||
let providersMsg2 = await providersMsg
|
let providersMsg2 = await providersMsg
|
||||||
trace "2", providersMsg2
|
|
||||||
|
|
||||||
let providersMsgRes = providersMsg.read
|
let providersMsgRes = providersMsg.read
|
||||||
if providersMsgRes.isOk:
|
if providersMsgRes.isOk:
|
||||||
|
@ -720,7 +732,7 @@ proc getProviders*(
|
||||||
error "Sending of GetProviders message failed", error = providersMsgRes.error
|
error "Sending of GetProviders message failed", error = providersMsgRes.error
|
||||||
# TODO: should we consider this as an error result if all GetProviders
|
# TODO: should we consider this as an error result if all GetProviders
|
||||||
# requests fail??
|
# requests fail??
|
||||||
trace "getProviders collected: ", res
|
trace "getProviders collected: ", res=res.mapIt(it.data)
|
||||||
|
|
||||||
return ok res
|
return ok res
|
||||||
|
|
||||||
|
@ -889,6 +901,7 @@ proc refreshLoop(d: Protocol) {.async.} =
|
||||||
trace "refreshLoop canceled"
|
trace "refreshLoop canceled"
|
||||||
|
|
||||||
proc ipMajorityLoop(d: Protocol) {.async.} =
|
proc ipMajorityLoop(d: Protocol) {.async.} =
|
||||||
|
#TODO this should be handled by libp2p, not the DHT
|
||||||
## When `enrAutoUpdate` is enabled, the IP:port combination returned
|
## When `enrAutoUpdate` is enabled, the IP:port combination returned
|
||||||
## by the majority will be used to update the local SPR.
|
## by the majority will be used to update the local SPR.
|
||||||
## This should be safe as long as the routing table is not overwhelmed by
|
## This should be safe as long as the routing table is not overwhelmed by
|
||||||
|
@ -1008,6 +1021,33 @@ proc newProtocol*(
|
||||||
|
|
||||||
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
||||||
|
|
||||||
|
proc newProtocol*(
|
||||||
|
privKey: PrivateKey,
|
||||||
|
bindPort: Port,
|
||||||
|
record: SignedPeerRecord,
|
||||||
|
bootstrapRecords: openArray[SignedPeerRecord] = [],
|
||||||
|
bindIp = IPv4_any(),
|
||||||
|
config = defaultDiscoveryConfig,
|
||||||
|
rng = newRng()):
|
||||||
|
Protocol =
|
||||||
|
info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record)
|
||||||
|
let node = newNode(record).expect("Properly initialized record")
|
||||||
|
|
||||||
|
# TODO Consider whether this should be a Defect
|
||||||
|
doAssert rng != nil, "RNG initialization failed"
|
||||||
|
|
||||||
|
result = Protocol(
|
||||||
|
privateKey: privKey,
|
||||||
|
localNode: node,
|
||||||
|
bootstrapRecords: @bootstrapRecords,
|
||||||
|
ipVote: IpVote.init(),
|
||||||
|
enrAutoUpdate: false, #TODO this should be removed from nim-libp2p-dht
|
||||||
|
routingTable: RoutingTable.init(
|
||||||
|
node, config.bitsPerHop, config.tableIpLimits, rng),
|
||||||
|
rng: rng)
|
||||||
|
|
||||||
|
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
||||||
|
|
||||||
|
|
||||||
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
||||||
info "Starting discovery node", node = d.localNode
|
info "Starting discovery node", node = d.localNode
|
||||||
|
|
|
@ -31,7 +31,6 @@ proc seqNum*(r: SignedPeerRecord): uint64 =
|
||||||
r.data.seqNo
|
r.data.seqNo
|
||||||
|
|
||||||
proc fromBytes(r: var SignedPeerRecord, s: openArray[byte]): bool =
|
proc fromBytes(r: var SignedPeerRecord, s: openArray[byte]): bool =
|
||||||
|
|
||||||
let decoded = SignedPeerRecord.decode(@s)
|
let decoded = SignedPeerRecord.decode(@s)
|
||||||
if decoded.isErr:
|
if decoded.isErr:
|
||||||
error "Error decoding SignedPeerRecord", error = decoded.error
|
error "Error decoding SignedPeerRecord", error = decoded.error
|
||||||
|
@ -47,22 +46,21 @@ proc get*(r: SignedPeerRecord, T: type PublicKey): Option[T] =
|
||||||
r.envelope.publicKey.some
|
r.envelope.publicKey.some
|
||||||
|
|
||||||
proc incSeqNo*(
|
proc incSeqNo*(
|
||||||
r: var SignedPeerRecord,
|
r: var SignedPeerRecord,
|
||||||
pk: PrivateKey): RecordResult[void] =
|
pk: PrivateKey): RecordResult[void] =
|
||||||
|
|
||||||
r.data.seqNo.inc()
|
r.data.seqNo.inc()
|
||||||
r = ? SignedPeerRecord.init(pk, r.data).mapErr(
|
r = ? SignedPeerRecord.init(pk, r.data).mapErr(
|
||||||
(e: CryptoError) =>
|
(e: CryptoError) =>
|
||||||
("Error initialising SignedPeerRecord with incremented seqNo: " &
|
("Error initializing SignedPeerRecord with incremented seqNo: " & $e).cstring)
|
||||||
$e).cstring
|
|
||||||
)
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
proc update*(
|
||||||
proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
|
r: var SignedPeerRecord,
|
||||||
ip: Option[ValidIpAddress],
|
pk: crypto.PrivateKey,
|
||||||
tcpPort, udpPort: Option[Port] = none[Port]()):
|
ip: Option[ValidIpAddress],
|
||||||
RecordResult[void] =
|
tcpPort, udpPort: Option[Port] = none[Port]()):
|
||||||
|
RecordResult[void] =
|
||||||
## Update a `SignedPeerRecord` with given ip address, tcp port, udp port and optional
|
## Update a `SignedPeerRecord` with given ip address, tcp port, udp port and optional
|
||||||
## custom k:v pairs.
|
## custom k:v pairs.
|
||||||
##
|
##
|
||||||
|
@ -110,7 +108,6 @@ proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
|
||||||
transProtoPort = udpPort.get
|
transProtoPort = udpPort.get
|
||||||
|
|
||||||
updated = MultiAddress.init(ipAddr, transProto, transProtoPort)
|
updated = MultiAddress.init(ipAddr, transProto, transProtoPort)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
let
|
let
|
||||||
existing = r.data.addresses[0].address
|
existing = r.data.addresses[0].address
|
||||||
|
@ -158,22 +155,21 @@ proc update*(r: var SignedPeerRecord, pk: crypto.PrivateKey,
|
||||||
|
|
||||||
r = ? SignedPeerRecord.init(pk, r.data)
|
r = ? SignedPeerRecord.init(pk, r.data)
|
||||||
.mapErr((e: CryptoError) =>
|
.mapErr((e: CryptoError) =>
|
||||||
("Failed to update SignedPeerRecord: " & $e).cstring
|
("Failed to update SignedPeerRecord: " & $e).cstring)
|
||||||
)
|
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc toTypedRecord*(r: SignedPeerRecord) : RecordResult[SignedPeerRecord] = ok(r)
|
proc toTypedRecord*(r: SignedPeerRecord) : RecordResult[SignedPeerRecord] = ok(r)
|
||||||
|
|
||||||
proc ip*(r: SignedPeerRecord): Option[array[4, byte]] =
|
proc ip*(r: SignedPeerRecord): Option[array[4, byte]] =
|
||||||
let ma = r.data.addresses[0].address
|
for address in r.data.addresses:
|
||||||
|
let ma = address.address
|
||||||
let code = ma[0].get.protoCode()
|
let code = ma[0].get.protoCode()
|
||||||
if code.isOk and code.get == multiCodec("ip4"):
|
if code.isOk and code.get == multiCodec("ip4"):
|
||||||
var ipbuf: array[4, byte]
|
var ipbuf: array[4, byte]
|
||||||
let res = ma[0].get.protoArgument(ipbuf)
|
let res = ma[0].get.protoArgument(ipbuf)
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
return some(ipbuf)
|
return some(ipbuf)
|
||||||
|
|
||||||
# err("Incorrect IPv4 address")
|
# err("Incorrect IPv4 address")
|
||||||
# else:
|
# else:
|
||||||
|
@ -188,15 +184,16 @@ proc ip*(r: SignedPeerRecord): Option[array[4, byte]] =
|
||||||
# err("MultiAddress must be wire address (tcp, udp or unix)")
|
# err("MultiAddress must be wire address (tcp, udp or unix)")
|
||||||
|
|
||||||
proc udp*(r: SignedPeerRecord): Option[int] =
|
proc udp*(r: SignedPeerRecord): Option[int] =
|
||||||
let ma = r.data.addresses[0].address
|
for address in r.data.addresses:
|
||||||
|
let ma = address.address
|
||||||
|
|
||||||
let code = ma[1].get.protoCode()
|
let code = ma[1].get.protoCode()
|
||||||
if code.isOk and code.get == multiCodec("udp"):
|
if code.isOk and code.get == multiCodec("udp"):
|
||||||
var pbuf: array[2, byte]
|
var pbuf: array[2, byte]
|
||||||
let res = ma[1].get.protoArgument(pbuf)
|
let res = ma[1].get.protoArgument(pbuf)
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
let p = fromBytesBE(uint16, pbuf)
|
let p = fromBytesBE(uint16, pbuf)
|
||||||
return some(p.int)
|
return some(p.int)
|
||||||
|
|
||||||
proc fromBase64*(r: var SignedPeerRecord, s: string): bool =
|
proc fromBase64*(r: var SignedPeerRecord, s: string): bool =
|
||||||
## Loads SPR from base64-encoded protobuf-encoded bytes, and validates the
|
## Loads SPR from base64-encoded protobuf-encoded bytes, and validates the
|
||||||
|
@ -222,11 +219,13 @@ proc toBase64*(r: SignedPeerRecord): string =
|
||||||
|
|
||||||
proc toURI*(r: SignedPeerRecord): string = "spr:" & r.toBase64
|
proc toURI*(r: SignedPeerRecord): string = "spr:" & r.toBase64
|
||||||
|
|
||||||
proc init*(T: type SignedPeerRecord, seqNum: uint64,
|
proc init*(
|
||||||
pk: PrivateKey,
|
T: type SignedPeerRecord,
|
||||||
ip: Option[ValidIpAddress],
|
seqNum: uint64,
|
||||||
tcpPort, udpPort: Option[Port]):
|
pk: PrivateKey,
|
||||||
RecordResult[T] =
|
ip: Option[ValidIpAddress],
|
||||||
|
tcpPort, udpPort: Option[Port]):
|
||||||
|
RecordResult[T] =
|
||||||
## Initialize a `SignedPeerRecord` with given sequence number, private key, optional
|
## Initialize a `SignedPeerRecord` with given sequence number, private key, optional
|
||||||
## ip address, tcp port, udp port, and optional custom k:v pairs.
|
## ip address, tcp port, udp port, and optional custom k:v pairs.
|
||||||
##
|
##
|
||||||
|
@ -267,7 +266,9 @@ proc init*(T: type SignedPeerRecord, seqNum: uint64,
|
||||||
let ma = MultiAddress.init(ipAddr, proto, protoPort)
|
let ma = MultiAddress.init(ipAddr, proto, protoPort)
|
||||||
|
|
||||||
let pr = PeerRecord.init(peerId, @[ma], seqNum)
|
let pr = PeerRecord.init(peerId, @[ma], seqNum)
|
||||||
SignedPeerRecord.init(pk, pr).mapErr((e: CryptoError) => ("Failed to init SignedPeerRecord: " & $e).cstring)
|
SignedPeerRecord.init(pk, pr)
|
||||||
|
.mapErr(
|
||||||
|
(e: CryptoError) => ("Failed to init SignedPeerRecord: " & $e).cstring)
|
||||||
|
|
||||||
proc contains*(r: SignedPeerRecord, fp: (string, seq[byte])): bool =
|
proc contains*(r: SignedPeerRecord, fp: (string, seq[byte])): bool =
|
||||||
# TODO: use FieldPair for this, but that is a bit cumbersome. Perhaps the
|
# TODO: use FieldPair for this, but that is a bit cumbersome. Perhaps the
|
||||||
|
@ -280,4 +281,5 @@ proc contains*(r: SignedPeerRecord, fp: (string, seq[byte])): bool =
|
||||||
debugEcho "`contains` is not yet implemented for SignedPeerRecords"
|
debugEcho "`contains` is not yet implemented for SignedPeerRecords"
|
||||||
return false
|
return false
|
||||||
|
|
||||||
proc `==`*(a, b: SignedPeerRecord): bool = a.data == b.data
|
proc `==`*(a, b: SignedPeerRecord): bool =
|
||||||
|
a.data == b.data
|
||||||
|
|
|
@ -110,7 +110,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
||||||
if packet.messageOpt.isSome():
|
if packet.messageOpt.isSome():
|
||||||
let message = packet.messageOpt.get()
|
let message = packet.messageOpt.get()
|
||||||
trace "Received message packet", srcId = packet.srcId, address = a,
|
trace "Received message packet", srcId = packet.srcId, address = a,
|
||||||
kind = message.kind, packet
|
kind = message.kind, p = $packet
|
||||||
t.client.handleMessage(packet.srcId, a, message)
|
t.client.handleMessage(packet.srcId, a, message)
|
||||||
else:
|
else:
|
||||||
trace "Not decryptable message packet received",
|
trace "Not decryptable message packet received",
|
||||||
|
|
|
@ -150,7 +150,7 @@ suite "Discovery v5 Tests":
|
||||||
privKey = PrivateKey.fromHex(key).expect("Valid private key hex")
|
privKey = PrivateKey.fromHex(key).expect("Valid private key hex")
|
||||||
pubKey = privKey.getPublicKey.expect("Valid private key for public key")
|
pubKey = privKey.getPublicKey.expect("Valid private key for public key")
|
||||||
id = pubKey.toNodeId.expect("Public key valid for node id")
|
id = pubKey.toNodeId.expect("Public key valid for node id")
|
||||||
debugEcho ">>> key: ", key
|
# debugEcho ">>> key: ", key
|
||||||
check logDistance(targetId, id) == d
|
check logDistance(targetId, id) == d
|
||||||
|
|
||||||
test "Distance to id check":
|
test "Distance to id check":
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
--path:".."
|
--path:".."
|
||||||
--threads:on
|
--threads:on
|
||||||
--tlsEmulation:off
|
--tlsEmulation:off
|
||||||
|
-d:chronicles_enabled=off
|
||||||
|
|
Loading…
Reference in New Issue