Fixes and tweaks related to the beacon node integration

* Bugfix: Dialing an already connected peer may lead to crash

* Introduced a standard_setup module allowing to instantiate
  the `Switch` object in an easier manner.

* Added `Switch.disconnect(peer)`

* Trailing space removed (sorry about polluting the diff)
This commit is contained in:
Zahary Karadjov 2019-12-08 23:06:58 +02:00
parent 31aaa2c8ec
commit 454f658ba8
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
15 changed files with 128 additions and 131 deletions

View File

@ -126,8 +126,8 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) =
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) # create multiplexer
let transports = @[Transport(newTransport(TcpTransport))] # add all transports (tcp only for now, but can be anything in the future)
let muxers = [(MplexCodec, mplexProvider)].toTable() # add all muxers
let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() # setup the secio and any other secure provider
let muxers = {MplexCodec: mplexProvider}.toTable() # add all muxers
let secureManagers = {SecioCodec: Secure(newSecio(seckey))}.toTable() # setup the secio and any other secure provider
# create the switch
let switch = newSwitch(peerInfo,

View File

@ -74,7 +74,7 @@ proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} =
p.conn = await p.switch.dial(remotePeer, ChatCodec)
p.connected = true
proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
while true:
if not p.connected:
# echo &"{p.id} ->"
@ -86,7 +86,7 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
if line.startsWith("/help") or line.startsWith("/?") or not p.started:
echo Help
continue
if line.startsWith("/disconnect"):
echo "Ending current session"
if p.connected and p.conn.closed.not:
@ -100,7 +100,7 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
if yesno.cmpIgnoreCase("y") == 0:
await p.conn.close()
p.connected = false
elif yesno.cmpIgnoreCase("n") == 0:
elif yesno.cmpIgnoreCase("n") == 0:
continue
else:
echo "unrecognized response"
@ -131,22 +131,22 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} =
# echo getCurrentExceptionMsg()
proc readWriteLoop(p: ChatProto) {.async, gcsafe.} =
asyncCheck p.writeAndPrint()
asyncCheck p.readAndPrint()
asyncCheck p.writeAndPrint()
asyncCheck p.readAndPrint()
method init(p: ChatProto) {.gcsafe.} =
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
if p.connected and not p.conn.closed:
echo "a chat session is already in progress - disconnecting!"
await stream.close()
p.conn = stream
p.connected = true
else:
p.conn = stream
p.connected = true
p.codec = ChatCodec
p.handler = handle
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto =
new result
result.switch = switch
result.transp = transp
@ -156,7 +156,7 @@ proc threadMain(wfd: AsyncFD) {.thread.} =
## This procedure performs reading from `stdin` and sends data over
## pipe to main thread.
var transp = fromPipe(wfd)
while true:
var line = stdin.readLine()
discard waitFor transp.write(line & "\r\n")
@ -217,7 +217,7 @@ proc main() {.async.} =
var (rfd, wfd) = createAsyncPipe()
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
raise newException(ValueError, "Could not initialize pipe!")
data.consoleFd = rfd
data.serveFut = serveThread(data)
var thread: Thread[AsyncFD]

View File

@ -6,7 +6,12 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import libp2p/daemon/[daemonapi, transpool]
import libp2p/protobuf/minprotobuf
import libp2p/varint
export daemonapi, minprotobuf, varint, transpool
import
libp2p/daemon/[daemonapi, transpool],
libp2p/protobuf/minprotobuf,
libp2p/varint
export
daemonapi, transpool, minprotobuf, varint

View File

@ -427,7 +427,7 @@ proc `$`*(sig: Signature): string =
## Get string representation of signature ``sig``.
result = toHex(sig.data)
proc sign*(key: PrivateKey, data: openarray[byte]): Signature =
proc sign*(key: PrivateKey, data: openarray[byte]): Signature {.gcsafe.} =
## Sign message ``data`` using private key ``key`` and return generated
## signature in raw binary form.
if key.scheme == RSA:

View File

@ -863,7 +863,7 @@ proc getSecret*(pubkey: EcPublicKey, seckey: EcPrivateKey): seq[byte] =
copyMem(addr result[0], addr data[0], res)
proc sign*[T: byte|char](seckey: EcPrivateKey,
message: openarray[T]): EcSignature =
message: openarray[T]): EcSignature {.gcsafe.} =
## Get ECDSA signature of data ``message`` using private key ``seckey``.
doAssert(not isNil(seckey))
var hc: BrHashCompatContext

View File

@ -1836,7 +1836,7 @@ proc clear*(pair: var EdKeyPair) =
burnMem(pair.pubkey.data)
proc sign*[T: byte|char](key: EdPrivateKey,
message: openarray[T]): EdSignature {.noinit.} =
message: openarray[T]): EdSignature {.gcsafe, noinit.} =
## Create ED25519 signature of data ``message`` using private key ``key``.
var ctx: sha512
var r: GeP3

View File

@ -723,13 +723,13 @@ proc `==`*(a, b: RsaPublicKey): bool =
result = r1 and r2
proc sign*[T: byte|char](key: RsaPrivateKey,
message: openarray[T]): RsaSignature =
message: openarray[T]): RsaSignature {.gcsafe.} =
## Get RSA PKCS1.5 signature of data ``message`` using SHA256 and private
## key ``key``.
doAssert(not isNil(key))
var hc: BrHashCompatContext
var hash: array[32, byte]
var impl = BrRsaPkcs1SignGetDefault()
let impl = BrRsaPkcs1SignGetDefault()
result = new RsaSignature
result.buffer = newSeq[byte]((key.seck.nBitlen + 7) shr 3)
var kv = addr sha256Vtable

View File

@ -339,7 +339,7 @@ proc `$`*(sig: SkSignature): string =
discard sig.toBytes(ssig)
result = toHex(ssig)
proc sign*[T: byte|char](key: SkPrivateKey, msg: openarray[T]): SkSignature =
proc sign*[T: byte|char](key: SkPrivateKey, msg: openarray[T]): SkSignature {.gcsafe.} =
## Sign message `msg` using private key `key` and return signature object.
let ctx = getContext()
var hash = sha256.digest(msg)

View File

@ -11,8 +11,8 @@ import chronos
import ../connection
type
LPProtoHandler* = proc (conn: Connection,
proto: string):
LPProtoHandler* = proc (conn: Connection,
proto: string):
Future[void] {.gcsafe, closure.}
LPProtocol* = ref object of RootObj

View File

@ -27,7 +27,7 @@ proc msgId*(m: Message): string =
proc fromPeerId*(m: Message): PeerId =
PeerID.init(m.fromPeer)
proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} =
proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} =
var buff = initProtoBuffer()
encodeMessage(msg, buff)
let prefix = cast[seq[byte]](PubSubPrefix)
@ -54,7 +54,7 @@ proc verify*(p: PeerInfo, m: Message): bool =
proc newMessage*(p: PeerInfo,
data: seq[byte],
name: string,
sign: bool = true): Message {.gcsafe.} =
sign: bool = true): Message {.gcsafe.} =
var seqno: seq[byte] = newSeq[byte](20)
if p.publicKey.isSome and randomBytes(addr seqno[0], 20) > 0:
var key: seq[byte] = p.publicKey.get().getBytes()

35
libp2p/standard_setup.nim Normal file
View File

@ -0,0 +1,35 @@
import
options, tables,
switch, peer, peerinfo, connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, mplex/types],
protocols/[identify, secure/secure, secure/secio],
protocols/pubsub/[pubsub, gossipsub, floodsub]
export
switch, peer, peerinfo, connection, multiaddress, crypto
proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0"),
triggerSelf = false, gossip = false): Switch =
proc createMplex(conn: Connection): Muxer =
result = newMplex(conn)
let
seckey = privKey.get(otherwise = PrivateKey.random(RSA))
peerInfo = PeerInfo.init(seckey, @[address])
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
transports = @[Transport(newTransport(TcpTransport))]
muxers = {MplexCodec: mplexProvider}.toTable
identify = newIdentify(peerInfo)
secureManagers = {SecioCodec: Secure(newSecio seckey)}.toTable
pubSub = if gossip: PubSub newPubSub(GossipSub, peerInfo, triggerSelf)
else: PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
result = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagers,
pubSub = some(pubSub))

View File

@ -25,9 +25,9 @@ logScope:
topic = "Switch"
#TODO: General note - use a finite state machine to manage the different
# steps of connections establishing and upgrading. This makes everything
# more robust and less prone to ordering attacks - i.e. muxing can come if
# and only if the channel has been secured (i.e. if a secure manager has been
# steps of connections establishing and upgrading. This makes everything
# more robust and less prone to ordering attacks - i.e. muxing can come if
# and only if the channel has been secured (i.e. if a secure manager has been
# previously provided)
type
@ -46,10 +46,10 @@ type
secureManagers*: Table[string, Secure]
pubSub*: Option[PubSub]
proc newNoPubSubException(): ref Exception {.inline.} =
proc newNoPubSubException(): ref Exception {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
## secure the incoming connection
let managers = toSeq(s.secureManagers.keys)
@ -110,11 +110,9 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
let handlerFut = muxer.handle()
# add muxer handler cleanup proc
handlerFut.addCallback(
proc(udata: pointer = nil) {.gcsafe.} =
trace "muxer handler completed for peer",
peer = conn.peerInfo.get().id
)
handlerFut.addCallback do (udata: pointer = nil):
trace "muxer handler completed for peer",
peer = conn.peerInfo.get().id
# do identify first, so that we have a
# PeerInfo in case we didn't before
@ -141,7 +139,12 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
await s.connections[id].close()
s.connections.del(id)
proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} =
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
let conn = s.connections.getOrDefault(peer.id)
if conn != nil:
await s.cleanupConn(conn)
proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} =
# if there is a muxer for the connection
# use it instead to create a muxed stream
if peerInfo.id in s.muxed:
@ -162,7 +165,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
await s.mux(result) # mux it if possible
s.connections[conn.peerInfo.get().id] = result
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection"
let ms = newMultistream()
@ -189,41 +192,40 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
# handle secured connections
await ms.handle(conn)
proc dial*(s: Switch,
peer: PeerInfo,
proto: string = ""):
Future[Connection] {.async.} =
proc dial*(s: Switch,
peer: PeerInfo,
proto: string = ""):
Future[Connection] {.async.} =
let id = peer.id
trace "dialing peer", peer = id
for t in s.transports: # for each transport
for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it
if id notin s.connections:
trace "dialing address", address = $a
trace "Dialing peer", peer = id
result = s.connections.getOrDefault(id)
if result == nil or result.closed:
for t in s.transports: # for each transport
for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a
result = await t.dial(a)
# make sure to assign the peer to the connection
result.peerInfo = some(peer)
result = await s.upgradeOutgoing(result)
result.closeEvent.wait().addCallback(
proc(udata: pointer) =
result.peerInfo = some peer
result = await s.upgradeOutgoing(result)
result.closeEvent.wait().addCallback do (udata: pointer):
asyncCheck s.cleanupConn(result)
)
break
else:
trace "Reusing existing connection"
if proto.len > 0 and not result.closed:
let stream = await s.getMuxedStream(peer)
if stream.isSome:
trace "connection is muxed, return muxed stream"
result = stream.get()
trace "attempting to select remote", proto = proto
if proto.len > 0 and not result.closed:
let stream = await s.getMuxedStream(peer)
if stream.isSome:
trace "Connection is muxed, return muxed stream"
result = stream.get()
trace "Attempting to select remote", proto = proto
if not (await s.ms.select(result, proto)):
error "unable to select protocol: ", proto = proto
raise newException(CatchableError,
&"unable to select protocol: {proto}")
if not await s.ms.select(result, proto):
error "Unable to select sub-protocol", proto = proto
raise newException(CatchableError, &"unable to select protocol: {proto}")
break # don't dial more than one addr on the same transport
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
if isNil(proto.handler):
raise newException(CatchableError,
"Protocol has to define a handle method or proc")
@ -234,7 +236,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
s.ms.addHandler(proto.codec, proto)
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
trace "starting switch"
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
@ -253,13 +255,13 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
var server = await t.listen(a, handle)
s.peerInfo.addrs[i] = t.ma # update peer's address
startFuts.add(server)
if s.pubSub.isSome:
await s.pubSub.get().start()
result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} =
proc stop*(s: Switch) {.async.} =
trace "stopping switch"
if s.pubSub.isSome:
@ -274,21 +276,21 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
let conn = await s.dial(peerInfo, s.pubSub.get().codec)
await s.pubSub.get().subscribeToPeer(conn)
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} =
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} =
## subscribe to a pubsub topic
if s.pubSub.isNone:
raise newNoPubSubException()
result = s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} =
proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} =
## unsubscribe from topics
if s.pubSub.isNone:
raise newNoPubSubException()
result = s.pubSub.get().unsubscribe(topics)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
# pubslish to pubsub topic
if s.pubSub.isNone:
raise newNoPubSubException()
@ -312,7 +314,7 @@ proc newSwitch*(peerInfo: PeerInfo,
result.secureManagers = initTable[string, Secure]()
let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
trace "handling connection for", peerInfo = stream.peerInfo
await s.ms.handle(stream) # handle incoming connection
@ -337,3 +339,4 @@ proc newSwitch*(peerInfo: PeerInfo,
if pubSub.isSome:
result.pubSub = pubSub
result.mount(pubSub.get())

View File

@ -9,7 +9,7 @@
import unittest, sequtils, options
import chronos
import utils,
import utils,
../../libp2p/[switch, crypto/crypto]
suite "FloodSub":
@ -72,7 +72,7 @@ suite "FloodSub":
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<10:
nodes.add(createNode())
nodes.add(newStandardSwitch())
var awaitters: seq[Future[void]]
for node in nodes:
@ -104,7 +104,7 @@ suite "FloodSub":
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<10:
nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true))
nodes.add newStandardSwitch(triggerSelf = true)
var awaitters: seq[Future[void]]
for node in nodes:
@ -118,7 +118,7 @@ suite "FloodSub":
for node in nodes:
await node.publish("foobar", cast[seq[byte]]("Hello!"))
await sleepAsync(10.millis)
await sleepAsync(100.millis)
await allFutures(nodes.mapIt(it.stop()))

View File

@ -63,7 +63,7 @@ suite "GossipSub":
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2:
nodes.add(createNode(gossip = true))
nodes.add newStandardSwitch(gossip = true)
var awaitters: seq[Future[void]]
for node in nodes:
@ -143,7 +143,7 @@ suite "GossipSub":
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2:
nodes.add(createNode(gossip = true))
nodes.add newStandardSwitch(gossip = true)
var awaitters: seq[Future[void]]
for node in nodes:
@ -384,7 +384,7 @@ suite "GossipSub":
var awaitters: seq[Future[void]]
for i in 0..<10:
nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true, true))
nodes.add newStandardSwitch(triggerSelf = true, gossip = true)
awaitters.add((await nodes[i].start()))
var seen: Table[string, int]

View File

@ -1,57 +1,11 @@
import options, tables
import chronos
import ../../libp2p/[switch,
peer,
connection,
multiaddress,
peerinfo,
muxers/muxer,
crypto/crypto,
muxers/mplex/mplex,
muxers/mplex/types,
protocols/identify,
transports/transport,
transports/tcptransport,
protocols/secure/secure,
protocols/secure/secio,
protocols/pubsub/pubsub,
protocols/pubsub/gossipsub,
protocols/pubsub/floodsub]
proc createMplex(conn: Connection): Muxer =
result = newMplex(conn)
proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
address: string = "/ip4/127.0.0.1/tcp/0",
triggerSelf: bool = false,
gossip: bool = false): Switch =
var seckey = privKey
if privKey.isNone:
seckey = some(PrivateKey.random(RSA))
var peerInfo = PeerInfo.init(seckey.get(), @[Multiaddress.init(address)])
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo)
let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable()
var pubSub: Option[PubSub]
if gossip:
pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf)))
else:
pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf)))
result = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagers,
pubSub = pubSub)
import ../../libp2p/standard_setup
export standard_setup
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num:
result.add(createNode(gossip = gossip))
result.add(newStandardSwitch(gossip = gossip))
proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes: