mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
Bump submodules (#157)
* Upgrade all submodules * Use stock standard_setup and remove our version Switch no longer relies on Pubsub argument * Fix peerId * Add reference to WakuRelay in WakuNode * Use WakuRelay ref directly instead of via switch * Tweak standard switch sig * Fix start_network peerid * Import nim-libp2p utils test * Use WakuRelay in test utils * Fix utils imports * Tweak * Fix trigger self test * Disable broken filter test * Fix and amend logscope topics * Make subscribe calls async to use await * Add debug stuff to nimble file * Await for subscribe content * Sleeping in tests * Local checkout fixes * XXX: Try to use .PubSub on WakuRelay * Revert "XXX: Try to use .PubSub on WakuRelay" This reverts commit 3a3139e4cfbb5ae9500fd30b2e79c676ccc4a53b. * Only using gossip seems to work Subscribe for floodsub broken * Fix await in examples * Get rid of double publish, still need sleep
This commit is contained in:
parent
dfee0359af
commit
a8dbf8a7b6
@ -32,7 +32,7 @@ proc runBackground() {.async.} =
|
|||||||
let message = WakuMessage.init(data).value
|
let message = WakuMessage.init(data).value
|
||||||
let payload = cast[string](message.payload)
|
let payload = cast[string](message.payload)
|
||||||
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
||||||
node.subscribe(topic, handler)
|
await node.subscribe(topic, handler)
|
||||||
|
|
||||||
# Publish to a topic
|
# Publish to a topic
|
||||||
let payload = cast[seq[byte]]("hello world")
|
let payload = cast[seq[byte]]("hello world")
|
||||||
|
|||||||
@ -3,5 +3,6 @@ import
|
|||||||
# TODO: enable this when it is altered into a proper waku relay test
|
# TODO: enable this when it is altered into a proper waku relay test
|
||||||
# ./v2/test_waku,
|
# ./v2/test_waku,
|
||||||
./v2/test_wakunode,
|
./v2/test_wakunode,
|
||||||
./v2/test_waku_store,
|
./v2/test_waku_store
|
||||||
./v2/test_waku_filter
|
# NOTE: Disabling broken filter protocol, we don't rely on it for Nangang
|
||||||
|
# ./v2/test_waku_filter
|
||||||
|
|||||||
@ -45,10 +45,10 @@ procSuite "WakuNode":
|
|||||||
await node.start()
|
await node.start()
|
||||||
|
|
||||||
# Subscribe our node to the pubSubTopic where all chat data go onto.
|
# Subscribe our node to the pubSubTopic where all chat data go onto.
|
||||||
node.subscribe(pubSubTopic, relayHandler)
|
await node.subscribe(pubSubTopic, relayHandler)
|
||||||
# Subscribe a contentFilter to trigger a specific application handler when
|
# Subscribe a contentFilter to trigger a specific application handler when
|
||||||
# WakuMessages with that content are received
|
# WakuMessages with that content are received
|
||||||
node.subscribe(contentFilter, contentHandler)
|
await node.subscribe(contentFilter, contentHandler)
|
||||||
|
|
||||||
node.publish(pubSubTopic, message)
|
node.publish(pubSubTopic, message)
|
||||||
|
|
||||||
@ -92,12 +92,18 @@ procSuite "WakuNode":
|
|||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start()])
|
||||||
|
|
||||||
# Subscribe our node to the pubSubTopic where all chat data go onto.
|
# Subscribe our node to the pubSubTopic where all chat data go onto.
|
||||||
node1.subscribe(pubSubTopic, relayHandler)
|
await node1.subscribe(pubSubTopic, relayHandler)
|
||||||
# Subscribe a contentFilter to trigger a specific application handler when
|
# Subscribe a contentFilter to trigger a specific application handler when
|
||||||
# WakuMessages with that content are received
|
# WakuMessages with that content are received
|
||||||
node1.subscribe(contentFilter, contentHandler1)
|
await node1.subscribe(contentFilter, contentHandler1)
|
||||||
# Connect peers by dialing from node2 to node1
|
# Connect peers by dialing from node2 to node1
|
||||||
let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec)
|
let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec)
|
||||||
|
#
|
||||||
|
# We need to sleep to allow the subscription to go through
|
||||||
|
info "Going to sleep to allow subscribe to go through"
|
||||||
|
await sleepAsync(2000.millis)
|
||||||
|
|
||||||
|
info "Waking up and publishing"
|
||||||
node2.publish(pubSubTopic, message)
|
node2.publish(pubSubTopic, message)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
|||||||
@ -1,16 +1,76 @@
|
|||||||
|
# compile time options here
|
||||||
|
const
|
||||||
|
libp2p_pubsub_sign {.booldefine.} = true
|
||||||
|
libp2p_pubsub_verify {.booldefine.} = true
|
||||||
|
|
||||||
|
import random
|
||||||
import chronos
|
import chronos
|
||||||
import ../../waku/node/v2/standard_setup
|
import libp2p/[standard_setup,
|
||||||
|
protocols/pubsub/pubsub,
|
||||||
|
protocols/pubsub/floodsub,
|
||||||
|
protocols/pubsub/gossipsub,
|
||||||
|
protocols/secure/secure]
|
||||||
|
import ../../waku/protocol/v2/waku_relay,
|
||||||
|
../../waku/node/v2/waku_types
|
||||||
|
|
||||||
export standard_setup
|
export standard_setup
|
||||||
|
|
||||||
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
|
randomize()
|
||||||
for i in 0..<num:
|
|
||||||
result.add(newStandardSwitch(gossip = gossip))
|
|
||||||
|
|
||||||
proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
|
proc generateNodes*(
|
||||||
var dials: seq[Future[void]]
|
num: Natural,
|
||||||
|
secureManagers: openarray[SecureProtocol] = [
|
||||||
|
# array cos order matters
|
||||||
|
SecureProtocol.Secio,
|
||||||
|
SecureProtocol.Noise,
|
||||||
|
],
|
||||||
|
msgIdProvider: MsgIdProvider = nil,
|
||||||
|
gossip: bool = false,
|
||||||
|
triggerSelf: bool = false,
|
||||||
|
verifySignature: bool = libp2p_pubsub_verify,
|
||||||
|
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
|
||||||
|
|
||||||
|
for i in 0..<num:
|
||||||
|
let switch = newStandardSwitch(secureManagers = secureManagers)
|
||||||
|
let wakuRelay = WakuRelay.init(
|
||||||
|
switch = switch,
|
||||||
|
triggerSelf = triggerSelf,
|
||||||
|
verifySignature = verifySignature,
|
||||||
|
sign = sign,
|
||||||
|
# XXX unclear why including this causes a compiler error, it is part of WakuRelay type
|
||||||
|
#gossipEnabled = gossip,
|
||||||
|
msgIdProvider = msgIdProvider).PubSub
|
||||||
|
|
||||||
|
switch.mount(wakuRelay)
|
||||||
|
result.add(wakuRelay)
|
||||||
|
|
||||||
|
proc subscribeNodes*(nodes: seq[PubSub]) {.async.} =
|
||||||
for dialer in nodes:
|
for dialer in nodes:
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
if dialer.peerInfo.peerId != node.peerInfo.peerId:
|
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
|
||||||
dials.add(dialer.connect(node.peerInfo))
|
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
|
||||||
# TODO: Hmm, does this make sense?
|
dialer.subscribePeer(node.peerInfo.peerId)
|
||||||
await allFutures(dials)
|
|
||||||
|
proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
|
||||||
|
if nodes.len < degree:
|
||||||
|
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
|
||||||
|
|
||||||
|
for i, dialer in nodes:
|
||||||
|
if (i mod degree) != 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
if dialer.switch.peerInfo.peerId != node.peerInfo.peerId:
|
||||||
|
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
|
||||||
|
dialer.subscribePeer(node.peerInfo.peerId)
|
||||||
|
|
||||||
|
proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
|
||||||
|
for dialer in nodes:
|
||||||
|
var dialed: seq[PeerID]
|
||||||
|
while dialed.len < nodes.len - 1:
|
||||||
|
let node = sample(nodes)
|
||||||
|
if node.peerInfo.peerId notin dialed:
|
||||||
|
if dialer.peerInfo.peerId != node.peerInfo.peerId:
|
||||||
|
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
|
||||||
|
dialer.subscribePeer(node.peerInfo.peerId)
|
||||||
|
dialed.add(node.peerInfo.peerId)
|
||||||
|
|||||||
2
vendor/news
vendored
2
vendor/news
vendored
@ -1 +1 @@
|
|||||||
Subproject commit a3a6e3ae5ff16126942f4febe746ca4da978072b
|
Subproject commit e1d63564a2a411f264e75694b8f7c66e50c3a4cb
|
||||||
2
vendor/nim-bearssl
vendored
2
vendor/nim-bearssl
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 33b2303fc3b64359970b77bb09274c3e012ff37f
|
Subproject commit ba5f4687987817902c2727e30b35cb5ad1e61203
|
||||||
2
vendor/nim-chronicles
vendored
2
vendor/nim-chronicles
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 3e42e9a33e3e490d9f18d8e33219f67f64f4d8a7
|
Subproject commit b60f70718f8039c5c86dfc2a4680d8c1e37cbce2
|
||||||
2
vendor/nim-chronos
vendored
2
vendor/nim-chronos
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 3968f09ae1cd3e8c4e73729cbccf2587ff669203
|
Subproject commit 21349807444b86f9ce7e2b533a3ecc690f3edc3d
|
||||||
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
|||||||
Subproject commit ac5155394f25c2049c847e328dcfc67a01547a52
|
Subproject commit ea0dbb256e7f911a1ea39d8f53597c947923acae
|
||||||
2
vendor/nim-faststreams
vendored
2
vendor/nim-faststreams
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 5df69fc6961e58205189cd92ae2477769fa8c4c0
|
Subproject commit 87309f3120d4e627082171a188324d3ee14d8986
|
||||||
2
vendor/nim-json-rpc
vendored
2
vendor/nim-json-rpc
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 244254632b15c745b6e15537aafd401f360d9928
|
Subproject commit 62349c39c62986396a25dad300894899bef5f79f
|
||||||
2
vendor/nim-json-serialization
vendored
2
vendor/nim-json-serialization
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 1cf51931f1037a2c44fa0912386273c01a0e0e42
|
Subproject commit 1dccd4b2ef14c5e3ce30ad3f3a0962e0b98da6a3
|
||||||
2
vendor/nim-libbacktrace
vendored
2
vendor/nim-libbacktrace
vendored
@ -1 +1 @@
|
|||||||
Subproject commit dc2c199d41dc90de75043d1ee4efe5e0323932bf
|
Subproject commit c21107e34c9f5d2c9fd5d313a6a5b554ddafa141
|
||||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 38eb36efaee09551e0cd6c1d4530c9abfe9cb322
|
Subproject commit 96d4c44fec3b16a65d5ed943b85e474bfc3e1de8
|
||||||
2
vendor/nim-metrics
vendored
2
vendor/nim-metrics
vendored
@ -1 +1 @@
|
|||||||
Subproject commit f91deb74228ecb14fb82575e4d0f387ad9732b8a
|
Subproject commit 77305f3b3ddfef5aff65b685effc7e7a1d1cf59c
|
||||||
2
vendor/nim-nat-traversal
vendored
2
vendor/nim-nat-traversal
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 2403c33929c74f2d150f50dc8bc3a598af70661a
|
Subproject commit eb24b2cf56958f2f5bd219abff20ebf45de9a588
|
||||||
2
vendor/nim-secp256k1
vendored
2
vendor/nim-secp256k1
vendored
@ -1 +1 @@
|
|||||||
Subproject commit bf6cc94a3cbab16cf7ffadad11b50c52f161d0a8
|
Subproject commit fb9699702b44f194b5926c8ab4a004cff676b435
|
||||||
2
vendor/nim-serialization
vendored
2
vendor/nim-serialization
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 5b11c4173159a15d52f0bb6a26a1aa2b40f0f2be
|
Subproject commit 474bdbf49cf1634ba504888ad1a1927a2703bd3f
|
||||||
2
vendor/nim-stew
vendored
2
vendor/nim-stew
vendored
@ -1 +1 @@
|
|||||||
Subproject commit ec2f52b0cea1f1daa33f38ca4ba289d8f40f4104
|
Subproject commit 1db43c7234acb9554e3e80bf2e7b61c4cf0435cf
|
||||||
2
vendor/nimbus-build-system
vendored
2
vendor/nimbus-build-system
vendored
@ -1 +1 @@
|
|||||||
Subproject commit e0d9939f9fce2bcbcf3451689b33bb3423fcf58e
|
Subproject commit a8cafce7c0d16c3c5a0d928b4a0e31207d399de0
|
||||||
2
vendor/nimcrypto
vendored
2
vendor/nimcrypto
vendored
@ -1 +1 @@
|
|||||||
Subproject commit f767595f4ddec2b5570b5194feb96954c00a6499
|
Subproject commit a065c1741836462762d18d2fced1fedd46095b02
|
||||||
@ -33,7 +33,11 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
|
|||||||
exec "nim " & lang & " --out:build/" & name & " " & extra_params & " " & srcDir & name & ".nim"
|
exec "nim " & lang & " --out:build/" & name & " " & extra_params & " " & srcDir & name & ".nim"
|
||||||
|
|
||||||
proc test(name: string, lang = "c") =
|
proc test(name: string, lang = "c") =
|
||||||
buildBinary name, "tests/", "-d:chronicles_log_level=ERROR"
|
# XXX: When running `> NIM_PARAMS="-d:chronicles_log_level=INFO" make test2`
|
||||||
|
# I expect compiler flag to be overridden, however it stays with whatever is
|
||||||
|
# specified here.
|
||||||
|
buildBinary name, "tests/", "-d:chronicles_log_level=DEBUG"
|
||||||
|
#buildBinary name, "tests/", "-d:chronicles_log_level=ERROR"
|
||||||
exec "build/" & name
|
exec "build/" & name
|
||||||
|
|
||||||
### Tasks
|
### Tasks
|
||||||
|
|||||||
@ -23,8 +23,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
|
|
||||||
# TODO: Implement symkey etc logic
|
# TODO: Implement symkey etc logic
|
||||||
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
|
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
|
||||||
# XXX Why is casting necessary here but not in Nim node API?
|
let wakuRelay = node.wakuRelay
|
||||||
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
|
||||||
# XXX also future return type
|
# XXX also future return type
|
||||||
# TODO: Shouldn't we really be doing WakuNode publish here?
|
# TODO: Shouldn't we really be doing WakuNode publish here?
|
||||||
debug "waku_publish", topic=topic, payload=payload
|
debug "waku_publish", topic=topic, payload=payload
|
||||||
@ -61,7 +60,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
var readable_str = cast[string](msg[].payload)
|
var readable_str = cast[string](msg[].payload)
|
||||||
info "Hit subscribe handler", topic=topic, msg=msg[], payload=readable_str
|
info "Hit subscribe handler", topic=topic, msg=msg[], payload=readable_str
|
||||||
|
|
||||||
node.subscribe(topic, handler)
|
# XXX: Can we make this context async to use await?
|
||||||
|
discard node.subscribe(topic, handler)
|
||||||
return true
|
return true
|
||||||
#if not result:
|
#if not result:
|
||||||
# raise newException(ValueError, "Message could not be posted")
|
# raise newException(ValueError, "Message could not be posted")
|
||||||
|
|||||||
@ -1,76 +0,0 @@
|
|||||||
# compile time options here
|
|
||||||
const
|
|
||||||
libp2p_secure {.strdefine.} = ""
|
|
||||||
libp2p_pubsub_sign {.booldefine.} = true
|
|
||||||
libp2p_pubsub_verify {.booldefine.} = true
|
|
||||||
|
|
||||||
import
|
|
||||||
options, tables, chronicles, chronos,
|
|
||||||
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
|
||||||
libp2p/stream/connection,
|
|
||||||
libp2p/transports/[transport, tcptransport],
|
|
||||||
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
|
|
||||||
libp2p/protocols/[identify, secure/secure],
|
|
||||||
libp2p/protocols/pubsub/pubsub,
|
|
||||||
../../protocol/v2/waku_relay
|
|
||||||
|
|
||||||
import
|
|
||||||
libp2p/protocols/secure/noise,
|
|
||||||
libp2p/protocols/secure/secio
|
|
||||||
|
|
||||||
export
|
|
||||||
switch, peerinfo, connection, multiaddress, crypto
|
|
||||||
|
|
||||||
type
|
|
||||||
SecureProtocol* {.pure.} = enum
|
|
||||||
Noise,
|
|
||||||
Secio
|
|
||||||
|
|
||||||
proc newStandardSwitch*(privKey = none(PrivateKey),
|
|
||||||
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
|
||||||
triggerSelf = false,
|
|
||||||
gossip = false,
|
|
||||||
secureManagers: openarray[SecureProtocol] = [
|
|
||||||
# NOTE below relates to Eth2
|
|
||||||
# TODO investigate why we're getting fewer peers on public testnets with noise
|
|
||||||
SecureProtocol.Secio,
|
|
||||||
SecureProtocol.Noise, # array cos order matters
|
|
||||||
],
|
|
||||||
verifySignature = libp2p_pubsub_verify,
|
|
||||||
sign = libp2p_pubsub_sign,
|
|
||||||
transportFlags: set[ServerFlags] = {},
|
|
||||||
rng = newRng(),
|
|
||||||
inTimeout: Duration = 1.minutes,
|
|
||||||
outTimeout: Duration = 1.minutes): Switch =
|
|
||||||
info "newStandardSwitch"
|
|
||||||
proc createMplex(conn: Connection): Muxer =
|
|
||||||
Mplex.init(
|
|
||||||
conn,
|
|
||||||
inTimeout = inTimeout,
|
|
||||||
outTimeout = outTimeout)
|
|
||||||
|
|
||||||
let
|
|
||||||
seckey = privKey.get(otherwise = PrivateKey.random(ECDSA, rng[]).tryGet())
|
|
||||||
peerInfo = PeerInfo.init(seckey, [address])
|
|
||||||
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
|
||||||
transports = @[Transport(TcpTransport.init(transportFlags))]
|
|
||||||
muxers = {MplexCodec: mplexProvider}.toTable
|
|
||||||
identify = newIdentify(peerInfo)
|
|
||||||
|
|
||||||
var
|
|
||||||
secureManagerInstances: seq[Secure]
|
|
||||||
for sec in secureManagers:
|
|
||||||
case sec
|
|
||||||
of SecureProtocol.Noise:
|
|
||||||
secureManagerInstances &= newNoise(rng, seckey).Secure
|
|
||||||
of SecureProtocol.Secio:
|
|
||||||
secureManagerInstances &= newSecio(rng, seckey).Secure
|
|
||||||
|
|
||||||
let pubSub = PubSub newPubSub(WakuRelay, peerInfo, triggerSelf)
|
|
||||||
|
|
||||||
result = newSwitch(peerInfo,
|
|
||||||
transports,
|
|
||||||
identify,
|
|
||||||
muxers,
|
|
||||||
secureManagers = secureManagerInstances,
|
|
||||||
pubSub = some(pubSub))
|
|
||||||
@ -46,7 +46,7 @@ proc initNodeCmd(shift: int, staticNodes: seq[string] = @[], master = false, lab
|
|||||||
info "Address", address
|
info "Address", address
|
||||||
# TODO: Need to port shift
|
# TODO: Need to port shift
|
||||||
peerInfo.addrs.add(hostAddress)
|
peerInfo.addrs.add(hostAddress)
|
||||||
let id = peerInfo.id
|
let id = $peerInfo.peerId
|
||||||
|
|
||||||
info "PeerInfo", id = id, addrs = peerInfo.addrs
|
info "PeerInfo", id = id, addrs = peerInfo.addrs
|
||||||
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
|
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
|
||||||
|
|||||||
@ -6,7 +6,8 @@ import
|
|||||||
std/tables,
|
std/tables,
|
||||||
chronos,
|
chronos,
|
||||||
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
||||||
libp2p/protobuf/minprotobuf
|
libp2p/protobuf/minprotobuf,
|
||||||
|
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub]
|
||||||
|
|
||||||
# Common data types -----------------------------------------------------------
|
# Common data types -----------------------------------------------------------
|
||||||
|
|
||||||
@ -31,6 +32,7 @@ type
|
|||||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
WakuNode* = ref object of RootObj
|
WakuNode* = ref object of RootObj
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
|
wakuRelay*: WakuRelay
|
||||||
peerInfo*: PeerInfo
|
peerInfo*: PeerInfo
|
||||||
libp2pTransportLoops*: seq[Future[void]]
|
libp2pTransportLoops*: seq[Future[void]]
|
||||||
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
|
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
|
||||||
@ -41,6 +43,9 @@ type
|
|||||||
payload*: seq[byte]
|
payload*: seq[byte]
|
||||||
contentTopic*: string
|
contentTopic*: string
|
||||||
|
|
||||||
|
WakuRelay* = ref object of GossipSub
|
||||||
|
gossipEnabled*: bool
|
||||||
|
|
||||||
# Encoding and decoding -------------------------------------------------------
|
# Encoding and decoding -------------------------------------------------------
|
||||||
|
|
||||||
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
|||||||
@ -9,8 +9,12 @@ import
|
|||||||
# NOTE For TopicHandler, solve with exports?
|
# NOTE For TopicHandler, solve with exports?
|
||||||
libp2p/protocols/pubsub/pubsub,
|
libp2p/protocols/pubsub/pubsub,
|
||||||
libp2p/peerinfo,
|
libp2p/peerinfo,
|
||||||
|
libp2p/standard_setup,
|
||||||
../../protocol/v2/[waku_relay, waku_store, waku_filter],
|
../../protocol/v2/[waku_relay, waku_store, waku_filter],
|
||||||
./waku_types, ./standard_setup
|
./waku_types
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakunode"
|
||||||
|
|
||||||
# Default clientId
|
# Default clientId
|
||||||
const clientId* = "Nimbus Waku v2 node"
|
const clientId* = "Nimbus Waku v2 node"
|
||||||
@ -67,20 +71,38 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||||||
# XXX: Add this when we create node or start it?
|
# XXX: Add this when we create node or start it?
|
||||||
peerInfo.addrs.add(hostAddress)
|
peerInfo.addrs.add(hostAddress)
|
||||||
|
|
||||||
var switch = newStandardSwitch(some(nodekey), hostAddress, triggerSelf = true)
|
var switch = newStandardSwitch(some(nodekey), hostAddress)
|
||||||
|
# TODO Untested - verify behavior after switch interface change
|
||||||
|
# More like this:
|
||||||
|
# let pubsub = GossipSub.init(
|
||||||
|
# switch = switch,
|
||||||
|
# msgIdProvider = msgIdProvider,
|
||||||
|
# triggerSelf = true, sign = false,
|
||||||
|
# verifySignature = false).PubSub
|
||||||
|
let wakuRelay = WakuRelay.init(
|
||||||
|
switch = switch,
|
||||||
|
# Use default
|
||||||
|
#msgIdProvider = msgIdProvider,
|
||||||
|
triggerSelf = true,
|
||||||
|
sign = false,
|
||||||
|
verifySignature = false)
|
||||||
|
# This gets messy with: .PubSub
|
||||||
|
switch.mount(wakuRelay)
|
||||||
|
|
||||||
result = WakuNode(switch: switch, peerInfo: peerInfo)
|
result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay)
|
||||||
|
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
debug "Hit handler", topic=topic, data=data
|
debug "Hit handler", topic=topic, data=data
|
||||||
|
|
||||||
result.subscribe(topic, handler)
|
# XXX: Is using discard here fine? Not sure if we want init to be async?
|
||||||
|
# Can also move this to the start proc, possibly wiser?
|
||||||
|
discard result.subscribe(topic, handler)
|
||||||
|
|
||||||
proc start*(node: WakuNode) {.async.} =
|
proc start*(node: WakuNode) {.async.} =
|
||||||
node.libp2pTransportLoops = await node.switch.start()
|
node.libp2pTransportLoops = await node.switch.start()
|
||||||
|
|
||||||
# NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set
|
# NOTE WakuRelay is being instantiated as part of initing node
|
||||||
let storeProto = WakuStore.init()
|
let storeProto = WakuStore.init()
|
||||||
node.switch.mount(storeProto)
|
node.switch.mount(storeProto)
|
||||||
|
|
||||||
@ -89,37 +111,37 @@ proc start*(node: WakuNode) {.async.} =
|
|||||||
|
|
||||||
# TODO Get this from WakuNode obj
|
# TODO Get this from WakuNode obj
|
||||||
let peerInfo = node.peerInfo
|
let peerInfo = node.peerInfo
|
||||||
let id = peerInfo.peerId.pretty
|
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
|
||||||
info "PeerInfo", id = id, addrs = peerInfo.addrs
|
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
|
||||||
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
|
|
||||||
## XXX: this should be /ip4..., / stripped?
|
## XXX: this should be /ip4..., / stripped?
|
||||||
info "Listening on", full = listenStr
|
info "Listening on", full = listenStr
|
||||||
|
|
||||||
proc stop*(node: WakuNode) {.async.} =
|
proc stop*(node: WakuNode) {.async.} =
|
||||||
let wakuRelay = node.switch.pubSub.get()
|
let wakuRelay = node.wakuRelay
|
||||||
await wakuRelay.stop()
|
await wakuRelay.stop()
|
||||||
|
|
||||||
await node.switch.stop()
|
await node.switch.stop()
|
||||||
|
|
||||||
proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
|
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
|
||||||
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
|
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
|
||||||
## this topic. TopicHandler is a method that takes a topic and some data.
|
## this topic. TopicHandler is a method that takes a topic and some data.
|
||||||
##
|
##
|
||||||
## NOTE The data field SHOULD be decoded as a WakuMessage.
|
## NOTE The data field SHOULD be decoded as a WakuMessage.
|
||||||
## Status: Implemented.
|
## Status: Implemented.
|
||||||
|
info "subscribe", topic=topic
|
||||||
|
|
||||||
let wakuRelay = w.switch.pubSub.get()
|
let wakuRelay = node.wakuRelay
|
||||||
# XXX Consider awaiting here
|
await wakuRelay.subscribe(topic, handler)
|
||||||
discard wakuRelay.subscribe(topic, handler)
|
|
||||||
|
|
||||||
proc subscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) =
|
proc subscribe*(node: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) {.async.} =
|
||||||
## Subscribes to a ContentFilter. Triggers handler when receiving messages on
|
## Subscribes to a ContentFilter. Triggers handler when receiving messages on
|
||||||
## this content filter. ContentFilter is a method that takes some content
|
## this content filter. ContentFilter is a method that takes some content
|
||||||
## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
|
## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
|
||||||
## has to match the `ContentTopic`.
|
## has to match the `ContentTopic`.
|
||||||
|
info "subscribe content", contentFilter=contentFilter
|
||||||
|
|
||||||
# TODO: get some random id, or use the Filter directly as key
|
# TODO: get some random id, or use the Filter directly as key
|
||||||
w.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
|
node.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
|
||||||
|
|
||||||
proc unsubscribe*(w: WakuNode, topic: Topic) =
|
proc unsubscribe*(w: WakuNode, topic: Topic) =
|
||||||
echo "NYI"
|
echo "NYI"
|
||||||
@ -144,8 +166,7 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
|
|||||||
## Status: Implemented.
|
## Status: Implemented.
|
||||||
##
|
##
|
||||||
|
|
||||||
# TODO Basic getter function for relay
|
let wakuRelay = node.wakuRelay
|
||||||
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
|
||||||
|
|
||||||
# XXX Unclear what the purpose of this is
|
# XXX Unclear what the purpose of this is
|
||||||
# Commenting out as it is later expected to be Message type, not WakuMessage
|
# Commenting out as it is later expected to be Message type, not WakuMessage
|
||||||
|
|||||||
@ -14,6 +14,9 @@ import
|
|||||||
# should be direct payload exchange (a la req-resp), not be coupled with the
|
# should be direct payload exchange (a la req-resp), not be coupled with the
|
||||||
# relay protocol.
|
# relay protocol.
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakufilter"
|
||||||
|
|
||||||
const
|
const
|
||||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5"
|
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5"
|
||||||
|
|
||||||
|
|||||||
@ -8,19 +8,16 @@ import
|
|||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
|
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
libp2p/stream/connection
|
libp2p/stream/connection,
|
||||||
|
../../node/v2/waku_types
|
||||||
|
|
||||||
declarePublicGauge total_messages, "number of messages received"
|
declarePublicGauge total_messages, "number of messages received"
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topic = "WakuRelay"
|
topics = "wakurelay"
|
||||||
|
|
||||||
const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
|
const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
|
||||||
|
|
||||||
type
|
|
||||||
WakuRelay* = ref object of GossipSub
|
|
||||||
gossipEnabled*: bool
|
|
||||||
|
|
||||||
method init*(w: WakuRelay) =
|
method init*(w: WakuRelay) =
|
||||||
debug "init"
|
debug "init"
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
@ -40,7 +37,8 @@ method initPubSub*(w: WakuRelay) =
|
|||||||
debug "initWakuRelay"
|
debug "initWakuRelay"
|
||||||
|
|
||||||
# Not using GossipSub
|
# Not using GossipSub
|
||||||
w.gossipEnabled = false
|
# XXX: FloodSub subscribe doesn't work
|
||||||
|
w.gossipEnabled = true
|
||||||
|
|
||||||
if w.gossipEnabled:
|
if w.gossipEnabled:
|
||||||
procCall GossipSub(w).initPubSub()
|
procCall GossipSub(w).initPubSub()
|
||||||
|
|||||||
@ -7,6 +7,9 @@ import
|
|||||||
./message_notifier,
|
./message_notifier,
|
||||||
./../../node/v2/waku_types
|
./../../node/v2/waku_types
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakustore"
|
||||||
|
|
||||||
const
|
const
|
||||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5"
|
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5"
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user