cleanup/test pubsub
This commit is contained in:
parent
f8872dd51c
commit
34d1a641de
|
@ -12,6 +12,7 @@ import chronos, chronicles
|
||||||
import pubsub,
|
import pubsub,
|
||||||
pubsubpeer,
|
pubsubpeer,
|
||||||
rpcmsg,
|
rpcmsg,
|
||||||
|
../../crypto/crypto,
|
||||||
../../connection,
|
../../connection,
|
||||||
../../peerinfo,
|
../../peerinfo,
|
||||||
../../peer
|
../../peer
|
||||||
|
@ -58,11 +59,11 @@ proc rpcHandler(f: FloodSub,
|
||||||
f.peerTopics[s.topic] = initSet[string]()
|
f.peerTopics[s.topic] = initSet[string]()
|
||||||
|
|
||||||
if s.subscribe:
|
if s.subscribe:
|
||||||
trace "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
trace "adding subscription for topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
||||||
# subscribe the peer to the topic
|
# subscribe the peer to the topic
|
||||||
f.peerTopics[s.topic].incl(id)
|
f.peerTopics[s.topic].incl(id)
|
||||||
else:
|
else:
|
||||||
trace "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
trace "removing subscription for topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
||||||
# unsubscribe the peer from the topic
|
# unsubscribe the peer from the topic
|
||||||
f.peerTopics[s.topic].excl(id)
|
f.peerTopics[s.topic].excl(id)
|
||||||
|
|
||||||
|
@ -127,12 +128,14 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
|
||||||
method publish*(f: FloodSub,
|
method publish*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
data: seq[byte]) {.async, gcsafe.} =
|
data: seq[byte]) {.async, gcsafe.} =
|
||||||
trace "about to publish message on topic", topic = topic, data = data
|
trace "about to publish message on topic", name = topic, data = data.toHex()
|
||||||
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
|
if data.len > 0 and topic.len > 0:
|
||||||
if topic in f.peerTopics:
|
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
|
||||||
for p in f.peerTopics[topic]:
|
if topic in f.peerTopics:
|
||||||
trace "pubslishing message", topic = topic, peer = p, data = data
|
trace "processing topic", name = topic
|
||||||
await f.peers[p].send(@[RPCMsg(messages: @[msg])])
|
for p in f.peerTopics[topic]:
|
||||||
|
trace "pubslishing message", topic = topic, peer = p, data = data
|
||||||
|
await f.peers[p].send(@[RPCMsg(messages: @[msg])])
|
||||||
|
|
||||||
method subscribe*(f: FloodSub,
|
method subscribe*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
|
|
|
@ -67,8 +67,9 @@ method subscribe*(p: PubSub,
|
||||||
## on every received message
|
## on every received message
|
||||||
##
|
##
|
||||||
if not p.topics.contains(topic):
|
if not p.topics.contains(topic):
|
||||||
|
trace "subscribing to topic", name = topic
|
||||||
p.topics[topic] = Topic(name: topic)
|
p.topics[topic] = Topic(name: topic)
|
||||||
|
|
||||||
p.topics[topic].handler.add(handler)
|
p.topics[topic].handler.add(handler)
|
||||||
|
|
||||||
method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} =
|
method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} =
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import options
|
import options, sets, hashes, strutils
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import rpcmsg,
|
import rpcmsg,
|
||||||
../../peer,
|
../../peer,
|
||||||
|
@ -26,6 +26,7 @@ type
|
||||||
handler*: RPCHandler
|
handler*: RPCHandler
|
||||||
topics*: seq[string]
|
topics*: seq[string]
|
||||||
id*: string # base58 peer id string
|
id*: string # base58 peer id string
|
||||||
|
seen: HashSet[string] # list of messages forwarded to peers
|
||||||
|
|
||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
|
@ -48,8 +49,18 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
|
||||||
for m in msgs:
|
for m in msgs:
|
||||||
trace "sending msgs to peer", peer = p.id, msgs = msgs
|
trace "sending msgs to peer", peer = p.id, msgs = msgs
|
||||||
let encoded = encodeRpcMsg(m)
|
let encoded = encodeRpcMsg(m)
|
||||||
if encoded.buffer.len > 0:
|
if encoded.buffer.len <= 0:
|
||||||
await p.conn.writeLp(encoded.buffer)
|
trace "empty message, skipping", peer = p.id
|
||||||
|
return
|
||||||
|
|
||||||
|
let encodedHex = encoded.buffer.toHex()
|
||||||
|
trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex
|
||||||
|
if p.seen.contains(encodedHex):
|
||||||
|
trace "message already sent to peer, skipping", peer = p.id
|
||||||
|
continue
|
||||||
|
|
||||||
|
await p.conn.writeLp(encoded.buffer)
|
||||||
|
p.seen.incl(encodedHex)
|
||||||
|
|
||||||
proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer =
|
proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer =
|
||||||
new result
|
new result
|
||||||
|
@ -57,3 +68,4 @@ proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer =
|
||||||
result.conn = conn
|
result.conn = conn
|
||||||
result.peerInfo = conn.peerInfo
|
result.peerInfo = conn.peerInfo
|
||||||
result.id = conn.peerInfo.peerId.get().pretty()
|
result.id = conn.peerInfo.peerId.get().pretty()
|
||||||
|
result.seen = initSet[string]()
|
||||||
|
|
|
@ -55,7 +55,7 @@ proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} =
|
||||||
buff.write(initProtoField(2, subs.topic))
|
buff.write(initProtoField(2, subs.topic))
|
||||||
|
|
||||||
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
||||||
result = initProtoBuffer({WithVarintLength})
|
result = initProtoBuffer()
|
||||||
trace "encoding msg: ", msg = msg
|
trace "encoding msg: ", msg = msg
|
||||||
|
|
||||||
if msg.subscriptions.len > 0:
|
if msg.subscriptions.len > 0:
|
||||||
|
@ -63,6 +63,7 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
||||||
for s in msg.subscriptions:
|
for s in msg.subscriptions:
|
||||||
encodeSubs(s, subs)
|
encodeSubs(s, subs)
|
||||||
|
|
||||||
|
# write subscriptions to protobuf
|
||||||
subs.finish()
|
subs.finish()
|
||||||
result.write(initProtoField(1, subs))
|
result.write(initProtoField(1, subs))
|
||||||
|
|
||||||
|
@ -71,10 +72,12 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
||||||
for m in msg.messages:
|
for m in msg.messages:
|
||||||
encodeMessage(m, messages)
|
encodeMessage(m, messages)
|
||||||
|
|
||||||
|
# write messages to protobuf
|
||||||
messages.finish()
|
messages.finish()
|
||||||
result.write(initProtoField(2, messages))
|
result.write(initProtoField(2, messages))
|
||||||
|
|
||||||
result.finish()
|
if result.buffer.len > 0:
|
||||||
|
result.finish()
|
||||||
|
|
||||||
proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
||||||
var pb = initProtoBuffer(msg)
|
var pb = initProtoBuffer(msg)
|
||||||
|
|
|
@ -7,8 +7,137 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import unittest
|
import unittest, options, tables, sugar, sequtils
|
||||||
import chronos
|
import chronos, chronicles
|
||||||
|
import ../libp2p/switch,
|
||||||
|
../libp2p/multistream,
|
||||||
|
../libp2p/protocols/identify,
|
||||||
|
../libp2p/connection,
|
||||||
|
../libp2p/transports/[transport, tcptransport],
|
||||||
|
../libp2p/multiaddress,
|
||||||
|
../libp2p/peerinfo,
|
||||||
|
../libp2p/crypto/crypto,
|
||||||
|
../libp2p/peer,
|
||||||
|
../libp2p/protocols/protocol,
|
||||||
|
../libp2p/muxers/muxer,
|
||||||
|
../libp2p/muxers/mplex/mplex,
|
||||||
|
../libp2p/muxers/mplex/types,
|
||||||
|
../libp2p/protocols/secure/secure,
|
||||||
|
../libp2p/protocols/secure/secio,
|
||||||
|
../libp2p/protocols/pubsub/pubsub,
|
||||||
|
../libp2p/protocols/pubsub/floodsub
|
||||||
|
|
||||||
|
proc createMplex(conn: Connection): Muxer =
|
||||||
|
result = newMplex(conn)
|
||||||
|
|
||||||
|
proc createNode(privKey: Option[PrivateKey] = none(PrivateKey),
|
||||||
|
address: string = "/ip4/127.0.0.1/tcp/0"): Switch =
|
||||||
|
var peerInfo: PeerInfo
|
||||||
|
var seckey = privKey
|
||||||
|
if privKey.isNone:
|
||||||
|
seckey = some(PrivateKey.random(RSA))
|
||||||
|
|
||||||
|
peerInfo.peerId = some(PeerID.init(seckey.get()))
|
||||||
|
peerInfo.addrs.add(Multiaddress.init(address))
|
||||||
|
|
||||||
|
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||||
|
let transports = @[Transport(newTransport(TcpTransport))]
|
||||||
|
let muxers = [(MplexCodec, mplexProvider)].toTable()
|
||||||
|
let identify = newIdentify(peerInfo)
|
||||||
|
let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable()
|
||||||
|
let pubSub = some(PubSub(newFloodSub(peerInfo)))
|
||||||
|
result = newSwitch(peerInfo,
|
||||||
|
transports,
|
||||||
|
identify,
|
||||||
|
muxers,
|
||||||
|
secureManagers = secureManagers,
|
||||||
|
pubSub = pubSub)
|
||||||
|
|
||||||
|
proc generateNodes*(num: Natural): seq[Switch] =
|
||||||
|
for i in 0..<num:
|
||||||
|
result.add(createNode())
|
||||||
|
|
||||||
|
proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
|
||||||
|
var pending: seq[Future[void]]
|
||||||
|
for dialer in nodes:
|
||||||
|
for node in nodes:
|
||||||
|
pending.add(dialer.subscribeToPeer(node.peerInfo))
|
||||||
|
await allFutures(pending)
|
||||||
|
|
||||||
suite "PubSub":
|
suite "PubSub":
|
||||||
test "basic FloodSub": discard
|
test "FloodSub basic publish/subscribe A -> B":
|
||||||
|
proc testBasicPubSub(): Future[bool] {.async.} =
|
||||||
|
var passed: bool
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
passed = true
|
||||||
|
|
||||||
|
var nodes = generateNodes(2)
|
||||||
|
var wait = await nodes[1].start()
|
||||||
|
|
||||||
|
await nodes[0].subscribeToPeer(nodes[1].peerInfo)
|
||||||
|
|
||||||
|
await nodes[1].subscribe("foobar", handler)
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await nodes[1].stop()
|
||||||
|
await allFutures(wait)
|
||||||
|
result = passed
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(testBasicPubSub()) == true
|
||||||
|
|
||||||
|
test "FloodSub basic publish/subscribe B -> A":
|
||||||
|
proc testBasicPubSub(): Future[bool] {.async.} =
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
|
||||||
|
var nodes = generateNodes(2)
|
||||||
|
var wait = await nodes[1].start()
|
||||||
|
|
||||||
|
await nodes[0].subscribeToPeer(nodes[1].peerInfo)
|
||||||
|
|
||||||
|
await nodes[0].subscribe("foobar", handler)
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await nodes[1].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await nodes[1].stop()
|
||||||
|
await allFutures(wait)
|
||||||
|
result = true
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(testBasicPubSub()) == true
|
||||||
|
|
||||||
|
test "basic FloodSub":
|
||||||
|
proc testBasicFloodSub(): Future[bool] {.async.} =
|
||||||
|
var passed: bool
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
passed = true
|
||||||
|
|
||||||
|
var nodes: seq[Switch] = generateNodes(4)
|
||||||
|
var awaitters: seq[Future[void]]
|
||||||
|
for node in nodes:
|
||||||
|
awaitters.add(await node.start())
|
||||||
|
await node.subscribe("foobar", handler)
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await subscribeNodes(nodes)
|
||||||
|
await sleepAsync(500.millis)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
await node.publish("foobar", cast[seq[byte]]("Hello!"))
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await allFutures(nodes.mapIt(it.stop()))
|
||||||
|
await allFutures(awaitters)
|
||||||
|
|
||||||
|
result = passed
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(testBasicFloodSub()) == true
|
||||||
|
|
Loading…
Reference in New Issue