## Nim-LibP2P ## Copyright (c) 2019 Status Research & Development GmbH ## Licensed under either of ## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) ## * MIT license ([LICENSE-MIT](LICENSE-MIT)) ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. import sequtils, tables, sets, strutils import chronos, chronicles, metrics import pubsub, pubsubpeer, timedcache, peertable, rpc/[messages, message], ../../stream/connection, ../../peerid, ../../peerinfo logScope: topics = "floodsub" const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub floodsub*: PeerTable # topic to remote peer map seen*: TimedCache[string] # list of messages forwarded to peers method subscribeTopic*(f: FloodSub, topic: string, subscribe: bool, peer: PubsubPeer) {.gcsafe.} = procCall PubSub(f).subscribeTopic(topic, subscribe, peer) if topic notin f.floodsub: f.floodsub[topic] = initHashSet[PubSubPeer]() if subscribe: trace "adding subscription for topic", peer = peer.id, name = topic # subscribe the peer to the topic f.floodsub[topic].incl(peer) else: trace "removing subscription for topic", peer = peer.id, name = topic # unsubscribe the peer from the topic f.floodsub[topic].excl(peer) method unsubscribePeer*(f: FloodSub, peer: PeerID) = ## handle peer disconnects ## trace "unsubscribing floodsub peer", peer = $peer let pubSubPeer = f.peers.getOrDefault(peer) if pubSubPeer.isNil: return for t in toSeq(f.floodsub.keys): if t in f.floodsub: f.floodsub[t].excl(pubSubPeer) procCall PubSub(f).unsubscribePeer(peer) method rpcHandler*(f: FloodSub, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async.} = await procCall PubSub(f).rpcHandler(peer, rpcMsgs) for m in rpcMsgs: # for all RPC messages if m.messages.len > 0: # if there are any messages var toSendPeers = initHashSet[PubSubPeer]() for msg in m.messages: # for every message let msgId = f.msgIdProvider(msg) logScope: msgId if msgId notin f.seen: f.seen.put(msgId) # add the message to the seen cache if f.verifySignature and not msg.verify(peer.peerId): trace "dropping message due to failed signature verification" continue if not (await f.validate(msg)): trace "dropping message due to failed validation" continue for t in msg.topicIDs: # for every topic in the message if t in f.floodsub: toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic if t in f.topics: # check that we're subscribed to it for h in f.topics[t].handler: trace "calling handler for message", topicId = t, localPeer = f.peerInfo.id, fromPeer = msg.fromPeer.pretty try: await h(t, msg.data) # trigger user provided handler except CancelledError as exc: raise exc except CatchableError as exc: trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it let published = await f.broadcast( toSeq(toSendPeers), RPCMsg(messages: m.messages), DefaultSendTimeout) trace "forwared message to peers", peers = published method init*(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every ## connection for a protocol string ## e.g. ``/floodsub/1.0.0``, etc... ## await f.handleConn(conn, proto) f.handler = handler f.codec = FloodSubCodec method publish*(f: FloodSub, topic: string, data: seq[byte], timeout: Duration = InfiniteDuration): Future[int] {.async.} = # base returns always 0 discard await procCall PubSub(f).publish(topic, data, timeout) if data.len <= 0 or topic.len <= 0: trace "topic or data missing, skipping publish" return if topic notin f.floodsub: trace "missing peers for topic, skipping publish" return trace "publishing on topic", name = topic inc f.msgSeqno let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) # start the future but do not wait yet let published = await f.broadcast( toSeq(f.floodsub.getOrDefault(topic)), RPCMsg(messages: @[msg]), timeout) when defined(libp2p_expensive_metrics): libp2p_pubsub_messages_published.inc(labelValues = [topic]) trace "published message to peers", peers = published, msg = msg.shortLog() return published method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(f).unsubscribe(topics) for p in f.peers.values: await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = await procCall PubSub(f).unsubscribeAll(topic) for p in f.peers.values: await f.sendSubs(p, @[topic], false) method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.floodsub = initTable[string, HashSet[PubSubPeer]]() f.seen = newTimedCache[string](2.minutes) f.init()