136 lines
4.4 KiB
Nim
136 lines
4.4 KiB
Nim
## Nim-LibP2P
|
|
## Copyright (c) 2018 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import sequtils, tables, options, sets
|
|
import chronos, chronicles
|
|
import pubsub,
|
|
pubsubpeer,
|
|
rpcmsg,
|
|
../../connection,
|
|
../../peerinfo,
|
|
../../peer
|
|
|
|
logScope:
|
|
topic = "FloodSub"
|
|
|
|
const FloodSubCodec* = "/floodsub/1.0.0"
|
|
|
|
type
|
|
FloodSub = ref object of PubSub
|
|
|
|
proc sendSubs(f: FloodSub,
|
|
peer: PubSubPeer,
|
|
topics: seq[string],
|
|
subscribe: bool)
|
|
{.async, gcsafe.} =
|
|
## send subscriptions to remote peer
|
|
var msg: RPCMsg
|
|
for t in topics:
|
|
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
|
|
|
|
await peer.send(@[msg])
|
|
|
|
proc rpcHandler(f: FloodSub,
|
|
peer: PubSubPeer,
|
|
rpcMsgs: seq[RPCMsg])
|
|
{.async, gcsafe.} =
|
|
## method called by a PubSubPeer every
|
|
## time it receives an RPC message
|
|
##
|
|
## The RPC message might contain subscriptions
|
|
## or messages forwarded to this peer
|
|
##
|
|
|
|
for m in rpcMsgs: # for all RPC messages
|
|
if m.subscriptions.len > 0: # if there are any subscriptions
|
|
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
|
let id = peer.conn.peerInfo.get().peerId.pretty
|
|
if s.subscribe:
|
|
# subscribe the peer to the topic
|
|
f.peerTopics[s.topic].incl(id)
|
|
else:
|
|
# unsubscribe the peer to the topic
|
|
f.peerTopics[s.topic].excl(id)
|
|
|
|
# send subscriptions to every peer
|
|
for p in f.peers.values:
|
|
await p.send(@[RPCMsg(subscriptions: m.subscriptions)])
|
|
|
|
if m.messages.len > 0: # if there are any messages
|
|
var toSendPeers: HashSet[string]
|
|
for msg in m.messages: # for every message
|
|
for t in msg.topicIDs: # for every topic in the message
|
|
toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic
|
|
if f.topics.contains(t): # check that we're subscribed to it
|
|
await f.topics[t].handler(t, msg.data) # trigger user provided handler
|
|
|
|
for p in toSendPeers:
|
|
await f.peers[p].send(@[RPCMsg(messages: m.messages)])
|
|
|
|
proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} =
|
|
## handle incoming/outgoing connections
|
|
##
|
|
## this proc will:
|
|
## 1) register a new PubSubPeer for the connection
|
|
## 2) register a handler with the peer;
|
|
## this handler gets called on every rpc message
|
|
## that the peer receives
|
|
## 3) ask the peer to subscribe us to every topic
|
|
## that we're interested in
|
|
##
|
|
|
|
proc handleRpc(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
|
|
await f.rpcHandler(peer, msgs)
|
|
|
|
var peer = newPubSubPeer(conn, handleRpc)
|
|
f.peers[peer.conn.peerInfo.get().peerId.pretty()] = peer
|
|
let topics = toSeq(f.topics.keys)
|
|
await f.sendSubs(peer, topics, true)
|
|
asyncCheck peer.handle()
|
|
|
|
method init(f: FloodSub) =
|
|
proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
|
|
## main protocol handler that gets triggered on every
|
|
## connection for a protocol string
|
|
## e.g. ``/floodsub/1.0.0``, etc...
|
|
##
|
|
|
|
await f.handleConn(conn)
|
|
|
|
f.handler = handler
|
|
f.codec = FloodSubCodec
|
|
|
|
method subscribePeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
|
|
await f.handleConn(conn)
|
|
|
|
method publish*(f: FloodSub,
|
|
topic: string,
|
|
data: seq[byte])
|
|
{.async, gcsafe.} =
|
|
for p in f.peerTopics[topic]:
|
|
f.peers[p].send(Message(fromPeer: f.peerInfo.peerId.data,
|
|
data: data))
|
|
|
|
method subscribe*(f: FloodSub,
|
|
topic: string,
|
|
handler: TopicHandler)
|
|
{.async, gcsafe.} =
|
|
await procCall PubSub(f).subscribe(topic, handler)
|
|
for p in f.peers.values:
|
|
await f.sendSubs(p, @[topic], true)
|
|
|
|
method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} =
|
|
await procCall PubSub(f).unsubscribe(topics)
|
|
for p in f.peers.values:
|
|
await f.sendSubs(p, topics, false)
|
|
|
|
proc newFloodSub*(peerInfo: PeerInfo): FloodSub =
|
|
new result
|
|
result.peerInfo = peerInfo
|