mirror of
https://github.com/codex-storage/nim-libp2p.git
synced 2025-01-11 11:34:24 +00:00
got pubsub working without signing
This commit is contained in:
parent
80267e81ec
commit
9f3b80b60c
@ -7,7 +7,7 @@
|
|||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import sequtils, tables, options, sets, sequtils, strutils
|
import sequtils, tables, options, sets, sequtils, strutils, sets
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import pubsub,
|
import pubsub,
|
||||||
pubsubpeer,
|
pubsubpeer,
|
||||||
@ -51,13 +51,14 @@ proc rpcHandler(f: FloodSub,
|
|||||||
|
|
||||||
debug "processing RPC message", peer = peer.id, msg = rpcMsgs
|
debug "processing RPC message", peer = peer.id, msg = rpcMsgs
|
||||||
for m in rpcMsgs: # for all RPC messages
|
for m in rpcMsgs: # for all RPC messages
|
||||||
|
debug "processing message", msg = rpcMsgs
|
||||||
if m.subscriptions.len > 0: # if there are any subscriptions
|
if m.subscriptions.len > 0: # if there are any subscriptions
|
||||||
if peer.peerInfo.peerId.isSome:
|
|
||||||
debug "no valid PeerId for peer"
|
|
||||||
return
|
|
||||||
|
|
||||||
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
||||||
let id = peer.id
|
let id = peer.id
|
||||||
|
|
||||||
|
if not f.peerTopics.contains(s.topic):
|
||||||
|
f.peerTopics[s.topic] = initSet[string]()
|
||||||
|
|
||||||
if s.subscribe:
|
if s.subscribe:
|
||||||
debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
||||||
# subscribe the peer to the topic
|
# subscribe the peer to the topic
|
||||||
@ -72,7 +73,7 @@ proc rpcHandler(f: FloodSub,
|
|||||||
await p.send(@[RPCMsg(subscriptions: m.subscriptions)])
|
await p.send(@[RPCMsg(subscriptions: m.subscriptions)])
|
||||||
|
|
||||||
if m.messages.len > 0: # if there are any messages
|
if m.messages.len > 0: # if there are any messages
|
||||||
var toSendPeers: HashSet[string]
|
var toSendPeers: HashSet[string] = initSet[string]()
|
||||||
for msg in m.messages: # for every message
|
for msg in m.messages: # for every message
|
||||||
for t in msg.topicIDs: # for every topic in the message
|
for t in msg.topicIDs: # for every topic in the message
|
||||||
toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic
|
toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic
|
||||||
|
@ -13,6 +13,7 @@ import ../../connection,
|
|||||||
../../protobuf/minprotobuf,
|
../../protobuf/minprotobuf,
|
||||||
../../peerinfo,
|
../../peerinfo,
|
||||||
../../peer,
|
../../peer,
|
||||||
|
../../crypto/crypto,
|
||||||
rpcmsg
|
rpcmsg
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
@ -29,11 +30,11 @@ type
|
|||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
proc handle*(p: PubSubPeer) {.async, gcsafe.} =
|
proc handle*(p: PubSubPeer) {.async, gcsafe.} =
|
||||||
debug "pubsub rpc", peer = p.id
|
debug "handling pubsub rpc", peer = p.id
|
||||||
try:
|
try:
|
||||||
while not p.conn.closed:
|
while not p.conn.closed:
|
||||||
let data = await p.conn.readLp()
|
let data = await p.conn.readLp()
|
||||||
debug "Read data from peer", peer = p.peerInfo, data = data
|
debug "Read data from peer", peer = p.peerInfo, data = data.toHex()
|
||||||
let msg = decodeRpcMsg(data)
|
let msg = decodeRpcMsg(data)
|
||||||
debug "Decoded msg from peer", peer = p.peerInfo, msg = msg
|
debug "Decoded msg from peer", peer = p.peerInfo, msg = msg
|
||||||
await p.handler(p, @[msg])
|
await p.handler(p, @[msg])
|
||||||
|
@ -79,47 +79,62 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
|||||||
var pb = initProtoBuffer(msg)
|
var pb = initProtoBuffer(msg)
|
||||||
|
|
||||||
result.subscriptions = newSeq[SubOpts]()
|
result.subscriptions = newSeq[SubOpts]()
|
||||||
var subscr = newSeq[byte](1)
|
while true:
|
||||||
|
# decode SubOpts array
|
||||||
# decode SubOpts array
|
var field = pb.enterSubMessage()
|
||||||
if pb.enterSubMessage() > 0:
|
debug "processing submessage", field = field
|
||||||
while true:
|
case field:
|
||||||
var subOpt: SubOpts
|
of 0:
|
||||||
if pb.getBytes(1, subscr) < 0:
|
break
|
||||||
break
|
of 1:
|
||||||
subOpt.subscribe = cast[bool](subscr)
|
|
||||||
|
|
||||||
if pb.getString(2, subOpt.topic) < 0:
|
|
||||||
break
|
|
||||||
result.subscriptions.add(subOpt)
|
|
||||||
|
|
||||||
result.messages = newSeq[Message]()
|
|
||||||
# TODO: which of this fields are really optional?
|
|
||||||
# Decode Messages array
|
|
||||||
if pb.enterSubMessage() > 0:
|
|
||||||
while true:
|
|
||||||
var msg: Message
|
|
||||||
if pb.getBytes(1, msg.fromPeer) < 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
if pb.getBytes(2, msg.data) < 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
if pb.getBytes(3, msg.seqno) < 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
var topic: string
|
|
||||||
while true:
|
while true:
|
||||||
if pb.getString(4, topic) < 0:
|
var subOpt: SubOpts
|
||||||
break
|
var subscr: int
|
||||||
topic.add(topic)
|
discard pb.getVarintValue(1, subscr)
|
||||||
topic = ""
|
subOpt.subscribe = cast[bool](subscr)
|
||||||
|
debug "read subscribe field", subscribe = subOpt.subscribe
|
||||||
if pb.getBytes(5, msg.signature) < 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
if pb.getBytes(6, msg.key) < 0:
|
if pb.getString(2, subOpt.topic) < 0:
|
||||||
break
|
break
|
||||||
|
debug "read subscribe field", topicName = subOpt.topic
|
||||||
|
|
||||||
|
result.subscriptions.add(subOpt)
|
||||||
|
debug "got subscriptions", subscriptions = result.subscriptions
|
||||||
|
|
||||||
|
of 2:
|
||||||
|
result.messages = newSeq[Message]()
|
||||||
|
# TODO: which of this fields are really optional?
|
||||||
|
while true:
|
||||||
|
var msg: Message
|
||||||
|
if pb.getBytes(1, msg.fromPeer) < 0:
|
||||||
|
break
|
||||||
|
debug "read message field", fromPeer = msg.fromPeer
|
||||||
|
|
||||||
|
if pb.getBytes(2, msg.data) < 0:
|
||||||
|
break
|
||||||
|
debug "read message field", data = msg.data
|
||||||
|
|
||||||
|
if pb.getBytes(3, msg.seqno) < 0:
|
||||||
|
break
|
||||||
|
debug "read message field", seqno = msg.seqno
|
||||||
|
|
||||||
|
var topic: string
|
||||||
|
while true:
|
||||||
|
if pb.getString(4, topic) < 0:
|
||||||
|
break
|
||||||
|
msg.topicIDs.add(topic)
|
||||||
|
debug "read message field", topicName = topic
|
||||||
|
topic = ""
|
||||||
|
|
||||||
|
discard pb.getBytes(5, msg.signature)
|
||||||
|
debug "read message field", signature = msg.signature
|
||||||
|
|
||||||
|
discard pb.getBytes(6, msg.key)
|
||||||
|
debug "read message field", key = msg.key
|
||||||
|
|
||||||
|
result.messages.add(msg)
|
||||||
|
else:
|
||||||
|
raise newException(CatchableError, "message type not recognizedd")
|
||||||
|
|
||||||
var prefix {.threadvar.}: seq[byte]
|
var prefix {.threadvar.}: seq[byte]
|
||||||
proc getPreix(): var seq[byte] =
|
proc getPreix(): var seq[byte] =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user