wip... lvalues don't work properly sadly...

This commit is contained in:
Giovanni Petrantoni 2020-06-27 14:59:48 +09:00
parent 1a2f336eb5
commit dc3c568dc0
5 changed files with 97 additions and 77 deletions

View File

@ -117,8 +117,9 @@ method subscribeToPeer*(p: FloodSub,
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) {.async.} = data: seq[byte]): Future[int] {.async.} =
await procCall PubSub(f).publish(topic, data) # base returns always 0
discard await procCall PubSub(f).publish(topic, data)
if data.len <= 0 or topic.len <= 0: if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish" trace "topic or data missing, skipping publish"
@ -143,6 +144,8 @@ method publish*(f: FloodSub,
libp2p_pubsub_messages_published.inc(labelValues = [topic]) libp2p_pubsub_messages_published.inc(labelValues = [topic])
return sent.filterIt(not it.failed).len
method unsubscribe*(f: FloodSub, method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics) await procCall PubSub(f).unsubscribe(topics)

View File

@ -56,6 +56,7 @@ type
heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
subLock: AsyncLock
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
@ -75,7 +76,7 @@ method init*(g: GossipSub) =
proc replenishFanout(g: GossipSub, topic: string) = proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic ## get fanout peers for a topic
debug "about to replenish fanout", topic debug "about to replenish fanout", topic, avail = g.gossipsub[topic].len
var topicHash = g.fanout.mgetOrPut(topic, initHashSet[string]()) var topicHash = g.fanout.mgetOrPut(topic, initHashSet[string]())
@ -87,15 +88,17 @@ proc replenishFanout(g: GossipSub, topic: string) =
# set the fanout expiry time # set the fanout expiry time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
if topicHash.len == GossipSubD: if topicHash.len == GossipSubD:
break break
libp2p_gossipsub_peers_per_topic_fanout.set(topicHash.len.int64, labelValues = [topic]) libp2p_gossipsub_peers_per_topic_fanout.set(topicHash.len.int64, labelValues = [topic])
debug "fanout replenished with peers", peers = topicHash.len debug "fanout replenished with peers", peers = g.fanout[topic].len
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
try: try:
trace "about to rebalance mesh" trace "about to rebalance mesh"
await g.subLock.acquire()
var var
topicHash = g.mesh.mgetOrPut(topic, initHashSet[string]()) topicHash = g.mesh.mgetOrPut(topic, initHashSet[string]())
fanOutHash = g.fanout.mgetOrPut(topic, initHashSet[string]()) fanOutHash = g.fanout.mgetOrPut(topic, initHashSet[string]())
@ -106,10 +109,9 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# replenish the mesh if we're below GossipSubDlo # replenish the mesh if we're below GossipSubDlo
while topicHash.len < GossipSubD: while topicHash.len < GossipSubD:
trace "gathering peers", peers = topicHash.len trace "gathering peers", peers = topicHash.len
await sleepAsync(1.millis) # don't starve the event loop
var id: string var id: string
if fanOutHash.len > 0: if fanOutHash.len > 0:
trace "getting peer from fanout", topic, debug "getting peer from fanout", topic,
peers = fanOutHash.len peers = fanOutHash.len
id = sample(toSeq(fanOutHash)) id = sample(toSeq(fanOutHash))
@ -120,7 +122,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "got fanout peer", peer = id trace "got fanout peer", peer = id
elif gossipHash.len > 0: elif gossipHash.len > 0:
trace "getting peer from gossipsub", topic, debug "getting peer from gossipsub", topic,
peers = gossipHash.len peers = gossipHash.len
id = sample(toSeq(gossipHash)) id = sample(toSeq(gossipHash))
@ -161,10 +163,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(topicHash.len.int64, labelValues = [topic]) .set(topicHash.len.int64, labelValues = [topic])
trace "mesh balanced, got peers", peers = topicHash.len, debug "mesh balanced, got peers", peers = g.mesh[topic].len,
topicId = topic topicId = topic
except CatchableError as exc: except CatchableError as exc:
trace "exception occurred re-balancing mesh", exc = exc.msg trace "exception occurred re-balancing mesh", exc = exc.msg
finally:
g.subLock.release()
proc dropFanoutPeers(g: GossipSub) {.async.} = proc dropFanoutPeers(g: GossipSub) {.async.} =
# drop peers that we haven't published to in # drop peers that we haven't published to in
@ -174,6 +178,7 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
if Moment.now > val: if Moment.now > val:
dropping.add(topic) dropping.add(topic)
g.fanout.del(topic) g.fanout.del(topic)
debug "dropping fanout topic", topic
for topic in dropping: for topic in dropping:
g.lastFanoutPubSub.del(topic) g.lastFanoutPubSub.del(topic)
@ -221,6 +226,7 @@ proc heartbeat(g: GossipSub) {.async.} =
for t in toSeq(g.topics.keys): for t in toSeq(g.topics.keys):
await g.rebalanceMesh(t) await g.rebalanceMesh(t)
g.replenishFanout(t)
await g.dropFanoutPeers() await g.dropFanoutPeers()
let peers = g.getGossipPeers() let peers = g.getGossipPeers()
@ -281,20 +287,26 @@ method subscribeTopic*(g: GossipSub,
peerId: string) {.gcsafe, async.} = peerId: string) {.gcsafe, async.} =
await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId)
if topic notin g.gossipsub: try:
g.gossipsub[topic] = initHashSet[string]() await g.subLock.acquire()
if subscribe: var gossipHash = g.gossipsub.mgetOrPut(topic, initHashSet[string]())
trace "adding subscription for topic", peer = peerId, name = topic
# subscribe remote peer to the topic if subscribe:
g.gossipsub[topic].incl(peerId) debug "adding subscription for topic", peer = peerId, name = topic
else: # subscribe remote peer to the topic
trace "removing subscription for topic", peer = peerId, name = topic gossipHash.incl(peerId)
# unsubscribe remote peer from the topic else:
g.gossipsub[topic].excl(peerId) trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe remote peer from the topic
gossipHash.excl(peerId)
finally:
g.subLock.release()
libp2p_gossipsub_peers_per_topic_gossipsub libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) .set(g.gossipsub[topic].len.int64, labelValues = [topic])
debug "gossip peers", peers = g.gossipsub[topic].len, topic
if topic in g.topics: if topic in g.topics:
await g.rebalanceMesh(topic) await g.rebalanceMesh(topic)
@ -385,7 +397,6 @@ method rpcHandler*(g: GossipSub,
continue continue
for t in msg.topicIDs: # for every topic in the message for t in msg.topicIDs: # for every topic in the message
await g.rebalanceMesh(t) # gather peers for each topic
if t in g.floodsub: if t in g.floodsub:
toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
@ -458,48 +469,48 @@ method unsubscribe*(g: GossipSub,
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,
data: seq[byte]) {.async.} = data: seq[byte]): Future[int] {.async.} =
await procCall PubSub(g).publish(topic, data) # base returns always 0
discard await procCall PubSub(g).publish(topic, data)
trace "about to publish message on topic", name = topic, trace "about to publish message on topic", name = topic,
data = data.shortLog data = data.shortLog
var peers: HashSet[string] var peers: HashSet[string]
# TODO: we probably don't need to try multiple times
if data.len > 0 and topic.len > 0:
for _ in 0..<5: # try to get peers up to 5 times
if peers.len > 0:
break
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh if topic.len > 0: # data could be 0/empty
await g.rebalanceMesh(topic) if topic in g.topics: # if we're subscribed use the mesh
peers = g.mesh.getOrDefault(topic) peers = g.mesh.getOrDefault(topic)
else: # send to fanout peers else: # not subscribed, send to fanout peers
await g.replenishFanout(topic) peers = g.fanout.getOrDefault(topic)
if topic in g.fanout:
peers = g.fanout.getOrDefault(topic)
# set the fanout expiry time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
# wait a second between tries
await sleepAsync(1.seconds)
let msg = newMessage(g.peerInfo, data, topic, g.sign) let msg = newMessage(g.peerInfo, data, topic, g.sign)
trace "created new message", msg trace "created new message", msg
trace "publishing on topic", name = topic, peers = peers
if msg.msgId notin g.mcache:
g.mcache.put(msg)
var sent: seq[Future[void]] var sent: seq[Future[void]]
for p in peers: for p in peers:
# avoid sending to self
if p == g.peerInfo.id: if p == g.peerInfo.id:
continue continue
trace "publishing on topic", name = topic let peer = g.peers.getOrDefault(p)
if msg.msgId notin g.mcache: if not isNil(peer.peerInfo):
g.mcache.put(msg) trace "publish: sending message to peer", peer = p
sent.add(peer.send(@[RPCMsg(messages: @[msg])]))
if p in g.peers: sent = await allFinished(sent)
sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) checkFutures(sent)
checkFutures(await allFinished(sent))
libp2p_pubsub_messages_published.inc(labelValues = [topic]) libp2p_pubsub_messages_published.inc(labelValues = [topic])
return sent.filterIt(not it.failed).len
else:
return 0
method start*(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} =
debug "gossipsub start" debug "gossipsub start"
@ -543,3 +554,4 @@ method initPubSub*(g: GossipSub) =
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages g.control = initTable[string, ControlMessage]() # pending control messages
g.heartbeatLock = newAsyncLock() g.heartbeatLock = newAsyncLock()
g.subLock = newAsyncLock()

View File

@ -225,8 +225,7 @@ method subscribe*(p: PubSub,
method publish*(p: PubSub, method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte]) {.base, async.} = data: seq[byte]): Future[int] {.base, async.} =
# TODO: Should throw indicating success/failure
## publish to a ``topic`` ## publish to a ``topic``
if p.triggerSelf and topic in p.topics: if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler: for h in p.topics[topic].handler:
@ -241,6 +240,8 @@ method publish*(p: PubSub,
# more cleanup though # more cleanup though
debug "Could not write to pubsub connection", msg = exc.msg debug "Could not write to pubsub connection", msg = exc.msg
return 0
method initPubSub*(p: PubSub) {.base.} = method initPubSub*(p: PubSub) {.base.} =
## perform pubsub initialization ## perform pubsub initialization
p.observers = new(seq[PubSubObserver]) p.observers = new(seq[PubSubObserver])

View File

@ -556,7 +556,7 @@ proc subscribe*(s: Switch, topic: string,
retFuture.fail(newNoPubSubException()) retFuture.fail(newNoPubSubException())
return retFuture return retFuture
result = s.pubSub.get().subscribe(topic, handler) return s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] = proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] =
## unsubscribe from topics ## unsubscribe from topics
@ -565,16 +565,16 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] =
retFuture.fail(newNoPubSubException()) retFuture.fail(newNoPubSubException())
return retFuture return retFuture
result = s.pubSub.get().unsubscribe(topics) return s.pubSub.get().unsubscribe(topics)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] = proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] =
# pubslish to pubsub topic # pubslish to pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone:
var retFuture = newFuture[void]("Switch.publish") var retFuture = newFuture[int]("Switch.publish")
retFuture.fail(newNoPubSubException()) retFuture.fail(newNoPubSubException())
return retFuture return retFuture
result = s.pubSub.get().publish(topic, data) return s.pubSub.get().publish(topic, data)
proc addValidator*(s: Switch, proc addValidator*(s: Switch,
topics: varargs[string], topics: varargs[string],

View File

@ -43,6 +43,14 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
dec ceil dec ceil
doAssert(ceil > 0, "waitSub timeout!") doAssert(ceil > 0, "waitSub timeout!")
template tryPublish(call: untyped, require: int, times: int = 10,
                    wait: Duration = 1.seconds): untyped =
  ## Test helper: re-evaluates `call` until it yields at least `require`,
  ## making at most `times` attempts and sleeping `wait` between attempts.
  ##
  ## `call` is an expression producing an `int` (at the call sites in this
  ## file it is an awaited `publish`, whose result is the number of peers
  ## the message was sent to). Because `call` is `untyped` it is
  ## re-evaluated on every attempt, so each attempt really publishes again.
  ##
  ## Must be expanded inside an async proc, since the body uses `await`.
  ## Fails with `doAssert` ("Failed to publish!") when no attempt reached
  ## `require`; with `times <= 0` no attempt is made and it fails at once.
  var
    remaining = times
    published = false
  while not published and remaining > 0:
    if (call) >= require:
      # Record success explicitly: relying on the attempt counter alone
      # would misreport a publish that succeeds on the very last attempt.
      published = true
    else:
      remaining.dec()
      # Only wait when another attempt will follow; sleeping after the
      # final failure would just delay the assertion.
      if remaining > 0:
        await sleepAsync(wait)
  doAssert(published, "Failed to publish!")
suite "GossipSub": suite "GossipSub":
teardown: teardown:
for tracker in testTrackers(): for tracker in testTrackers():
@ -64,9 +72,7 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await waitSub(nodes[1], nodes[0], "foobar")
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
@ -77,7 +83,7 @@ suite "GossipSub":
result = true result = true
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", "Hello!".toBytes()) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = (await validatorFut) and (await handlerFut) result = (await validatorFut) and (await handlerFut)
await allFuturesThrowing( await allFuturesThrowing(
@ -101,17 +107,16 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
Future[bool] {.async.} = Future[bool] {.async.} =
validatorFut.complete(true)
result = false result = false
validatorFut.complete(true)
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", "Hello!".toBytes()) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await validatorFut result = await validatorFut
await allFuturesThrowing( await allFuturesThrowing(
@ -135,10 +140,9 @@ suite "GossipSub":
awaiters.add((await nodes[1].start())) awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes) await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo")
await nodes[1].subscribe("bar", handler) await nodes[1].subscribe("bar", handler)
await waitSub(nodes[0], nodes[1], "bar")
var passed, failed: Future[bool] = newFuture[bool]() var passed, failed: Future[bool] = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
@ -152,8 +156,8 @@ suite "GossipSub":
false false
nodes[1].addValidator("foo", "bar", validator) nodes[1].addValidator("foo", "bar", validator)
await nodes[0].publish("foo", "Hello!".toBytes()) tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1
await nodes[0].publish("bar", "Hello!".toBytes()) tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
result = ((await passed) and (await failed) and (await handlerFut)) result = ((await passed) and (await failed) and (await handlerFut))
await allFuturesThrowing( await allFuturesThrowing(
@ -179,7 +183,7 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await sleepAsync(1.seconds) await sleepAsync(10.seconds)
let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get())
@ -273,7 +277,7 @@ suite "GossipSub":
nodes[1].pubsub.get().addObserver(obs1) nodes[1].pubsub.get().addObserver(obs1)
nodes[0].pubsub.get().addObserver(obs2) nodes[0].pubsub.get().addObserver(obs2)
await nodes[0].publish("foobar", "Hello!".toBytes()) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get())
@ -310,7 +314,7 @@ suite "GossipSub":
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar") await waitSub(nodes[0], nodes[1], "foobar")
await nodes[0].publish("foobar", "Hello!".toBytes()) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await passed result = await passed
@ -353,10 +357,10 @@ suite "GossipSub":
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
await wait(nodes[0].publish("foobar", tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)), nodes[1].peerInfo.id)),
1.minutes) 1.minutes), runs
await wait(seenFut, 2.minutes) await wait(seenFut, 2.minutes)
check: seen.len >= runs check: seen.len >= runs
@ -401,10 +405,10 @@ suite "GossipSub":
subs &= dialer.subscribe("foobar", handler) subs &= dialer.subscribe("foobar", handler)
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
await wait(nodes[0].publish("foobar", tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)), nodes[1].peerInfo.id)),
1.minutes) 1.minutes), runs
await wait(seenFut, 5.minutes) await wait(seenFut, 5.minutes)
check: seen.len >= runs check: seen.len >= runs