2019-09-09 20:14:24 -06:00
|
|
|
## Nim-LibP2P
|
2019-09-24 11:48:23 -06:00
|
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
2019-09-09 20:14:24 -06:00
|
|
|
## Licensed under either of
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
## at your option.
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
## those terms.
|
|
|
|
|
2020-05-06 18:31:47 +02:00
|
|
|
import sequtils, tables, sets, strutils
|
2020-06-07 16:15:21 +09:00
|
|
|
import chronos, chronicles, metrics
|
2019-09-09 20:15:52 -06:00
|
|
|
import pubsub,
|
|
|
|
pubsubpeer,
|
2019-12-05 20:16:18 -06:00
|
|
|
timedcache,
|
2020-07-15 13:18:55 -06:00
|
|
|
peertable,
|
2019-12-05 20:16:18 -06:00
|
|
|
rpc/[messages, message],
|
2020-06-19 11:29:43 -06:00
|
|
|
../../stream/connection,
|
2020-07-01 15:25:09 +09:00
|
|
|
../../peerid,
|
2020-07-14 02:02:16 +02:00
|
|
|
../../peerinfo
|
2019-09-09 20:14:24 -06:00
|
|
|
|
|
|
|
logScope:
|
2020-06-10 11:48:01 +03:00
|
|
|
topics = "floodsub"
|
2019-09-09 20:14:24 -06:00
|
|
|
|
|
|
|
const FloodSubCodec* = "/floodsub/1.0.0"
|
|
|
|
|
|
|
|
type
|
2019-10-03 16:22:49 -06:00
|
|
|
FloodSub* = ref object of PubSub
|
2020-07-15 13:18:55 -06:00
|
|
|
floodsub*: PeerTable # topic to remote peer map
|
|
|
|
seen*: TimedCache[string] # list of messages forwarded to peers
|
2019-12-05 20:16:18 -06:00
|
|
|
|
|
|
|
method subscribeTopic*(f: FloodSub,
|
|
|
|
topic: string,
|
|
|
|
subscribe: bool,
|
2020-05-27 12:33:49 -06:00
|
|
|
peerId: string) {.gcsafe, async.} =
|
|
|
|
await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
|
2019-12-05 20:16:18 -06:00
|
|
|
|
2020-07-13 22:32:38 +09:00
|
|
|
let peer = f.peers.getOrDefault(peerId)
|
|
|
|
if peer == nil:
|
2020-08-06 11:12:52 +09:00
|
|
|
debug "subscribeTopic on a nil peer!", peer = peerId
|
2020-07-13 22:32:38 +09:00
|
|
|
return
|
|
|
|
|
2019-12-10 14:50:35 -06:00
|
|
|
if topic notin f.floodsub:
|
2020-07-13 22:32:38 +09:00
|
|
|
f.floodsub[topic] = initHashSet[PubSubPeer]()
|
2019-12-10 14:50:35 -06:00
|
|
|
|
|
|
|
if subscribe:
|
2020-07-13 22:32:38 +09:00
|
|
|
trace "adding subscription for topic", peer = peer.id, name = topic
|
2019-12-10 14:50:35 -06:00
|
|
|
# subscribe the peer to the topic
|
2020-07-13 22:32:38 +09:00
|
|
|
f.floodsub[topic].incl(peer)
|
2019-12-10 14:50:35 -06:00
|
|
|
else:
|
2020-07-13 22:32:38 +09:00
|
|
|
trace "removing subscription for topic", peer = peer.id, name = topic
|
2019-12-10 14:50:35 -06:00
|
|
|
# unsubscribe the peer from the topic
|
2020-07-13 22:32:38 +09:00
|
|
|
f.floodsub[topic].excl(peer)
|
2019-12-10 14:50:35 -06:00
|
|
|
|
2020-07-07 18:33:05 -06:00
|
|
|
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
|
2019-12-05 20:16:18 -06:00
|
|
|
## handle peer disconnects
|
2020-08-06 11:12:52 +09:00
|
|
|
##
|
|
|
|
|
2020-07-07 18:33:05 -06:00
|
|
|
procCall PubSub(f).handleDisconnect(peer)
|
|
|
|
|
2020-08-06 11:12:52 +09:00
|
|
|
if not(isNil(peer)) and peer.peerInfo notin f.conns:
|
|
|
|
for t in toSeq(f.floodsub.keys):
|
|
|
|
if t in f.floodsub:
|
|
|
|
f.floodsub[t].excl(peer)
|
|
|
|
|
2019-12-05 20:16:18 -06:00
|
|
|
method rpcHandler*(f: FloodSub,
|
|
|
|
peer: PubSubPeer,
|
2019-12-16 23:24:03 -06:00
|
|
|
rpcMsgs: seq[RPCMsg]) {.async.} =
|
|
|
|
await procCall PubSub(f).rpcHandler(peer, rpcMsgs)
|
2019-12-05 20:16:18 -06:00
|
|
|
|
2020-01-07 02:04:02 -06:00
|
|
|
for m in rpcMsgs: # for all RPC messages
|
2019-12-05 20:16:18 -06:00
|
|
|
if m.messages.len > 0: # if there are any messages
|
2020-07-13 22:32:38 +09:00
|
|
|
var toSendPeers = initHashSet[PubSubPeer]()
|
2019-12-05 20:16:18 -06:00
|
|
|
for msg in m.messages: # for every message
|
2020-06-28 17:56:38 +02:00
|
|
|
let msgId = f.msgIdProvider(msg)
|
|
|
|
logScope: msgId
|
|
|
|
|
|
|
|
if msgId notin f.seen:
|
|
|
|
f.seen.put(msgId) # add the message to the seen cache
|
2019-12-16 23:24:03 -06:00
|
|
|
|
2020-05-06 03:26:08 -06:00
|
|
|
if f.verifySignature and not msg.verify(peer.peerInfo):
|
2019-12-16 23:24:03 -06:00
|
|
|
trace "dropping message due to failed signature verification"
|
|
|
|
continue
|
|
|
|
|
|
|
|
if not (await f.validate(msg)):
|
|
|
|
trace "dropping message due to failed validation"
|
|
|
|
continue
|
|
|
|
|
2019-12-05 20:16:18 -06:00
|
|
|
for t in msg.topicIDs: # for every topic in the message
|
|
|
|
if t in f.floodsub:
|
|
|
|
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
|
|
|
|
if t in f.topics: # check that we're subscribed to it
|
|
|
|
for h in f.topics[t].handler:
|
2020-06-28 17:56:38 +02:00
|
|
|
trace "calling handler for message", topicId = t,
|
2019-12-23 12:45:12 -06:00
|
|
|
localPeer = f.peerInfo.id,
|
2020-06-28 17:56:38 +02:00
|
|
|
fromPeer = msg.fromPeer.pretty
|
2020-07-07 18:33:05 -06:00
|
|
|
|
|
|
|
try:
|
|
|
|
await h(t, msg.data) # trigger user provided handler
|
2020-08-06 10:30:57 +09:00
|
|
|
except CancelledError as exc:
|
|
|
|
raise exc
|
2020-07-07 18:33:05 -06:00
|
|
|
except CatchableError as exc:
|
|
|
|
trace "exception in message handler", exc = exc.msg
|
2019-09-09 20:15:52 -06:00
|
|
|
|
2019-09-14 07:56:02 -06:00
|
|
|
# forward the message to all peers interested in it
|
2020-08-02 23:20:11 -06:00
|
|
|
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
|
2020-06-16 22:14:02 -06:00
|
|
|
|
2020-07-17 13:46:24 -06:00
|
|
|
trace "forwared message to peers", peers = published
|
2019-09-09 20:15:52 -06:00
|
|
|
|
2020-06-13 07:54:12 +08:00
|
|
|
method init*(f: FloodSub) =
|
2019-12-16 23:24:03 -06:00
|
|
|
proc handler(conn: Connection, proto: string) {.async.} =
|
2019-09-12 04:08:11 -06:00
|
|
|
## main protocol handler that gets triggered on every
|
2019-09-09 20:15:52 -06:00
|
|
|
## connection for a protocol string
|
|
|
|
## e.g. ``/floodsub/1.0.0``, etc...
|
|
|
|
##
|
|
|
|
|
2019-12-05 20:16:18 -06:00
|
|
|
await f.handleConn(conn, proto)
|
2019-09-09 20:14:24 -06:00
|
|
|
|
|
|
|
f.handler = handler
|
2019-09-09 20:15:52 -06:00
|
|
|
f.codec = FloodSubCodec
|
|
|
|
|
2020-07-07 18:33:05 -06:00
|
|
|
method subscribePeer*(p: FloodSub,
|
|
|
|
conn: Connection) =
|
|
|
|
procCall PubSub(p).subscribePeer(conn)
|
2020-05-21 11:33:48 -06:00
|
|
|
asyncCheck p.handleConn(conn, FloodSubCodec)
|
|
|
|
|
2019-09-09 20:15:52 -06:00
|
|
|
method publish*(f: FloodSub,
|
|
|
|
topic: string,
|
2020-08-02 23:20:11 -06:00
|
|
|
data: seq[byte],
|
|
|
|
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
|
2020-07-07 18:33:05 -06:00
|
|
|
# base returns always 0
|
2020-08-02 23:20:11 -06:00
|
|
|
discard await procCall PubSub(f).publish(topic, data, timeout)
|
2019-10-03 16:22:49 -06:00
|
|
|
|
2019-12-05 20:16:18 -06:00
|
|
|
if data.len <= 0 or topic.len <= 0:
|
|
|
|
trace "topic or data missing, skipping publish"
|
|
|
|
return
|
|
|
|
|
|
|
|
if topic notin f.floodsub:
|
|
|
|
trace "missing peers for topic, skipping publish"
|
|
|
|
return
|
|
|
|
|
|
|
|
trace "publishing on topic", name = topic
|
2020-07-15 12:51:33 +09:00
|
|
|
inc f.msgSeqno
|
|
|
|
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
|
2020-07-17 13:46:24 -06:00
|
|
|
|
2020-04-11 13:08:25 +09:00
|
|
|
# start the future but do not wait yet
|
2020-08-02 23:20:11 -06:00
|
|
|
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)
|
2019-09-09 20:14:24 -06:00
|
|
|
|
2020-08-05 01:27:59 +02:00
|
|
|
when defined(libp2p_expensive_metrics):
|
|
|
|
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
2020-06-16 22:14:02 -06:00
|
|
|
|
2020-07-17 13:46:24 -06:00
|
|
|
trace "published message to peers", peers = published,
|
2020-07-07 18:33:05 -06:00
|
|
|
msg = msg.shortLog()
|
2020-07-17 13:46:24 -06:00
|
|
|
return published
|
2020-07-07 18:33:05 -06:00
|
|
|
|
2019-12-10 14:50:35 -06:00
|
|
|
method unsubscribe*(f: FloodSub,
|
2019-12-16 23:24:03 -06:00
|
|
|
topics: seq[TopicPair]) {.async.} =
|
2019-09-09 20:15:52 -06:00
|
|
|
await procCall PubSub(f).unsubscribe(topics)
|
2019-12-05 20:16:18 -06:00
|
|
|
|
2019-09-09 20:15:52 -06:00
|
|
|
for p in f.peers.values:
|
2019-09-24 10:16:39 -06:00
|
|
|
await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
|
2019-09-09 20:14:24 -06:00
|
|
|
|
2020-07-21 01:16:13 +09:00
|
|
|
method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
|
|
|
|
await procCall PubSub(f).unsubscribeAll(topic)
|
|
|
|
|
|
|
|
for p in f.peers.values:
|
|
|
|
await f.sendSubs(p, @[topic], false)
|
|
|
|
|
2019-12-05 20:16:18 -06:00
|
|
|
method initPubSub*(f: FloodSub) =
|
2020-04-30 22:22:31 +09:00
|
|
|
procCall PubSub(f).initPubSub()
|
2019-10-03 16:22:49 -06:00
|
|
|
f.peers = initTable[string, PubSubPeer]()
|
|
|
|
f.topics = initTable[string, Topic]()
|
2020-07-13 22:32:38 +09:00
|
|
|
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
|
2019-12-05 20:16:18 -06:00
|
|
|
f.seen = newTimedCache[string](2.minutes)
|
2019-10-03 16:22:49 -06:00
|
|
|
f.init()
|