mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 17:04:53 +00:00
feature/use-crypto-rng (#190)
* fixes * fix * Update waku_types.nim * fixes * fix * Improve the generateRequestId call Co-authored-by: kdeme <kim.demey@gmail.com>
This commit is contained in:
parent
25b48bb99e
commit
3eb015ee7b
@ -37,7 +37,7 @@ procSuite "Waku Filter":
|
|||||||
completionFut.complete(true)
|
completionFut.complete(true)
|
||||||
|
|
||||||
let
|
let
|
||||||
proto = WakuFilter.init(dialSwitch, handle)
|
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
|
||||||
rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
|
rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
|
||||||
|
|
||||||
dialSwitch.mount(proto)
|
dialSwitch.mount(proto)
|
||||||
@ -46,7 +46,7 @@ procSuite "Waku Filter":
|
|||||||
discard
|
discard
|
||||||
|
|
||||||
let
|
let
|
||||||
proto2 = WakuFilter.init(listenSwitch, emptyHandle)
|
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
|
||||||
subscription = proto2.subscription()
|
subscription = proto2.subscription()
|
||||||
|
|
||||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||||
|
@ -30,7 +30,7 @@ procSuite "Waku Store":
|
|||||||
discard await listenSwitch.start()
|
discard await listenSwitch.start()
|
||||||
|
|
||||||
let
|
let
|
||||||
proto = WakuStore.init(dialSwitch)
|
proto = WakuStore.init(dialSwitch, crypto.newRng())
|
||||||
subscription = proto.subscription()
|
subscription = proto.subscription()
|
||||||
rpc = HistoryQuery(topics: @["topic"])
|
rpc = HistoryQuery(topics: @["topic"])
|
||||||
|
|
||||||
|
@ -4,14 +4,13 @@
|
|||||||
|
|
||||||
import
|
import
|
||||||
std/tables,
|
std/tables,
|
||||||
chronos,
|
chronos, bearssl, stew/byteutils,
|
||||||
random,
|
|
||||||
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub]
|
libp2p/protocols/pubsub/[pubsub, gossipsub]
|
||||||
|
|
||||||
# Common data types -----------------------------------------------------------
|
# Common data types -----------------------------------------------------------
|
||||||
|
|
||||||
@ -63,6 +62,7 @@ type
|
|||||||
|
|
||||||
WakuStore* = ref object of LPProtocol
|
WakuStore* = ref object of LPProtocol
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
|
rng*: ref BrHmacDrbgContext
|
||||||
peers*: seq[HistoryPeer]
|
peers*: seq[HistoryPeer]
|
||||||
messages*: seq[WakuMessage]
|
messages*: seq[WakuMessage]
|
||||||
|
|
||||||
@ -86,6 +86,7 @@ type
|
|||||||
MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.}
|
MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.}
|
||||||
|
|
||||||
WakuFilter* = ref object of LPProtocol
|
WakuFilter* = ref object of LPProtocol
|
||||||
|
rng*: ref BrHmacDrbgContext
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
subscribers*: seq[Subscriber]
|
subscribers*: seq[Subscriber]
|
||||||
pushHandler*: MessagePushHandler
|
pushHandler*: MessagePushHandler
|
||||||
@ -102,6 +103,7 @@ type
|
|||||||
messages*: seq[(Topic, WakuMessage)]
|
messages*: seq[(Topic, WakuMessage)]
|
||||||
filters*: Filters
|
filters*: Filters
|
||||||
subscriptions*: MessageNotificationSubscriptions
|
subscriptions*: MessageNotificationSubscriptions
|
||||||
|
rng*: ref BrHmacDrbgContext
|
||||||
|
|
||||||
WakuRelay* = ref object of GossipSub
|
WakuRelay* = ref object of GossipSub
|
||||||
gossipEnabled*: bool
|
gossipEnabled*: bool
|
||||||
@ -131,6 +133,7 @@ proc notify*(filters: Filters, msg: WakuMessage) =
|
|||||||
if msg.contentTopic in filter.contentFilter.topics:
|
if msg.contentTopic in filter.contentFilter.topics:
|
||||||
filter.handler(msg.payload)
|
filter.handler(msg.payload)
|
||||||
|
|
||||||
proc generateRequestId*(): string =
|
proc generateRequestId*(rng: ref BrHmacDrbgContext): string =
|
||||||
for _ in .. 10:
|
var bytes: array[10, byte]
|
||||||
add(result, char(rand(int('A') .. int('z'))))
|
brHmacDrbgGenerate(rng[], bytes)
|
||||||
|
toHex(bytes)
|
||||||
|
@ -85,6 +85,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||||||
|
|
||||||
result = WakuNode(
|
result = WakuNode(
|
||||||
switch: switch,
|
switch: switch,
|
||||||
|
rng: crypto.newRng(),
|
||||||
peerInfo: peerInfo,
|
peerInfo: peerInfo,
|
||||||
wakuRelay: wakuRelay,
|
wakuRelay: wakuRelay,
|
||||||
subscriptions: newTable[string, MessageNotificationSubscription]()
|
subscriptions: newTable[string, MessageNotificationSubscription]()
|
||||||
@ -103,14 +104,14 @@ proc start*(node: WakuNode) {.async.} =
|
|||||||
|
|
||||||
# NOTE WakuRelay is being instantiated as part of initing node
|
# NOTE WakuRelay is being instantiated as part of initing node
|
||||||
|
|
||||||
node.wakuStore = WakuStore.init(node.switch)
|
node.wakuStore = WakuStore.init(node.switch, node.rng)
|
||||||
node.switch.mount(node.wakuStore)
|
node.switch.mount(node.wakuStore)
|
||||||
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
|
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
|
||||||
|
|
||||||
proc pushHandler(msg: MessagePush) {.async, gcsafe.} =
|
proc pushHandler(msg: MessagePush) {.async, gcsafe.} =
|
||||||
info "push received"
|
info "push received"
|
||||||
|
|
||||||
node.wakuFilter = WakuFilter.init(node.switch, pushHandler)
|
node.wakuFilter = WakuFilter.init(node.switch, node.rng, pushHandler)
|
||||||
node.switch.mount(node.wakuFilter)
|
node.switch.mount(node.wakuFilter)
|
||||||
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import
|
import
|
||||||
std/tables,
|
std/tables,
|
||||||
|
bearssl,
|
||||||
chronos, chronicles, metrics, stew/results,
|
chronos, chronicles, metrics, stew/results,
|
||||||
libp2p/protocols/pubsub/pubsubpeer,
|
libp2p/protocols/pubsub/pubsubpeer,
|
||||||
libp2p/protocols/pubsub/floodsub,
|
libp2p/protocols/pubsub/floodsub,
|
||||||
@ -7,6 +8,7 @@ import
|
|||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
./message_notifier,
|
./message_notifier,
|
||||||
./../../node/v2/waku_types
|
./../../node/v2/waku_types
|
||||||
@ -114,8 +116,9 @@ method init*(wf: WakuFilter) =
|
|||||||
wf.handler = handle
|
wf.handler = handle
|
||||||
wf.codec = WakuFilterCodec
|
wf.codec = WakuFilterCodec
|
||||||
|
|
||||||
proc init*(T: type WakuFilter, switch: Switch, handler: MessagePushHandler): T =
|
proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
|
||||||
new result
|
new result
|
||||||
|
result.rng = crypto.newRng()
|
||||||
result.switch = switch
|
result.switch = switch
|
||||||
result.pushHandler = handler
|
result.pushHandler = handler
|
||||||
result.init()
|
result.init()
|
||||||
@ -139,4 +142,4 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
|||||||
|
|
||||||
proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} =
|
proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} =
|
||||||
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
|
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
|
||||||
await conn.writeLP(FilterRPC(requestId: generateRequestId(), request: request).encode().buffer)
|
await conn.writeLP(FilterRPC(requestId: generateRequestId(wf.rng), request: request).encode().buffer)
|
||||||
|
@ -4,7 +4,6 @@
|
|||||||
## Instead, it should likely be on top of GossipSub with a similar interface.
|
## Instead, it should likely be on top of GossipSub with a similar interface.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/strutils,
|
|
||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
|
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
import
|
import
|
||||||
std/tables,
|
std/tables,
|
||||||
|
bearssl,
|
||||||
chronos, chronicles, metrics, stew/results,
|
chronos, chronicles, metrics, stew/results,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
@ -97,8 +99,9 @@ method init*(ws: WakuStore) =
|
|||||||
ws.handler = handle
|
ws.handler = handle
|
||||||
ws.codec = WakuStoreCodec
|
ws.codec = WakuStoreCodec
|
||||||
|
|
||||||
proc init*(T: type WakuStore, switch: Switch): T =
|
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext): T =
|
||||||
new result
|
new result
|
||||||
|
result.rng = rng
|
||||||
result.switch = switch
|
result.switch = switch
|
||||||
result.init()
|
result.init()
|
||||||
|
|
||||||
@ -127,7 +130,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
|
|||||||
let peer = w.peers[0]
|
let peer = w.peers[0]
|
||||||
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
|
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
|
||||||
|
|
||||||
await conn.writeLP(HistoryRPC(requestId: "foo", query: query).encode().buffer)
|
await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer)
|
||||||
|
|
||||||
var message = await conn.readLp(64*1024)
|
var message = await conn.readLp(64*1024)
|
||||||
let response = HistoryRPC.init(message)
|
let response = HistoryRPC.init(message)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user