mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
parameterize floodsub/gossipsub
This commit is contained in:
parent
7b14649f2d
commit
407ec3534f
@ -24,7 +24,6 @@ export
|
|||||||
proc newStandardSwitch*(privKey = none(PrivateKey),
|
proc newStandardSwitch*(privKey = none(PrivateKey),
|
||||||
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||||
triggerSelf = false,
|
triggerSelf = false,
|
||||||
gossip = false,
|
|
||||||
verifySignature = libp2p_pubsub_verify,
|
verifySignature = libp2p_pubsub_verify,
|
||||||
sign = libp2p_pubsub_sign,
|
sign = libp2p_pubsub_sign,
|
||||||
transportFlags: set[ServerFlags] = {}): Switch =
|
transportFlags: set[ServerFlags] = {}): Switch =
|
||||||
@ -43,14 +42,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
|
|||||||
let secureManagers = {NoiseCodec: newNoise(seckey).Secure}.toTable
|
let secureManagers = {NoiseCodec: newNoise(seckey).Secure}.toTable
|
||||||
else:
|
else:
|
||||||
let secureManagers = {SecioCodec: newSecio(seckey).Secure}.toTable
|
let secureManagers = {SecioCodec: newSecio(seckey).Secure}.toTable
|
||||||
let pubSub = if gossip:
|
let pubSub = PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
|
||||||
PubSub newPubSub(GossipSub, peerInfo, triggerSelf)
|
|
||||||
else:
|
|
||||||
# Creating switch from generate node
|
|
||||||
# XXX: Hacky test, hijacking WakuSub here
|
|
||||||
debug "Using WakuSub here"
|
|
||||||
#PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
|
||||||
PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
|
|
||||||
|
|
||||||
result = newSwitch(peerInfo,
|
result = newSwitch(peerInfo,
|
||||||
transports,
|
transports,
|
||||||
|
|||||||
@ -141,7 +141,7 @@ proc run(config: WakuNodeConf) =
|
|||||||
|
|
||||||
# switch.pubsub = wakusub, plus all the peer info etc
|
# switch.pubsub = wakusub, plus all the peer info etc
|
||||||
# And it has wakuProto lets use wakuProto maybe, cause it has switch
|
# And it has wakuProto lets use wakuProto maybe, cause it has switch
|
||||||
var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true, gossip = false)
|
var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true)
|
||||||
let wakuProto = newWakuProto(switch)
|
let wakuProto = newWakuProto(switch)
|
||||||
switch.mount(wakuProto)
|
switch.mount(wakuProto)
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import strutils
|
|||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import libp2p/protocols/pubsub/pubsub,
|
import libp2p/protocols/pubsub/pubsub,
|
||||||
libp2p/protocols/pubsub/pubsubpeer,
|
libp2p/protocols/pubsub/pubsubpeer,
|
||||||
|
libp2p/protocols/pubsub/floodsub,
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
libp2p/protocols/pubsub/gossipsub,
|
||||||
libp2p/protocols/pubsub/rpc/[messages],
|
libp2p/protocols/pubsub/rpc/[messages],
|
||||||
libp2p/connection
|
libp2p/connection
|
||||||
@ -28,6 +29,7 @@ type
|
|||||||
WakuSub* = ref object of GossipSub
|
WakuSub* = ref object of GossipSub
|
||||||
# XXX: just playing
|
# XXX: just playing
|
||||||
text*: string
|
text*: string
|
||||||
|
gossip_enabled*: bool
|
||||||
|
|
||||||
method init(w: WakuSub) =
|
method init(w: WakuSub) =
|
||||||
debug "init"
|
debug "init"
|
||||||
@ -51,8 +53,14 @@ method initPubSub*(w: WakuSub) =
|
|||||||
debug "initWakuSub"
|
debug "initWakuSub"
|
||||||
w.text = "Foobar"
|
w.text = "Foobar"
|
||||||
debug "w.text", text = w.text
|
debug "w.text", text = w.text
|
||||||
# XXX
|
|
||||||
procCall GossipSub(w).initPubSub()
|
w.gossip_enabled = true
|
||||||
|
|
||||||
|
if w.gossip_enabled:
|
||||||
|
procCall GossipSub(w).initPubSub()
|
||||||
|
else:
|
||||||
|
procCall FloodSub(w).initPubSub()
|
||||||
|
|
||||||
w.init()
|
w.init()
|
||||||
|
|
||||||
method subscribe*(w: WakuSub,
|
method subscribe*(w: WakuSub,
|
||||||
@ -60,7 +68,11 @@ method subscribe*(w: WakuSub,
|
|||||||
handler: TopicHandler) {.async.} =
|
handler: TopicHandler) {.async.} =
|
||||||
debug "subscribe", topic=topic
|
debug "subscribe", topic=topic
|
||||||
# XXX: Pubsub really
|
# XXX: Pubsub really
|
||||||
await procCall GossipSub(w).subscribe(topic, handler)
|
|
||||||
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).subscribe(topic, handler)
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).subscribe(topic, handler)
|
||||||
|
|
||||||
# Subscribing a peer to a specified topic
|
# Subscribing a peer to a specified topic
|
||||||
method subscribeTopic*(w: WakuSub,
|
method subscribeTopic*(w: WakuSub,
|
||||||
@ -68,7 +80,11 @@ method subscribeTopic*(w: WakuSub,
|
|||||||
subscribe: bool,
|
subscribe: bool,
|
||||||
peerId: string) {.async, gcsafe.} =
|
peerId: string) {.async, gcsafe.} =
|
||||||
debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId
|
debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId
|
||||||
await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId)
|
|
||||||
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId)
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId)
|
||||||
|
|
||||||
# TODO: Fix decrement connected peers here or somewhere else
|
# TODO: Fix decrement connected peers here or somewhere else
|
||||||
method handleDisconnect*(w: WakuSub, peer: PubSubPeer) {.async.} =
|
method handleDisconnect*(w: WakuSub, peer: PubSubPeer) {.async.} =
|
||||||
@ -82,24 +98,41 @@ method rpcHandler*(w: WakuSub,
|
|||||||
|
|
||||||
# XXX: Right place?
|
# XXX: Right place?
|
||||||
total_messages.inc()
|
total_messages.inc()
|
||||||
await procCall GossipSub(w).rpcHandler(peer, rpcMsgs)
|
|
||||||
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).rpcHandler(peer, rpcMsgs)
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).rpcHandler(peer, rpcMsgs)
|
||||||
|
|
||||||
method publish*(w: WakuSub,
|
method publish*(w: WakuSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
data: seq[byte]) {.async.} =
|
data: seq[byte]) {.async.} =
|
||||||
debug "publish", topic=topic
|
debug "publish", topic=topic
|
||||||
await procCall GossipSub(w).publish(topic, data)
|
|
||||||
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).publish(topic, data)
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).publish(topic, data)
|
||||||
|
|
||||||
method unsubscribe*(w: WakuSub,
|
method unsubscribe*(w: WakuSub,
|
||||||
topics: seq[TopicPair]) {.async.} =
|
topics: seq[TopicPair]) {.async.} =
|
||||||
debug "unsubscribe"
|
debug "unsubscribe"
|
||||||
await procCall GossipSub(w).unsubscribe(topics)
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).unsubscribe(topics)
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).unsubscribe(topics)
|
||||||
|
|
||||||
# GossipSub specific methods
|
# GossipSub specific methods
|
||||||
method start*(w: WakuSub) {.async.} =
|
method start*(w: WakuSub) {.async.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
await procCall GossipSub(w).start()
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).start()
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).start()
|
||||||
|
|
||||||
method stop*(w: WakuSub) {.async.} =
|
method stop*(w: WakuSub) {.async.} =
|
||||||
debug "stop"
|
debug "stop"
|
||||||
await procCall GossipSub(w).stop()
|
if w.gossip_enabled:
|
||||||
|
await procCall GossipSub(w).stop()
|
||||||
|
else:
|
||||||
|
await procCall FloodSub(w).stop()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user