Merge branch 'master' into gossip-one-one

This commit is contained in:
Giovanni Petrantoni 2020-09-08 18:03:13 +09:00
commit 84cc85b536
36 changed files with 951 additions and 934 deletions

View File

@ -88,6 +88,13 @@ steps:
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=2
echo "Found ${ncpu} cores"
if [[ $PLATFORM == "x86" ]]; then
choco --version
choco install --x86 openssl
export PATH="/c/Program Files (x86)/OpenSSL-Win32/bin:${PATH}"
echo "PATH=${PATH}"
fi
# build nim from our own branch - this to avoid the day-to-day churn and
# regressions of the fast-paced Nim development while maintaining the
# flexibility to apply patches

View File

@ -190,7 +190,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let libp2pFuts = await switch.start()
chatProto.started = true
let id = peerInfo.peerId.pretty
let id = $peerInfo.peerId
echo "PeerID: " & id
echo "listening on: "
for a in peerInfo.addrs:

View File

@ -11,7 +11,7 @@ requires "nim >= 1.2.0",
"nimcrypto >= 0.4.1",
"bearssl >= 0.1.4",
"chronicles >= 0.7.2",
"chronos >= 2.3.8",
"chronos >= 2.5.2",
"metrics",
"secp256k1",
"stew >= 0.1.0"

View File

@ -76,11 +76,12 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
return muxer == c.muxed[conn].muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "cleaning up muxer for peer"
trace "Cleaning up muxer", m = muxerHolder.muxer
await muxerHolder.muxer.close()
if not(isNil(muxerHolder.handle)):
await muxerHolder.handle # TODO noraises?
trace "Cleaned up muxer", m = muxerHolder.muxer
proc delConn(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId
@ -91,6 +92,8 @@ proc delConn(c: ConnManager, conn: Connection) =
c.conns.del(peerId)
libp2p_peers.set(c.conns.len.int64)
trace "Removed connection", conn
proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
## clean connection's resources such as muxers and streams
@ -113,17 +116,24 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
finally:
await conn.close()
trace "connection cleaned up", peer = $conn.peerInfo
trace "Connection cleaned up", conn
proc onClose(c: ConnManager, conn: Connection) {.async.} =
## connection close even handler
##
## triggers the connections resource cleanup
##
await conn.join()
trace "triggering connection cleanup", peer = $conn.peerInfo
await c.cleanupConn(conn)
try:
await conn.join()
trace "Connection closed, cleaning up", conn
await c.cleanupConn(conn)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
debug "Unexpected cancellation in connection manager's cleanup", conn
except CatchableError as exc:
debug "Unexpected exception in connection manager's cleanup",
errMsg = exc.msg, conn
proc selectConn*(c: ConnManager,
peerId: PeerID,
@ -160,7 +170,7 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
if conn in c.muxed:
return c.muxed[conn].muxer
else:
debug "no muxer for connection", conn = $conn
debug "no muxer for connection", conn
proc storeConn*(c: ConnManager, conn: Connection) =
## store a connection
@ -174,7 +184,7 @@ proc storeConn*(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId
if c.conns.getOrDefault(peerId).len > c.maxConns:
trace "too many connections", peer = $peerId,
debug "too many connections", peer = conn,
conns = c.conns.getOrDefault(peerId).len
raise newTooManyConnections()
@ -184,11 +194,13 @@ proc storeConn*(c: ConnManager, conn: Connection) =
c.conns[peerId].incl(conn)
# launch on close listener
asyncCheck c.onClose(conn)
# Launch on close listener
# All the errors are handled inside `onClose()` procedure.
asyncSpawn c.onClose(conn)
libp2p_peers.set(c.conns.len.int64)
trace "stored connection", connections = c.conns.len, peer = peerId
trace "Stored connection",
connections = c.conns.len, conn, direction = $conn.dir
proc storeOutgoing*(c: ConnManager, conn: Connection) =
conn.dir = Direction.Out
@ -214,7 +226,7 @@ proc storeMuxer*(c: ConnManager,
muxer: muxer,
handle: handle)
trace "stored muxer", connections = c.conns.len
trace "Stored muxer", connections = c.conns.len, muxer
proc getMuxedStream*(c: ConnManager,
peerId: PeerID,
@ -248,8 +260,10 @@ proc getMuxedStream*(c: ConnManager,
proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} =
## drop connections and cleanup resources for peer
##
trace "Dropping peer", peerId
let conns = c.conns.getOrDefault(peerId)
for conn in conns:
trace "Removing connection", conn
delConn(c, conn)
var muxers: seq[MuxerHolder]
@ -263,6 +277,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} =
for conn in conns:
await conn.close()
trace "Dropped peer", peerId
proc close*(c: ConnManager) {.async.} =
## cleanup resources for the connection

View File

@ -1304,20 +1304,3 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
except Exception as exc:
await api.closeConnection(transp)
raise exc
proc `$`*(pinfo: PeerInfo): string =
## Get string representation of ``PeerInfo`` object.
result = newStringOfCap(128)
result.add("{PeerID: '")
result.add($pinfo.peer.pretty())
result.add("' Addresses: [")
let length = len(pinfo.addresses)
for i in 0..<length:
result.add("'")
result.add($pinfo.addresses[i])
result.add("'")
if i < length - 1:
result.add(", ")
result.add("]}")
if len(pinfo.addresses) > 0:
result = result

View File

@ -31,10 +31,10 @@ proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
let header = await conn.readVarint()
trace "read header varint", varint = header
trace "read header varint", varint = header, conn
let data = await conn.readLp(MaxMsgSize)
trace "read data", dataLen = data.len, data = shortLog(data)
trace "read data", dataLen = data.len, data = shortLog(data), conn
let msgType = header and 0x7
if msgType.int > ord(MessageType.ResetOut):
@ -46,7 +46,7 @@ proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]) {.async, gcsafe.} =
trace "sending data over mplex", oid = $conn.oid,
trace "sending data over mplex", conn,
id,
msgType,
data = data.len

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import oids, deques
import std/[oids, strformat]
import chronos, chronicles, metrics
import types,
coder,
@ -66,87 +66,60 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped =
if not(isNil(lock)) and lock.locked:
lock.release()
template withEOFExceptions(body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc
except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg
except LPStreamClosedError as exc:
trace "muxed connection closed", exc = exc.msg
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
func shortLog*(s: LPChannel): auto =
if s.isNil: "LPChannel(nil)"
elif s.conn.peerInfo.isNil: $s.oid
elif s.name != $s.oid: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}"
chronicles.formatIt(LPChannel): shortLog(it)
proc closeMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## send close message - this will not raise
## on EOF or Closed
withWriteLock(s.writeLock):
trace "sending close message"
trace "sending close message", s
await s.conn.writeMsg(s.id, s.closeCode) # write close
proc resetMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## send reset message - this will not raise
withEOFExceptions:
try:
withWriteLock(s.writeLock):
trace "sending reset message"
trace "sending reset message", s
await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CancelledError:
# This procedure is called from one place and never awaited, so there no
# need to re-raise CancelledError.
debug "Unexpected cancellation while resetting channel", s
except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg, s
except LPStreamClosedError as exc:
trace "muxed connection closed", exc = exc.msg, s
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg, s
except CatchableError as exc:
debug "Unhandled exception leak", exc = exc.msg, s
proc open*(s: LPChannel) {.async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
## NOTE: Don't call withExcAndLock or withWriteLock,
## because this already gets called from writeHandler
## which is locked
await s.conn.writeMsg(s.id, MessageType.New, s.name)
trace "opened channel"
trace "opened channel", s
s.isOpen = true
proc closeRemote*(s: LPChannel) {.async.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
trace "got EOF, closing channel"
trace "closing remote", s
try:
await s.drainBuffer()
s.isEof = true # set EOF immediately to prevent further reads
# close parent bufferstream to prevent further reads
await procCall BufferStream(s).close()
trace "channel closed on EOF"
trace "channel closed on EOF", s
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception closing remote channel", exc = exc.msg
trace "exception closing remote channel", exc = exc.msg, s
trace "closed remote", s
method closed*(s: LPChannel): bool =
## this emulates half-closed behavior
@ -156,24 +129,13 @@ method closed*(s: LPChannel): bool =
s.closedLocal
method reset*(s: LPChannel) {.base, async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
if s.closedLocal and s.isEof:
trace "channel already closed or reset"
trace "channel already closed or reset", s
return
trace "resetting channel"
trace "resetting channel", s
# we asyncCheck here because the other end
# might be dead already - reset is always
# optimistic
asyncCheck s.resetMessage()
asyncSpawn s.resetMessage()
try:
# drain the buffer before closing
@ -186,48 +148,43 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in reset", exc = exc.msg
trace "exception in reset", exc = exc.msg, s
trace "channel reset"
trace "channel reset", s
method close*(s: LPChannel) {.async, gcsafe.} =
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
# stack = getStackTrace()
if s.closedLocal:
trace "channel already closed"
trace "channel already closed", s
return
trace "closing local lpchannel"
trace "closing local lpchannel", s
proc closeInternal() {.async.} =
try:
await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close()
except CancelledError as exc:
except CancelledError:
trace "Unexpected cancellation while closing channel", s
await s.reset()
raise exc
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
except CatchableError as exc:
trace "exception closing channel", exc = exc.msg
trace "exception closing channel", exc = exc.msg, s
await s.reset()
trace "lpchannel closed local"
trace "lpchannel closed local", s
s.closedLocal = true
asyncCheck closeInternal()
# All the errors are handled inside `closeInternal()` procedure.
asyncSpawn closeInternal()
method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = "LPChannel"
s.timeoutHandler = proc() {.async, gcsafe.} =
trace "idle timeout expired, resetting LPChannel"
trace "idle timeout expired, resetting LPChannel", s
await s.reset()
procCall BufferStream(s).initStream()
@ -254,34 +211,26 @@ proc init*(
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In)
logScope:
id = chann.id
initiator = chann.initiator
name = chann.name
oid = $chann.oid
peer = $chann.conn.peerInfo
# stack = getStackTrace()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
try:
if chann.isLazy and not(chann.isOpen):
await chann.open()
# writes should happen in sequence
trace "sending data", len = data.len
trace "sending data", len = data.len, conn, chann
await conn.writeMsg(chann.id,
chann.msgCode,
data)
except CatchableError as exc:
trace "exception in lpchannel write handler", exc = exc.msg
await chann.reset()
trace "exception in lpchannel write handler", exc = exc.msg, chann
asyncSpawn conn.close()
raise exc
chann.initBufferStream(writeHandler, size)
when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid
trace "created new lpchannel"
trace "created new lpchannel", chann
return chann

View File

@ -32,6 +32,7 @@ when defined(libp2p_expensive_metrics):
type
TooManyChannels* = object of CatchableError
InvalidChannelIdError* = object of CatchableError
Mplex* = ref object of Muxer
channels: array[bool, Table[uint64, LPChannel]]
@ -42,20 +43,34 @@ type
oid*: Oid
maxChannCount: int
func shortLog*(m: MPlex): auto = shortLog(m.connection)
chronicles.formatIt(Mplex): shortLog(it)
proc newTooManyChannels(): ref TooManyChannels =
newException(TooManyChannels, "max allowed channel count exceeded")
proc newInvalidChannelIdError(): ref InvalidChannelIdError =
newException(InvalidChannelIdError, "max allowed channel count exceeded")
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
##
await chann.join()
m.channels[chann.initiator].del(chann.id)
trace "cleaned up channel", id = chann.id, oid = $chann.oid
try:
await chann.join()
m.channels[chann.initiator].del(chann.id)
debug "cleaned up channel", m, chann
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerInfo])
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerInfo.peerId])
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
debug "Unexpected cancellation in mplex channel cleanup",
m, chann
except CatchableError as exc:
debug "error cleaning up mplex channel", exc = exc.msg, m, chann
proc newStreamInternal*(m: Mplex,
initiator: bool = true,
@ -70,10 +85,9 @@ proc newStreamInternal*(m: Mplex,
m.currentId.inc(); m.currentId
else: chanId
trace "creating new channel", channelId = id,
initiator = initiator,
name = name,
oid = $m.oid
if id in m.channels[initiator]:
raise newInvalidChannelIdError()
result = LPChannel.init(
id,
m.connection,
@ -85,44 +99,40 @@ proc newStreamInternal*(m: Mplex,
result.peerInfo = m.connection.peerInfo
result.observedAddr = m.connection.observedAddr
doAssert(id notin m.channels[initiator],
"channel slot already taken!")
trace "Creating new channel", id, initiator, name, m, channel = result
m.channels[initiator][id] = result
asyncCheck m.cleanupChann(result)
# All the errors are handled inside `cleanupChann()` procedure.
asyncSpawn m.cleanupChann(result)
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
m.channels[initiator].len.int64,
labelValues = [$initiator, $m.connection.peerInfo])
labelValues = [$initiator, $m.connection.peerInfo.peerId])
proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
## call the muxer stream handler for this channel
##
try:
await m.streamHandler(chann)
trace "finished handling stream"
trace "finished handling stream", m, chann
doAssert(chann.closed, "connection not closed by handler!")
except CancelledError as exc:
trace "cancelling stream handler", exc = exc.msg
except CancelledError:
trace "Unexpected cancellation in stream handler", m, chann
await chann.reset()
raise exc
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg
trace "Exception in mplex stream handler",
exc = exc.msg, m, chann
await chann.reset()
method handle*(m: Mplex) {.async, gcsafe.} =
logScope: moid = $m.oid
trace "starting mplex main loop"
trace "Starting mplex main loop", m
try:
defer:
trace "stopping mplex main loop"
await m.close()
while not m.connection.atEof:
trace "waiting for data"
trace "waiting for data", m
let
(id, msgType, data) = await m.connection.readMsg()
initiator = bool(ord(msgType) and 1)
@ -133,57 +143,57 @@ method handle*(m: Mplex) {.async, gcsafe.} =
msgType = msgType
size = data.len
trace "read message from connection", data = data.shortLog
trace "read message from connection", m, data = data.shortLog
var channel =
if MessageType(msgType) != MessageType.New:
let tmp = m.channels[initiator].getOrDefault(id, nil)
if tmp == nil:
trace "Channel not found, skipping"
trace "Channel not found, skipping", m
continue
tmp
else:
if m.channels[false].len > m.maxChannCount - 1:
warn "too many channels created by remote peer", allowedMax = MaxChannelCount
warn "too many channels created by remote peer",
allowedMax = MaxChannelCount, m
raise newTooManyChannels()
let name = string.fromBytes(data)
m.newStreamInternal(false, id, name, timeout = m.outChannTimeout)
logScope:
name = channel.name
oid = $channel.oid
case msgType:
of MessageType.New:
trace "created channel"
trace "created channel", m, channel
if not isNil(m.streamHandler):
# launch handler task
asyncCheck m.handleStream(channel)
# Launch handler task
# All the errors are handled inside `handleStream()` procedure.
asyncSpawn m.handleStream(channel)
of MessageType.MsgIn, MessageType.MsgOut:
if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize
warn "attempting to send a packet larger than allowed",
allowed = MaxMsgSize, channel
raise newLPStreamLimitError()
trace "pushing data to channel"
trace "pushing data to channel", m, channel
await channel.pushTo(data)
trace "pushed data to channel"
trace "pushed data to channel", m, channel
of MessageType.CloseIn, MessageType.CloseOut:
trace "closing channel"
await channel.closeRemote()
trace "closed channel"
of MessageType.ResetIn, MessageType.ResetOut:
trace "resetting channel"
await channel.reset()
trace "reset channel"
except CancelledError as exc:
raise exc
except CancelledError:
# This procedure is spawned as task and it is not part of public API, so
# there no way for this procedure to be cancelled implicitely.
debug "Unexpected cancellation in mplex handler", m
except CatchableError as exc:
trace "Exception occurred", exception = exc.msg, oid = $m.oid
trace "Exception occurred", exception = exc.msg, m
finally:
trace "stopping mplex main loop", m
await m.close()
proc init*(M: type Mplex,
conn: Connection,
@ -210,7 +220,7 @@ method close*(m: Mplex) {.async, gcsafe.} =
if m.isClosed:
return
trace "closing mplex muxer", moid = $m.oid
trace "closing mplex muxer", m
m.isClosed = true

View File

@ -35,6 +35,9 @@ type
streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance
muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created
func shortLog*(m: Muxer): auto = shortLog(m.connection)
chronicles.formatIt(Muxer): shortLog(it)
# muxer interface
method newStream*(m: Muxer, name: string = "", lazy: bool = false):
Future[Connection] {.base, async, gcsafe.} = discard
@ -49,7 +52,7 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider
method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
trace "starting muxer handler", proto=proto, peer = $conn
trace "starting muxer handler", proto=proto, conn
try:
let
muxer = c.newMuxer(conn)
@ -68,11 +71,8 @@ method init(c: MuxerProvider) =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg, peer = $conn, proto=proto
trace "exception in muxer handler", exc = exc.msg, conn, proto
finally:
await conn.close()
c.handler = handler
proc `$`*(m: Muxer): string =
$m.connection

View File

@ -11,11 +11,13 @@
{.push raises: [Defect].}
import hashes
import std/hashes
import chronicles
import nimcrypto/utils, stew/base58
import crypto/crypto, multicodec, multihash, vbuffer
import protobuf/minprotobuf
import stew/results
export results
const
@ -27,11 +29,24 @@ type
PeerID* = object
data*: seq[byte]
PeerIDError* = object of CatchableError
proc pretty*(pid: PeerID): string {.inline.} =
func `$`*(pid: PeerID): string =
## Return base58 encoded ``pid`` representation.
result = Base58.encode(pid.data)
Base58.encode(pid.data)
func shortLog*(pid: PeerId): string =
## Returns compact string representation of ``pid``.
var spid = $pid
if len(spid) <= 10:
result = spid
else:
result = newStringOfCap(10)
for i in 0..<2:
result.add(spid[i])
result.add("*")
for i in (len(spid) - 6)..spid.high:
result.add(spid[i])
chronicles.formatIt(PeerID): shortLog(it)
proc toBytes*(pid: PeerID, data: var openarray[byte]): int =
## Store PeerID ``pid`` to array of bytes ``data``.
@ -112,19 +127,6 @@ proc extractPublicKey*(pid: PeerID, pubkey: var PublicKey): bool =
let length = len(mh.data.buffer)
result = pubkey.init(mh.data.buffer.toOpenArray(mh.dpos, length - 1))
proc `$`*(pid: PeerID): string =
## Returns compact string representation of ``pid``.
var spid = pid.pretty()
if len(spid) <= 10:
result = spid
else:
result = newStringOfCap(10)
for i in 0..<2:
result.add(spid[i])
result.add("*")
for i in (len(spid) - 6)..spid.high:
result.add(spid[i])
proc init*(pid: var PeerID, data: openarray[byte]): bool =
## Initialize peer id from raw binary representation ``data``.
##

View File

@ -41,20 +41,15 @@ type
of HasPublic:
key: Option[PublicKey]
proc id*(p: PeerInfo): string =
if not(isNil(p)):
return p.peerId.pretty()
proc `$`*(p: PeerInfo): string = p.id
proc shortLog*(p: PeerInfo): auto =
func shortLog*(p: PeerInfo): auto =
(
id: p.id(),
peerId: $p.peerId,
addrs: mapIt(p.addrs, $it),
protocols: mapIt(p.protocols, $it),
protoVersion: p.protoVersion,
agentVersion: p.agentVersion,
)
chronicles.formatIt(PeerInfo): shortLog(it)
template postInit(peerinfo: PeerInfo,
addrs: openarray[MultiAddress],

View File

@ -106,16 +106,16 @@ method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try:
defer:
trace "exiting identify handler", oid = $conn.oid
trace "exiting identify handler", conn
await conn.close()
trace "handling identify request", oid = $conn.oid
trace "handling identify request", conn
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in identify handler", exc = exc.msg
trace "exception in identify handler", exc = exc.msg, conn
p.handler = handle
p.codec = IdentifyCodec
@ -123,10 +123,10 @@ method init*(p: Identify) =
proc identify*(p: Identify,
conn: Connection,
remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} =
trace "initiating identify", peer = $conn
trace "initiating identify", conn
var message = await conn.readLp(64*1024)
if len(message) == 0:
trace "identify: Empty message received!"
trace "identify: Empty message received!", conn
raise newException(IdentityInvalidMsgError, "Empty message received!")
let infoOpt = decodeMsg(message)
@ -144,8 +144,8 @@ proc identify*(p: Identify,
# have in most cases
if peer.get() != remotePeerInfo.peerId:
trace "Peer ids don't match",
remote = peer.get().pretty(),
local = remotePeerInfo.id
remote = peer,
local = remotePeerInfo.peerId
raise newException(IdentityNoMatchError, "Peer ids don't match")

View File

@ -7,16 +7,17 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import sequtils, tables, sets, strutils
import std/[sequtils, sets, tables]
import chronos, chronicles, metrics
import pubsub,
pubsubpeer,
timedcache,
peertable,
rpc/[messages, message],
import ./pubsub,
./pubsubpeer,
./timedcache,
./peertable,
./rpc/[message, messages],
../../stream/connection,
../../peerid,
../../peerinfo
../../peerinfo,
../../utility
logScope:
topics = "floodsub"
@ -38,75 +39,57 @@ method subscribeTopic*(f: FloodSub,
f.floodsub[topic] = initHashSet[PubSubPeer]()
if subscribe:
trace "adding subscription for topic", peer = peer.id, name = topic
trace "adding subscription for topic", peer, topic
# subscribe the peer to the topic
f.floodsub[topic].incl(peer)
else:
trace "removing subscription for topic", peer = peer.id, name = topic
trace "removing subscription for topic", peer, topic
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peer)
method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects
##
trace "unsubscribing floodsub peer", peer = $peer
trace "unsubscribing floodsub peer", peer
let pubSubPeer = f.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(pubSubPeer)
for _, v in f.floodsub.mpairs():
v.excl(pubSubPeer)
procCall PubSub(f).unsubscribePeer(peer)
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
await procCall PubSub(f).rpcHandler(peer, rpcMsgs)
rpcMsg: RPCMsg) {.async.} =
await procCall PubSub(f).rpcHandler(peer, rpcMsg)
for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages
var toSendPeers = initHashSet[PubSubPeer]()
for msg in m.messages: # for every message
let msgId = f.msgIdProvider(msg)
logScope: msgId
for msg in rpcMsg.messages: # for every message
let msgId = f.msgIdProvider(msg)
if msgId notin f.seen:
f.seen.put(msgId) # add the message to the seen cache
if f.seen.put(msgId):
trace "Dropping already-seen message", msgId, peer
continue
if f.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
continue
if f.verifySignature and not msg.verify(peer.peerId):
debug "Dropping message due to failed signature verification", msgId, peer
continue
if not (await f.validate(msg)):
trace "dropping message due to failed validation"
continue
if not (await f.validate(msg)):
trace "Dropping message due to failed validation", msgId, peer
continue
for t in msg.topicIDs: # for every topic in the message
if t in f.floodsub:
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
if t in f.topics: # check that we're subscribed to it
for h in f.topics[t].handler:
trace "calling handler for message", topicId = t,
localPeer = f.peerInfo.id,
fromPeer = msg.fromPeer.pretty
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
try:
await h(t, msg.data) # trigger user provided handler
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in message handler", exc = exc.msg
await handleData(f, t, msg.data)
# forward the message to all peers interested in it
let published = await f.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: m.messages),
DefaultSendTimeout)
trace "forwared message to peers", peers = published
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
trace "Forwared message to peers", peers = toSendPeers.len
method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
@ -114,59 +97,74 @@ method init*(f: FloodSub) =
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...
##
await f.handleConn(conn, proto)
try:
await f.handleConn(conn, proto)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in floodsub handler", conn
except CatchableError as exc:
trace "FloodSub handler leaks an error", exc = exc.msg, conn
f.handler = handler
f.codec = FloodSubCodec
method publish*(f: FloodSub,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data, timeout)
discard await procCall PubSub(f).publish(topic, data)
if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
return
trace "Publishing message on topic", data = data.shortLog, topic
if topic notin f.floodsub:
trace "missing peers for topic, skipping publish"
return
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish", topic
return 0
let peers = toSeq(f.floodsub.getOrDefault(topic))
if peers.len == 0:
debug "No peers for topic, skipping publish", topic
return 0
trace "publishing on topic", name = topic
inc f.msgSeqno
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
let
msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
msgId = f.msgIdProvider(msg)
# start the future but do not wait yet
let published = await f.broadcast(
toSeq(f.floodsub.getOrDefault(topic)),
RPCMsg(messages: @[msg]),
timeout)
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId
if f.seen.put(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message", msgId, topic
return 0
# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]))
when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published,
msg = msg.shortLog()
return published
trace "Published message to peers", msgId, topic
return peers.len
method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics)
for p in f.peers.values:
discard await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
await procCall PubSub(f).unsubscribeAll(topic)
for p in f.peers.values:
discard await f.sendSubs(p, @[topic], false)
f.sendSubs(p, @[topic], false)
method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub()
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = newTimedCache[string](2.minutes)
f.seen = TimedCache[string].init(2.minutes)
f.init()

View File

@ -9,18 +9,17 @@
import std/[tables, sets, options, sequtils, random, algorithm]
import chronos, chronicles, metrics
import pubsub,
floodsub,
pubsubpeer,
peertable,
mcache,
timedcache,
rpc/[messages, message],
import ./pubsub,
./floodsub,
./pubsubpeer,
./peertable,
./mcache,
./timedcache,
./rpc/[messages, message],
../protocol,
../../peerinfo,
../../stream/connection,
../../peerinfo,
../../peerid,
../../errors,
../../utility
import stew/results
export results
@ -141,7 +140,6 @@ type
mcache*: MCache # messages cache
heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
peerStats: Table[PubSubPeer, PeerStats]
parameters*: GossipSubParams
@ -268,12 +266,14 @@ method init*(g: GossipSub) =
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...
##
# TODO
# if conn.peerInfo.maintain:
# g.explicitPeers.incl(conn.peerInfo.id)
await g.handleConn(conn, proto)
try:
await g.handleConn(conn, proto)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in gossipsub handler", conn
except CatchableError as exc:
trace "GossipSub handler leaks an error", exc = exc.msg, conn
g.handler = handler
g.codecs &= GossipSubCodec
@ -346,6 +346,7 @@ proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] =
proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic
logScope: topic
trace "about to replenish fanout"
if g.fanout.peers(topic) < GossipSubDLo:
@ -550,14 +551,14 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# Send changes to peers after table updates to avoid stale state
if grafts.len > 0:
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
discard await g.broadcast(grafts, graft, DefaultSendTimeout)
g.broadcast(grafts, graft)
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
g.broadcast(prunes, prune)
proc dropFanoutPeers(g: GossipSub) =
# drop peers that we haven't published to in
@ -622,11 +623,12 @@ func `/`(a, b: Duration): float64 =
fa / fb
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
if peer.sendConn == nil:
if peer.connections.len == 0:
0.0
else:
let
address = peer.sendConn.observedAddr
# TODO, we are just using the first connections for now
address = peer.connections[0].observedAddr
ipPeers = g.peersInIP.getOrDefault(address)
len = ipPeers.len.float64
if len > g.parameters.ipColocationFactorThreshold:
@ -771,7 +773,7 @@ proc heartbeat(g: GossipSub) {.async.} =
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
g.broadcast(prunes, prune)
await g.rebalanceMesh(t)
@ -782,14 +784,11 @@ proc heartbeat(g: GossipSub) {.async.} =
g.replenishFanout(t)
let peers = g.getGossipPeers()
var sent: seq[Future[bool]]
for peer, control in peers:
g.peers.withValue(peer.peerId, pubsubPeer) do:
sent &= g.send(
g.send(
pubsubPeer[],
RPCMsg(control: some(control)),
DefaultSendTimeout)
checkFutures(await allFinished(sent))
RPCMsg(control: some(control)))
g.mcache.shift() # shift the cache
except CancelledError as exc:
@ -808,14 +807,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
## handle peer disconnects
##
trace "unsubscribing gossipsub peer", peer = $peer
trace "unsubscribing gossipsub peer", peer
let pubSubPeer = g.peers.getOrDefault(peer)
if pubSubPeer.isNil:
trace "no peer to unsubscribe", peer
return
# remove from peer IPs collection too
if pubSubPeer.sendConn != nil:
g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s) do:
if pubSubPeer.connections.len > 0:
# TODO, we are just using the first connections for now
g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s) do:
s[].excl(pubSubPeer)
for t in toSeq(g.gossipsub.keys):
@ -858,10 +859,11 @@ method subscribeTopic*(g: GossipSub,
topic: string,
subscribe: bool,
peer: PubSubPeer) {.gcsafe.} =
procCall FloodSub(g).subscribeTopic(topic, subscribe, peer)
# Skip floodsub - we don't want it to add the peer to `g.floodsub`
procCall PubSub(g).subscribeTopic(topic, subscribe, peer)
logScope:
peer = $peer.id
peer
topic
g.onNewPeer(peer)
@ -899,7 +901,7 @@ proc handleGraft(g: GossipSub,
for graft in grafts:
let topic = graft.topicID
logScope:
peer = peer.id
peer
topic
trace "peer grafted topic"
@ -958,7 +960,7 @@ proc handleGraft(g: GossipSub,
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
trace "peer pruned topic", peer, topic = prune.topicID
# add peer backoff
if prune.backoff > 0:
@ -985,7 +987,7 @@ proc handleIHave(g: GossipSub,
dec peer.iHaveBudget
for ihave in ihaves:
trace "peer sent ihave",
peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
peer, topic = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.mesh:
for m in ihave.messageIDs:
@ -1000,7 +1002,7 @@ proc handleIWant(g: GossipSub,
else:
for iwant in iwants:
for mid in iwant.messageIDs:
trace "peer sent iwant", peer = peer.id, messageID = mid
trace "peer sent iwant", peer, messageID = mid
let msg = g.mcache.get(mid)
if msg.isSome:
# avoid spam
@ -1021,124 +1023,87 @@ proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) =
method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
rpcMsg: RPCMsg) {.async.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsg)
var userHandlers: seq[Future[void]]
for msg in rpcMsg.messages: # for every message
let msgId = g.msgIdProvider(msg)
for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[PubSubPeer]
for msg in m.messages: # for every message
let msgId = g.msgIdProvider(msg)
logScope: msgId
if g.seen.put(msgId):
trace "Dropping already-seen message", msgId, peer
if msgId in g.seen:
trace "message already processed, skipping"
# make sure to update score tho before continuing
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
# make sure to update score tho before continuing
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
continue
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
continue
g.mcache.put(msgId, msg)
trace "processing message"
if g.verifySignature and not msg.verify(peer.peerId):
debug "Dropping message due to failed signature verification", msgId, peer
g.punishPeer(peer, msg)
continue
g.seen.put(msgId) # add the message to the seen cache
if not (await g.validate(msg)):
trace "Dropping message due to failed validation", msgId, peer
g.punishPeer(peer, msg)
continue
if g.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
g.punishPeer(peer, msg)
continue
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
if not (await g.validate(msg)):
trace "dropping message due to failed validation", peer
g.punishPeer(peer, msg)
continue
# contribute to peer score first delivery
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
stats.firstMessageDeliveries += 1
if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
# this shouldn't happen
if g.peerInfo.peerId == msg.fromPeer:
trace "skipping messages from self"
continue
# if in mesh add more delivery score
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
# contribute to peer score first delivery
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
stats.firstMessageDeliveries += 1
if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
# if in mesh add more delivery score
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
await handleData(g, t, msg.data)
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
if t in g.floodsub:
toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
if rpcMsg.control.isSome:
let control = rpcMsg.control.get()
g.handlePrune(peer, control.prune)
if t in g.mesh:
toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic
if t in g.explicit:
toSendPeers.incl(g.explicit[t]) # always forward to explicit peers
if t in g.topics: # if we're subscribed to the topic
for h in g.topics[t].handler:
trace "calling handler for message", topicId = t,
localPeer = g.peerInfo.id,
fromPeer = msg.fromPeer.pretty
userHandlers &= h(t, msg.data) # enqueue user provided handler
# forward the message to all peers interested in it
let published = await g.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: m.messages),
DefaultSendTimeout)
trace "forwared message to peers", peers = published
var respControl: ControlMessage
if m.control.isSome:
let control = m.control.get()
g.handlePrune(peer, control.prune)
respControl.iwant.add(g.handleIHave(peer, control.ihave))
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
respControl.iwant.add(g.handleIHave(peer, control.ihave))
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or messages.len > 0:
if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0:
try:
info "sending control message", msg = respControl
let sent = await g.send(
peer,
RPCMsg(control: some(respControl), messages: messages),
DefaultSendTimeout)
if not sent:
g.unsubscribePeer(peer.peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception forwarding control messages", exc = exc.msg
# await user tasks at the very end
checkFutures(await allFinished(userHandlers));
debug "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
method subscribe*(g: GossipSub,
topic: string,
@ -1157,9 +1122,9 @@ method unsubscribe*(g: GossipSub,
for (topic, handler) in topics:
# delete from mesh only if no handlers are left
if g.topics[topic].handler.len <= 0:
if topic notin g.topics:
if topic in g.mesh:
let peers = g.mesh.getOrDefault(topic)
let peers = g.mesh[topic]
g.mesh.del(topic)
for peer in peers:
g.pruned(peer, topic)
@ -1168,7 +1133,7 @@ method unsubscribe*(g: GossipSub,
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
g.broadcast(toSeq(peers), prune)
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
await procCall PubSub(g).unsubscribeAll(topic)
@ -1183,17 +1148,19 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
g.broadcast(toSeq(peers), prune)
method publish*(g: GossipSub,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data, timeout)
trace "publishing message on topic", topic, data = data.shortLog
discard await procCall PubSub(g).publish(topic, data)
logScope: topic
trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return 0
var peers: HashSet[PubSubPeer]
@ -1226,29 +1193,35 @@ method publish*(g: GossipSub,
# time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
if peers.len == 0:
debug "No peers for topic, skipping publish"
return 0
inc g.msgSeqno
let
msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
msgId = g.msgIdProvider(msg)
trace "created new message", msg, topic, peers = peers.len
logScope: msgId
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
trace "Created new message", msg = shortLog(msg), peers = peers.len
if peers.len > 0:
let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics):
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published,
msg = msg.shortLog()
return published
else:
debug "No peers for gossip message", topic, msg = msg.shortLog()
if g.seen.put(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0
g.mcache.put(msgId, msg)
g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
when defined(libp2p_expensive_metrics):
if peers.len > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "Published message to peers"
return peers.len
proc maintainDirectPeers(g: GossipSub) {.async.} =
while g.heartbeatRunning:
for id in g.parameters.directPeers:
@ -1264,29 +1237,28 @@ proc maintainDirectPeers(g: GossipSub) {.async.} =
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"
## start pubsub
## start long running/repeating procedures
if not g.heartbeatFut.isNil:
warn "Starting gossipsub twice"
return
withLock g.heartbeatLock:
# setup the heartbeat interval
g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat()
g.directPeersLoop = g.maintainDirectPeers()
g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat()
g.directPeersLoop = g.maintainDirectPeers()
method stop*(g: GossipSub) {.async.} =
trace "gossipsub stop"
if g.heartbeatFut.isNil:
warn "Stopping gossipsub without starting it"
return
## stop pubsub
## stop long running tasks
withLock g.heartbeatLock:
# stop heartbeat interval
g.heartbeatRunning = false
if not g.heartbeatFut.finished:
trace "awaiting last heartbeat"
await g.heartbeatFut
await g.directPeersLoop.cancelAndWait()
# stop heartbeat interval
g.heartbeatRunning = false
g.directPeersLoop.cancel()
if not g.heartbeatFut.finished:
trace "awaiting last heartbeat"
await g.heartbeatFut
trace "heartbeat stopped"
g.heartbeatFut = nil
method initPubSub*(g: GossipSub) =
procCall FloodSub(g).initPubSub()
@ -1297,11 +1269,10 @@ method initPubSub*(g: GossipSub) =
g.parameters.validateParameters().tryGet()
randomize()
g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer
g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers
g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages
g.heartbeatLock = newAsyncLock()

View File

@ -7,69 +7,57 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos, chronicles
import tables, options, sets, sequtils
import rpc/[messages], timedcache
import std/[sets, tables, options]
import rpc/[messages]
export sets, tables, messages, options
type
CacheEntry* = object
mid*: string
msg*: Message
topicIDs*: seq[string]
MCache* = ref object of RootObj
msgs*: TimedCache[Message]
MCache* = object of RootObj
msgs*: Table[string, Message]
history*: seq[seq[CacheEntry]]
historySize*: Natural
windowSize*: Natural
proc get*(c: MCache, mid: string): Option[Message] =
func get*(c: MCache, mid: string): Option[Message] =
result = none(Message)
if mid in c.msgs:
result = some(c.msgs[mid])
proc contains*(c: MCache, mid: string): bool =
func contains*(c: MCache, mid: string): bool =
c.get(mid).isSome
proc put*(c: MCache, msgId: string, msg: Message) =
proc handler(key: string, val: Message) {.gcsafe.} =
## make sure we remove the message from history
## to keep things consisten
c.history.applyIt(
it.filterIt(it.mid != msgId)
)
func put*(c: var MCache, msgId: string, msg: Message) =
if msgId notin c.msgs:
c.msgs.put(msgId, msg, handler = handler)
c.history[0].add(CacheEntry(mid: msgId, msg: msg))
c.msgs[msgId] = msg
c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
proc window*(c: MCache, topic: string): HashSet[string] =
func window*(c: MCache, topic: string): HashSet[string] =
result = initHashSet[string]()
let len =
if c.windowSize > c.history.len:
c.history.len
else:
c.windowSize
let
len = min(c.windowSize, c.history.len)
if c.history.len > 0:
for slot in c.history[0..<len]:
for entry in slot:
for t in entry.msg.topicIDs:
if t == topic:
result.incl(entry.mid)
break
for i in 0..<len:
for entry in c.history[i]:
for t in entry.topicIDs:
if t == topic:
result.incl(entry.mid)
break
proc shift*(c: MCache) =
while c.history.len > c.historySize:
for entry in c.history.pop():
c.msgs.del(entry.mid)
func shift*(c: var MCache) =
for entry in c.history.pop():
c.msgs.del(entry.mid)
c.history.insert(@[])
proc newMCache*(window: Natural, history: Natural): MCache =
new result
result.historySize = history
result.windowSize = window
result.history = newSeq[seq[CacheEntry]]()
result.history.add(@[]) # initialize with empty slot
result.msgs = newTimedCache[Message](2.minutes)
func init*(T: type MCache, window, history: Natural): T =
T(
history: newSeq[seq[CacheEntry]](history),
historySize: history,
windowSize: window
)

View File

@ -14,9 +14,11 @@ type
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
let peers = toSeq(t.getOrDefault(topic))
peers.any do (peer: PubSubPeer) -> bool:
peer.peerId == peerId
if topic in t:
for peer in t[topic]:
if peer.peerId == peerId:
return true
false
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added,

View File

@ -72,56 +72,34 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
##
trace "unsubscribing pubsub peer", peer = $peerId
if peerId in p.peers:
p.peers.del(peerId)
p.peers.del(peerId)
libp2p_pubsub_peers.set(p.peers.len.int64)
proc send*(
p: PubSub,
peer: PubSubPeer,
msg: RPCMsg,
timeout: Duration): Future[bool] {.async.} =
## send to remote peer
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
## Attempt to send `msg` to remote peer
##
trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg)
try:
await peer.send(msg, timeout)
return true
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending pubsub message to peer",
peer = $peer, msg = shortLog(msg)
# do not unsub during internal testing (no networking)
when not defined(pubsub_internal_testing):
p.unsubscribePeer(peer.peerId)
peer.send(msg)
proc broadcast*(
p: PubSub,
sendPeers: seq[PubSubPeer],
msg: RPCMsg,
timeout: Duration): Future[int] {.async.} =
## send messages and cleanup failed peers
##
sendPeers: openArray[PubSubPeer],
msg: RPCMsg) = # raises: [Defect]
## Attempt to send `msg` to the given peers
trace "broadcasting messages to peers",
peers = sendPeers.len, message = shortLog(msg)
let sent = await allFinished(
sendPeers.mapIt( p.send(it, msg, timeout) ))
return sent.filterIt( it.finished and it.read ).len
peers = sendPeers.len, msg = shortLog(msg)
for peer in sendPeers:
p.send(peer, msg)
proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: seq[string],
subscribe: bool): Future[bool] =
subscribe: bool) =
## send subscriptions to remote peer
p.send(
peer,
RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
DefaultSendTimeout)
p.send(peer, RPCMsg.withSubs(topics, subscribe))
method subscribeTopic*(p: PubSub,
topic: string,
@ -132,16 +110,12 @@ method subscribeTopic*(p: PubSub,
method rpcHandler*(p: PubSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, base.} =
rpcMsg: RPCMsg) {.async, base.} =
## handle rpc messages
trace "processing RPC message", peer = peer.id, msgs = rpcMsgs.len
for m in rpcMsgs: # for all RPC messages
trace "processing messages", msg = m.shortLog
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic
p.subscribeTopic(s.topic, s.subscribe, peer)
trace "processing RPC message", msg = rpcMsg.shortLog, peer
for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic, peer
p.subscribeTopic(s.topic, s.subscribe, peer)
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
@ -152,8 +126,12 @@ proc getOrCreatePeer*(
if peer in p.peers:
return p.peers[peer]
proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
let conn = await p.switch.dial(peer, proto)
return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
# create new pubsub peer
let pubSubPeer = newPubSubPeer(peer, p.switch, proto)
let pubSubPeer = newPubSubPeer(peer, getConn, proto)
trace "created new pubsub peer", peerId = $peer
p.peers[peer] = pubSubPeer
@ -163,8 +141,24 @@ proc getOrCreatePeer*(
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
pubsubPeer.connect()
return pubSubPeer
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
if topic notin p.topics: return # Not subscribed
for h in p.topics[topic].handler:
trace "triggering handler", topicID = topic
try:
await h(topic, data)
except CancelledError as exc:
raise exc
except CatchableError as exc:
# Handlers should never raise exceptions
warn "Error in topic handler", msg = exc.msg
method handleConn*(p: PubSub,
conn: Connection,
proto: string) {.base, async.} =
@ -184,22 +178,20 @@ method handleConn*(p: PubSub,
await conn.close()
return
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
# call pubsub rpc handler
await p.rpcHandler(peer, msgs)
p.rpcHandler(peer, msg)
let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
try:
let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
if p.topics.len > 0:
discard await p.sendSubs(peer, toSeq(p.topics.keys), true)
peer.handler = handler
await peer.handle(conn) # spawn peer read loop
trace "pubsub peer handler ended", peer = peer.id
trace "pubsub peer handler ended", conn
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception ocurred in pubsub handle", exc = exc.msg
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
finally:
await conn.close()
@ -208,31 +200,22 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
## messages
##
let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
pubsubPeer.outbound = true # flag as outbound
if p.topics.len > 0:
# TODO sendSubs may raise, but doing asyncCheck here causes the exception
# to escape to the poll loop.
# With a bit of luck, it may be harmless to ignore exceptions here -
# some cleanup is eventually done in PubSubPeer.send
asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
pubsubPeer.subscribed = true
let peer = p.getOrCreatePeer(peer, p.codec)
peer.outbound = true # flag as outbound
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings
for t in topics:
for i, h in p.topics[t.topic].handler:
if h == t.handler:
p.topics[t.topic].handler.del(i)
p.topics.withValue(t.topic, topic):
topic[].handler.keepIf(proc (x: auto): bool = x != t.handler)
# make sure we delete the topic if
# no more handlers are left
if p.topics[t.topic].handler.len <= 0:
p.topics.del(t.topic)
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
if topic[].handler.len == 0:
# make sure we delete the topic if
# no more handlers are left
p.topics.del(t.topic)
libp2p_pubsub_topics.set(p.topics.len.int64)
proc unsubscribe*(p: PubSub,
topic: string,
@ -263,32 +246,22 @@ method subscribe*(p: PubSub,
p.topics[topic].handler.add(handler)
var sent: seq[Future[bool]]
for peer in toSeq(p.peers.values):
sent.add(p.sendSubs(peer, @[topic], true))
checkFutures(await allFinished(sent))
for _, peer in p.peers:
p.sendSubs(peer, @[topic], true)
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
method publish*(p: PubSub,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
data: seq[byte]): Future[int] {.base, async.} =
## publish to a ``topic``
if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler:
trace "triggering handler", topicID = topic
try:
await h(topic, data)
except CancelledError as exc:
raise exc
except CatchableError as exc:
# TODO these exceptions are ignored since it's likely that if writes are
# are failing, the underlying connection is already closed - this needs
# more cleanup though
debug "Could not write to pubsub connection", msg = exc.msg
## The return value is the number of neighbours that we attempted to send the
## message to, excluding self. Note that this is an optimistic number of
## attempts - the number of peers that actually receive the message might
## be lower.
if p.triggerSelf:
await handleData(p, topic, data)
return 0

View File

@ -10,8 +10,6 @@
import std/[sequtils, strutils, tables, hashes, sets]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
timedcache,
../../switch,
../../peerid,
../../peerinfo,
../../stream/connection,
@ -28,24 +26,21 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
const
DefaultSendTimeout* = 10.seconds
type
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
PubSubPeer* = ref object of RootObj
switch*: Switch # switch instance to dial peers
getConn*: GetConn # callback to establish a new send connection
codec*: string # the protocol that this peer joined from
sendConn*: Connection
sendConn: Connection # cached send connection
connections*: seq[Connection] # connections to this peer
peerId*: PeerID
handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
subscribed*: bool # are we subscribed to this peer
dialLock: AsyncLock
score*: float64
@ -55,7 +50,7 @@ type
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
chronicles.formatIt(PubSubPeer): $it.peerId
@ -63,9 +58,10 @@ func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash
proc id*(p: PubSubPeer): string =
doAssert(not p.isNil, "nil pubsubpeer")
p.peerId.pretty
func shortLog*(p: PubSubPeer): string =
if p.isNil: "PubSubPeer(nil)"
else: shortLog(p.peerId)
chronicles.formatIt(PubSubPeer): shortLog(it)
proc connected*(p: PubSubPeer): bool =
not p.sendConn.isNil and not
@ -86,63 +82,68 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
obs.onSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
logScope:
peer = p.id
debug "starting pubsub read loop for peer", closed = conn.closed
debug "starting pubsub read loop",
conn, peer = p, closed = conn.closed
try:
try:
while not conn.atEof:
trace "waiting for data", closed = conn.closed
trace "waiting for data", conn, peer = p, closed = conn.closed
let data = await conn.readLp(64 * 1024)
let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache:
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
trace "message already received, skipping"
continue
trace "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog
var rmsg = decodeRpcMsg(data)
if rmsg.isErr():
notice "failed to decode msg from peer"
notice "failed to decode msg from peer",
conn, peer = p, closed = conn.closed,
err = rmsg.error()
break
var msg = rmsg.get()
trace "decoded msg from peer", msg = msg.shortLog
trace "decoded msg from peer",
conn, peer = p, closed = conn.closed,
msg = rmsg.get().shortLog
# trigger hooks
p.recvObservers(msg)
p.recvObservers(rmsg.get())
when defined(libp2p_expensive_metrics):
for m in msg.messages:
for m in rmsg.get().messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
await p.handler(p, @[msg])
p.recvdRpcCache.put(digest)
await p.handler(p, rmsg.get())
finally:
debug "exiting pubsub peer read loop"
await conn.close()
if p.sendConn == conn:
p.sendConn = nil
except CancelledError as exc:
raise exc
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
trace "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
finally:
debug "exiting pubsub read loop",
conn, peer = p, closed = conn.closed
proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
# get a cached send connection or create a new one
## get a cached send connection or create a new one - will return nil if
## getting a new connection fails
##
block: # check if there's an existing connection that can be reused
let current = p.sendConn
if not current.isNil:
if not (current.closed() or current.atEof):
# The existing send connection looks like it might work - reuse it
trace "Reusing existing connection", oid = $current.oid
trace "Reusing existing connection", current
return current
# Send connection is set but broken - get rid of it
@ -156,7 +157,8 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
# and later close one of them, other implementations such as rust-libp2p
# become deaf to our messages (potentially due to the clean-up associated
# with closing connections). To prevent this, we use a lock that ensures
# that only a single dial will be performed for each peer.
# that only a single dial will be performed for each peer and send the
# subscription table every time we reconnect.
#
# Nevertheless, this approach is still quite problematic because the gossip
# sends and their respective dials may be started from the mplex read loop.
@ -181,31 +183,36 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
return current
# Grab a new send connection
let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
let (newConn, handshake) = await p.getConn() # ...and here
if newConn.isNil:
return nil
trace "Caching new send connection", oid = $newConn.oid
p.sendConn = newConn
asyncCheck p.handle(newConn) # start a read loop on the new connection
return newConn
trace "Sending handshake", newConn, handshake = shortLog(handshake)
await newConn.writeLp(encodeRpcMsg(handshake))
trace "Caching new send connection", newConn
p.sendConn = newConn
# Start a read loop on the new connection.
# All the errors are handled inside `handle()` procedure.
asyncSpawn p.handle(newConn)
return newConn
finally:
if p.dialLock.locked:
p.dialLock.release()
proc send*(
p: PubSubPeer,
msg: RPCMsg,
timeout: Duration = DefaultSendTimeout) {.async.} =
proc connectImpl*(p: PubSubPeer) {.async.} =
try:
discard await getSendConn(p)
except CatchableError as exc:
debug "Could not connect to pubsub peer", err = exc.msg
proc connect*(p: PubSubPeer) =
asyncCheck(connectImpl(p))
proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")
logScope:
peer = p.id
rpcMsg = shortLog(msg)
trace "sending msg to peer"
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
# trigger send hooks
var mm = msg # hooks can modify the message
@ -216,55 +223,50 @@ proc send*(
info "empty message, skipping"
return
logScope:
encoded = shortLog(encoded)
let digest = $(sha256.digest(encoded))
if digest in p.sentRpcCache:
trace "message already sent to peer, skipping"
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return
var conn: Connection
try:
trace "about to send message"
conn = await p.getSendConn()
if conn == nil:
debug "Couldn't get send connection, dropping message"
debug "Couldn't get send connection, dropping message", peer = p
return
trace "sending encoded msgs to peer", connId = $conn.oid
await conn.writeLp(encoded).wait(timeout)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote", connId = $conn.oid
trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
await conn.writeLp(encoded)
trace "sent pubsub message to remote", conn
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
# Because we detach the send call from the currently executing task using
# asyncCheck, no exceptions may leak out of it
debug "unable to send to remote", exc = exc.msg, peer = p
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled
if not isNil(conn):
await conn.close()
await conn.close() # This will clean up the send connection
raise exc
if exc is CancelledError: # TODO not handled
debug "Send cancelled", peer = p
# We'll ask for a new send connection whenever possible
if p.sendConn == conn:
p.sendConn = nil
proc send*(p: PubSubPeer, msg: RPCMsg) =
asyncCheck sendImpl(p, msg)
proc `$`*(p: PubSubPeer): string =
p.id
$p.peerId
proc newPubSubPeer*(peerId: PeerID,
switch: Switch,
getConn: GetConn,
codec: string): PubSubPeer =
new result
result.switch = switch
result.getConn = getConn
result.codec = codec
result.peerId = peerId
result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes)
result.dialLock = newAsyncLock()

View File

@ -28,10 +28,10 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
func defaultMsgIdProvider*(m: Message): string =
byteutils.toHex(m.seqno) & m.fromPeer.pretty
byteutils.toHex(m.seqno) & $m.fromPeer
proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] =
ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc verify*(m: Message, p: PeerID): bool =
if m.signature.len > 0 and m.key.len > 0:
@ -63,6 +63,9 @@ proc init*(
seqno: @(seqno.toBytesBE), # unefficient, fine for now
topicIDs: @[topic])
if sign and peer.publicKey.isSome:
result.signature = sign(result, peer).tryGet()
result.key = peer.publicKey.get().getBytes().tryGet()
if sign:
if peer.keyType != KeyType.HasPrivate:
raise (ref CatchableError)(msg: "Cannot sign message without private key")
result.signature = sign(result, peer.privateKey).tryGet()
result.key = peer.privateKey.getKey().tryGet().getBytes().tryGet()

View File

@ -56,6 +56,11 @@ type
messages*: seq[Message]
control*: Option[ControlMessage]
func withSubs*(
T: type RPCMsg, topics: openArray[string], subscribe: bool): T =
T(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))
func shortLog*(s: ControlIHave): auto =
(
topicID: s.topicID.shortLog,
@ -87,7 +92,7 @@ func shortLog*(c: ControlMessage): auto =
func shortLog*(msg: Message): auto =
(
fromPeer: msg.fromPeer,
fromPeer: msg.fromPeer.shortLog,
data: msg.data.shortLog,
seqno: msg.seqno.shortLog,
topicIDs: $msg.topicIDs,

View File

@ -197,7 +197,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
trace "decodeMessage: decoding message"
var msg: Message
if ? pb.getField(1, msg.fromPeer):
trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer.pretty()
trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer
else:
trace "decodeMessage: fromPeer is missing"
if ? pb.getField(2, msg.data):

View File

@ -7,73 +7,59 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables
import chronos, chronicles
import std/[heapqueue, sets]
logScope:
topics = "timedcache"
import chronos/timer
const Timeout* = 10.seconds # default timeout in ms
type
ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.}
TimedEntry*[V] = object of RootObj
val: V
handler: ExpireHandler[V]
TimedEntry*[K] = ref object of RootObj
key: K
expiresAt: Moment
TimedCache*[V] = ref object of RootObj
cache*: Table[string, TimedEntry[V]]
onExpire*: ExpireHandler[V]
timeout*: Duration
TimedCache*[K] = object of RootObj
expiries: HeapQueue[TimedEntry[K]]
entries: HashSet[K]
timeout: Duration
# TODO: This belong in chronos, temporary left here until chronos is updated
proc addTimer*(at: Duration, cb: CallbackFunc, udata: pointer = nil) =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
addTimer(Moment.fromNow(at), cb, udata)
func `<`*(a, b: TimedEntry): bool =
a.expiresAt < b.expiresAt
proc put*[V](t: TimedCache[V],
key: string,
val: V = "",
timeout: Duration,
handler: ExpireHandler[V] = nil) =
trace "adding entry to timed cache", key = key
t.cache[key] = TimedEntry[V](val: val, handler: handler)
func expire*(t: var TimedCache, now: Moment = Moment.now()) =
while t.expiries.len() > 0 and t.expiries[0].expiresAt < now:
t.entries.excl(t.expiries.pop().key)
addTimer(
timeout,
proc (arg: pointer = nil) {.gcsafe.} =
trace "deleting expired entry from timed cache", key = key
if key in t.cache:
let entry = t.cache[key]
t.cache.del(key)
if not isNil(entry.handler):
entry.handler(key, entry.val)
func del*[K](t: var TimedCache[K], key: K): bool =
# Removes existing key from cache, returning false if it was not present
if not t.entries.missingOrExcl(key):
for i in 0..<t.expiries.len:
if t.expiries[i].key == key:
t.expiries.del(i)
break
true
else:
false
func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# Puts k in cache, returning true if the item was already present and false
# otherwise. If the item was already present, its expiry timer will be
# refreshed.
t.expire(now)
var res = t.del(k) # Refresh existing item
t.entries.incl(k)
t.expiries.push(TimedEntry[K](key: k, expiresAt: now + t.timeout))
res
func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries
func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
T(
expiries: initHeapQueue[TimedEntry[K]](),
entries: initHashSet[K](),
timeout: timeout
)
proc put*[V](t: TimedCache[V],
key: string,
val: V = "",
handler: ExpireHandler[V] = nil) =
t.put(key, val, t.timeout, handler)
proc contains*[V](t: TimedCache[V], key: string): bool =
t.cache.contains(key)
proc del*[V](t: TimedCache[V], key: string) =
trace "deleting entry from timed cache", key = key
t.cache.del(key)
proc get*[V](t: TimedCache[V], key: string): V =
t.cache[key].val
proc `[]`*[V](t: TimedCache[V], key: string): V =
t.get(key)
proc `[]=`*[V](t: TimedCache[V], key: string, val: V): V =
t.put(key, val)
proc newTimedCache*[V](timeout: Duration = Timeout): TimedCache[V] =
new result
result.cache = initTable[string, TimedEntry[V]]()
result.timeout = timeout

View File

@ -7,6 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[oids, strformat]
import chronos
import chronicles
import bearssl
@ -88,6 +89,12 @@ type
# Utility
func shortLog*(conn: NoiseConnection): auto =
if conn.isNil: "NoiseConnection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
chronicles.formatIt(NoiseConnection): shortLog(it)
proc genKeyPair(rng: var BrHmacDrbgContext): KeyPair =
result.privateKey = Curve25519Key.random(rng)
result.publicKey = result.privateKey.public()
@ -392,13 +399,13 @@ method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} =
var besize: array[2, byte]
await sconn.stream.readExactly(addr besize[0], besize.len)
let size = uint16.fromBytesBE(besize).int # Cannot overflow
trace "receiveEncryptedMessage", size, peer = $sconn
trace "receiveEncryptedMessage", size, sconn
if size > 0:
var buffer = newSeq[byte](size)
await sconn.stream.readExactly(addr buffer[0], buffer.len)
return sconn.readCs.decryptWithAd([], buffer)
else:
trace "Received 0-length message", conn = $sconn
trace "Received 0-length message", sconn
method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} =
if message.len == 0:
@ -418,14 +425,14 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.
lesize = cipher.len.uint16
besize = lesize.toBytesBE
outbuf = newSeqOfCap[byte](cipher.len + 2)
trace "sendEncryptedMessage", size = lesize, peer = $sconn, left, offset
trace "sendEncryptedMessage", sconn, size = lesize, left, offset
outbuf &= besize
outbuf &= cipher
await sconn.stream.write(outbuf)
sconn.activity = true
method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} =
trace "Starting Noise handshake", initiator, peer = $conn
trace "Starting Noise handshake", conn, initiator
# https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages
let
@ -469,7 +476,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
if not remoteSig.verify(verifyPayload, remotePubKey):
raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.")
else:
trace "Remote signature verified", peer = $conn
trace "Remote signature verified", conn
if initiator and not isNil(conn.peerInfo):
let pid = PeerID.init(remotePubKey)
@ -480,7 +487,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
failedKey: PublicKey
discard extractPublicKey(conn.peerInfo.peerId, failedKey)
debug "Noise handshake, peer infos don't match!",
initiator, dealt_peer = $conn.peerInfo.id,
initiator, dealt_peer = conn,
dealt_key = $failedKey, received_peer = $pid,
received_key = $remotePubKey
raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId)

View File

@ -6,7 +6,8 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos, chronicles, oids, stew/endians2, bearssl
import std/[oids, strformat]
import chronos, chronicles, stew/endians2, bearssl
import nimcrypto/[hmac, sha2, sha, hash, rijndael, twofish, bcmode]
import secure,
../../stream/connection,
@ -69,6 +70,12 @@ type
SecioError* = object of CatchableError
func shortLog*(conn: SecioConn): auto =
if conn.isNil: "SecioConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
chronicles.formatIt(SecioConn): shortLog(it)
proc init(mac: var SecureMac, hash: string, key: openarray[byte]) =
if hash == "SHA256":
mac = SecureMac(kind: SecureMacType.Sha256)
@ -184,17 +191,17 @@ proc readRawMessage(conn: Connection): Future[seq[byte]] {.async.} =
trace "Recieved message header", header = lengthBuf.shortLog, length = length
if length > SecioMaxMessageSize: # Verify length before casting!
trace "Received size of message exceed limits", conn = $conn, length = length
trace "Received size of message exceed limits", conn, length = length
raise (ref SecioError)(msg: "Message exceeds maximum length")
if length > 0:
var buf = newSeq[byte](int(length))
await conn.readExactly(addr buf[0], buf.len)
trace "Received message body",
conn = $conn, length = buf.len, buff = buf.shortLog
conn, length = buf.len, buff = buf.shortLog
return buf
trace "Discarding 0-length payload", conn = $conn
trace "Discarding 0-length payload", conn
method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} =
## Read message from channel secure connection ``sconn``.
@ -312,12 +319,12 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S
var answer = await transactMessage(conn, request)
if len(answer) == 0:
trace "Proposal exchange failed", conn = $conn
trace "Proposal exchange failed", conn
raise (ref SecioError)(msg: "Proposal exchange failed")
if not decodeProposal(answer, remoteNonce, remoteBytesPubkey, remoteExchanges,
remoteCiphers, remoteHashes):
trace "Remote proposal decoding failed", conn = $conn
trace "Remote proposal decoding failed", conn
raise (ref SecioError)(msg: "Remote proposal decoding failed")
if not remotePubkey.init(remoteBytesPubkey):
@ -354,11 +361,11 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S
var localExchange = createExchange(epubkey, signature.getBytes())
var remoteExchange = await transactMessage(conn, localExchange)
if len(remoteExchange) == 0:
trace "Corpus exchange failed", conn = $conn
trace "Corpus exchange failed", conn
raise (ref SecioError)(msg: "Corpus exchange failed")
if not decodeExchange(remoteExchange, remoteEBytesPubkey, remoteEBytesSig):
trace "Remote exchange decoding failed", conn = $conn
trace "Remote exchange decoding failed", conn
raise (ref SecioError)(msg: "Remote exchange decoding failed")
if not remoteESignature.init(remoteEBytesSig):

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import options
import std/[options, strformat]
import chronos, chronicles, bearssl
import ../protocol,
../../stream/streamseq,
@ -27,12 +27,18 @@ type
stream*: Connection
buf: StreamSeq
proc init*[T: SecureConn](C: type T,
conn: Connection,
peerInfo: PeerInfo,
observedAddr: Multiaddress,
timeout: Duration = DefaultConnectionTimeout): T =
result = C(stream: conn,
func shortLog*(conn: SecureConn): auto =
if conn.isNil: "SecureConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
chronicles.formatIt(SecureConn): shortLog(it)
proc init*(T: type SecureConn,
conn: Connection,
peerInfo: PeerInfo,
observedAddr: Multiaddress,
timeout: Duration = DefaultConnectionTimeout): T =
result = T(stream: conn,
peerInfo: peerInfo,
observedAddr: observedAddr,
closeEvent: conn.closeEvent,
@ -63,10 +69,21 @@ proc handleConn*(s: Secure,
conn: Connection,
initiator: bool): Future[Connection] {.async, gcsafe.} =
var sconn = await s.handshake(conn, initiator)
proc cleanup() {.async.} =
try:
await conn.join()
await sconn.close()
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
discard
except CatchableError as exc:
trace "error cleaning up secure connection", err = exc.msg, sconn
if not isNil(sconn):
conn.join()
.addCallback do(udata: pointer = nil):
asyncCheck sconn.close()
# All the errors are handled inside `cleanup()` procedure.
asyncSpawn cleanup()
return sconn
@ -74,18 +91,18 @@ method init*(s: Secure) {.gcsafe.} =
procCall LPProtocol(s).init()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
trace "handling connection upgrade", proto
trace "handling connection upgrade", proto, conn
try:
# We don't need the result but we
# definitely need to await the handshake
discard await s.handleConn(conn, false)
trace "connection secured"
trace "connection secured", conn
except CancelledError as exc:
warn "securing connection canceled"
warn "securing connection canceled", conn
await conn.close()
raise exc
except CatchableError as exc:
warn "securing connection failed", msg = exc.msg
warn "securing connection failed", err = exc.msg, conn
await conn.close()
s.handler = handle

View File

@ -30,7 +30,7 @@
## will suspend until either the amount of elements in the
## buffer goes below ``maxSize`` or more data becomes available.
import deques, math
import std/[deques, math, strformat]
import chronos, chronicles, metrics
import ../stream/connection
@ -43,7 +43,7 @@ logScope:
topics = "bufferstream"
const
DefaultBufferSize* = 102400
DefaultBufferSize* = 128
const
BufferStreamTrackerName* = "libp2p.bufferstream"
@ -100,6 +100,12 @@ proc newAlreadyPipedError*(): ref CatchableError {.inline.} =
proc newNotWritableError*(): ref CatchableError {.inline.} =
result = newException(NotWritableError, "stream is not writable")
func shortLog*(s: BufferStream): auto =
if s.isNil: "BufferStream(nil)"
elif s.peerInfo.isNil: $s.oid
else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}"
chronicles.formatIt(BufferStream): shortLog(it)
proc requestReadBytes(s: BufferStream): Future[void] =
## create a future that will complete when more
## data becomes available in the read buffer
@ -142,7 +148,7 @@ proc initBufferStream*(s: BufferStream,
await s.writeLock.acquire()
await handler(data)
trace "created bufferstream", oid = $s.oid
trace "created bufferstream", s
proc newBufferStream*(handler: WriteHandler = nil,
size: int = DefaultBufferSize,
@ -206,7 +212,7 @@ proc drainBuffer*(s: BufferStream) {.async.} =
## wait for all data in the buffer to be consumed
##
trace "draining buffer", len = s.len, oid = $s.oid
trace "draining buffer", len = s.len, s
while s.len > 0:
await s.dataReadEvent.wait()
s.dataReadEvent.clear()
@ -296,7 +302,7 @@ method close*(s: BufferStream) {.async, gcsafe.} =
try:
## close the stream and clear the buffer
if not s.isClosed:
trace "closing bufferstream", oid = $s.oid
trace "closing bufferstream", s
s.isEof = true
for r in s.readReqs:
if not(isNil(r)) and not(r.finished()):
@ -306,11 +312,11 @@ method close*(s: BufferStream) {.async, gcsafe.} =
await procCall Connection(s).close()
inc getBufferStreamTracker().closed
trace "bufferstream closed", oid = $s.oid
trace "bufferstream closed", s
else:
trace "attempt to close an already closed bufferstream",
trace = getStackTrace(), oid = $s.oid
trace = getStackTrace(), s
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error closing buffer stream", exc = exc.msg
trace "error closing buffer stream", exc = exc.msg, s

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import oids
import std/[oids, strformat]
import chronos, chronicles
import connection
@ -21,6 +21,12 @@ type
ChronosStream* = ref object of Connection
client: StreamTransport
func shortLog*(conn: ChronosStream): string =
if conn.isNil: "ChronosStream(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
chronicles.formatIt(ChronosStream): shortLog(it)
method initStream*(s: ChronosStream) =
if s.objName.len == 0:
s.objName = "ChronosStream"
@ -88,7 +94,7 @@ method close*(s: ChronosStream) {.async.} =
try:
if not s.isClosed:
trace "shutting down chronos stream", address = $s.client.remoteAddress(),
oid = $s.oid
s
if not s.client.closed():
await s.client.closeWait()
@ -96,4 +102,4 @@ method close*(s: ChronosStream) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error closing chronosstream", exc = exc.msg
trace "error closing chronosstream", exc = exc.msg, s

View File

@ -7,13 +7,13 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import hashes, oids
import std/[hashes, oids, strformat]
import chronicles, chronos, metrics
import lpstream,
../multiaddress,
../peerinfo
export lpstream
export lpstream, peerinfo
logScope:
topics = "connection"
@ -66,6 +66,12 @@ proc setupConnectionTracker(): ConnectionTracker =
result.isLeaked = leakTransport
addTracker(ConnectionTrackerName, result)
func shortLog*(conn: Connection): string =
if conn.isNil: "Connection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
chronicles.formatIt(Connection): shortLog(it)
method initStream*(s: Connection) =
if s.objName.len == 0:
s.objName = "Connection"
@ -77,7 +83,7 @@ method initStream*(s: Connection) =
s.timeoutHandler = proc() {.async.} =
await s.close()
trace "timeout set at", timeout = s.timeout.millis
trace "timeout set at", timeout = s.timeout.millis, s
doAssert(isNil(s.timerTaskFut))
# doAssert(s.timeout > 0.millis)
if s.timeout > 0.millis:
@ -94,10 +100,6 @@ method close*(s: Connection) {.async.} =
await procCall LPStream(s).close()
inc getConnectionTracker().closed
proc `$`*(conn: Connection): string =
if not isNil(conn.peerInfo):
result = conn.peerInfo.id
func hash*(p: Connection): Hash =
cast[pointer](p).hash
@ -110,9 +112,6 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
## be reset
##
logScope:
oid = $s.oid
try:
while true:
await sleepAsync(s.timeout)
@ -127,14 +126,14 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
break
# reset channel on innactivity timeout
trace "Connection timed out"
trace "Connection timed out", s
if not(isNil(s.timeoutHandler)):
await s.timeoutHandler()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in timeout", exc = exc.msg
trace "exception in timeout", exc = exc.msg, s
proc init*(C: type Connection,
peerInfo: PeerInfo,

View File

@ -30,9 +30,6 @@ import stream/connection,
peerid,
errors
chronicles.formatIt(PeerInfo): $it
chronicles.formatIt(PeerID): $it
logScope:
topics = "switch"
@ -47,7 +44,8 @@ declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
type
NoPubSubException* = object of CatchableError
UpgradeFailedError* = object of CatchableError
DialFailedError* = object of CatchableError
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
@ -101,7 +99,7 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "exception in trigger ConnEvents", exc = exc.msg
warn "exception in trigger ConnEvents", exc = exc.msg, peerId
proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
@ -114,13 +112,13 @@ proc isConnected*(s: Switch, peerId: PeerID): bool =
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
if s.secureManagers.len <= 0:
raise newException(CatchableError, "No secure managers registered!")
raise newException(UpgradeFailedError, "No secure managers registered!")
let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec))
if manager.len == 0:
raise newException(CatchableError, "Unable to negotiate a secure channel!")
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
trace "securing connection", codec = manager
trace "Securing connection", codec = manager, conn
let secureProtocol = s.secureManagers.filterIt(it.codec == manager)
# ms.select should deal with the correctness of this
@ -136,7 +134,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
let info = await s.identity.identify(conn, conn.peerInfo)
if info.pubKey.isNone and isNil(conn):
raise newException(CatchableError,
raise newException(UpgradeFailedError,
"no public key provided and no existing peer identity found")
if isNil(conn.peerInfo):
@ -154,7 +152,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
if info.protos.len > 0:
conn.peerInfo.protocols = info.protos
trace "identify: identified remote peer", peer = $conn.peerInfo
trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo)
proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
# new stream for identify
@ -171,14 +169,14 @@ proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
## mux incoming connection
trace "muxing connection", peer = $conn
trace "Muxing connection", conn
if s.muxers.len == 0:
warn "no muxers registered, skipping upgrade flow"
warn "no muxers registered, skipping upgrade flow", conn
return
let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys()))
if muxerName.len == 0 or muxerName == "na":
debug "no muxer available, early exit", peer = $conn
debug "no muxer available, early exit", conn
return
# create new muxer for connection
@ -187,16 +185,17 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
# install stream handler
muxer.streamHandler = s.streamHandler
s.connManager.storeOutgoing(muxer.connection)
s.connManager.storeOutgoing(conn)
trace "Storing muxer", conn
s.connManager.storeMuxer(muxer)
trace "found a muxer", name = muxerName, peer = $conn
trace "found a muxer", name = muxerName, conn
# start muxer read loop - the future will complete when loop ends
let handlerFut = muxer.handle()
# store it in muxed connections if we have a peer for it
trace "adding muxer for peer", peer = conn.peerInfo.id
trace "Storing muxer with handler", conn
s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
return muxer
@ -205,52 +204,53 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
s.connManager.dropPeer(peerId)
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
logScope:
conn = $conn
oid = $conn.oid
trace "Upgrading outgoing connection", conn
let sconn = await s.secure(conn) # secure the connection
if isNil(sconn):
raise newException(CatchableError,
raise newException(UpgradeFailedError,
"unable to secure connection, stopping upgrade")
if sconn.peerInfo.isNil:
raise newException(CatchableError,
raise newException(UpgradeFailedError,
"current version of nim-libp2p requires that secure protocol negotiates peerid")
trace "upgrading connection"
let muxer = await s.mux(sconn) # mux it if possible
if muxer == nil:
# TODO this might be relaxed in the future
raise newException(CatchableError,
raise newException(UpgradeFailedError,
"a muxer is required for outgoing connections")
await s.identify(muxer)
try:
await s.identify(muxer)
except CatchableError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
# loop
debug "Could not identify connection", err = exc.msg, conn
if isNil(sconn.peerInfo):
await sconn.close()
raise newException(CatchableError,
"unable to identify connection, stopping upgrade")
raise newException(UpgradeFailedError,
"No peerInfo for connection, stopping upgrade")
trace "successfully upgraded outgoing connection", oid = sconn.oid
trace "Upgraded outgoing connection", conn, sconn
return sconn
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection", conn = $conn, oid = $conn.oid
trace "Upgrading incoming connection", conn
let ms = newMultistream()
# secure incoming connections
proc securedHandler (conn: Connection,
proto: string)
{.async, gcsafe, closure.} =
var sconn: Connection
trace "Securing connection", oid = $conn.oid
trace "Securing connection", conn
let secure = s.secureManagers.filterIt(it.codec == proto)[0]
try:
sconn = await secure.secure(conn, false)
var sconn = await secure.secure(conn, false)
if isNil(sconn):
return
@ -272,7 +272,9 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "ending secured handler", err = exc.msg
debug "Exception in secure handler", err = exc.msg, conn
trace "Ending secured handler", conn
if (await ms.select(conn)): # just handshake
# add the secure handlers
@ -286,9 +288,6 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc internalConnect(s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress]): Future[Connection] {.async.} =
logScope:
peer = peerId
if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!")
@ -305,27 +304,26 @@ proc internalConnect(s: Switch,
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager"
warn "dead connection in connection manager", conn
await conn.close()
raise newException(CatchableError, "Zombie connection encountered")
raise newException(DialFailedError, "Zombie connection encountered")
trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir
trace "Reusing existing connection", conn, direction = $conn.dir
return conn
trace "Dialing peer"
trace "Dialing peer", peerId
for t in s.transports: # for each transport
for a in addrs: # for each address
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a
trace "Dialing address", address = $a, peerId
let dialed = try:
await t.dial(a)
except CancelledError as exc:
trace "dialing canceled", exc = exc.msg
trace "Dialing canceled", exc = exc.msg, peerId
raise exc
except CatchableError as exc:
trace "dialing failed", exc = exc.msg
trace "Dialing failed", exc = exc.msg, peerId
libp2p_failed_dials.inc()
continue # Try the next address
@ -340,7 +338,7 @@ proc internalConnect(s: Switch,
# If we failed to establish the connection through one transport,
# we won't succeeed through another - no use in trying again
await dialed.close()
debug "upgrade failed", exc = exc.msg
debug "Upgrade failed", exc = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrade.inc()
raise exc
@ -348,9 +346,7 @@ proc internalConnect(s: Switch,
doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
conn = upgraded
trace "dial successful",
oid = $upgraded.oid,
peerInfo = shortLog(upgraded.peerInfo)
trace "Dial successful", conn, peerInfo = conn.peerInfo
break
finally:
if lock.locked():
@ -359,41 +355,50 @@ proc internalConnect(s: Switch,
if isNil(conn): # None of the addresses connected
raise newException(CatchableError, "Unable to establish outgoing link")
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
if conn.closed():
# This can happen if one of the peer event handlers deems the peer
# unworthy and disconnects it
raise newLPStreamClosedError()
await s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false))
if conn.closed():
# This can happen if one of the peer event handlers deems the peer
# unworthy and disconnects it
raise newException(CatchableError, "Connection closed during handshake")
proc peerCleanup() {.async.} =
try:
await conn.closeEvent.wait()
await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Disconnected))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in switch peer connect cleanup",
conn
except CatchableError as exc:
trace "Unexpected exception in switch peer connect cleanup",
errMsg = exc.msg, conn
# All the errors are handled inside `cleanup()` procedure.
asyncSpawn peerCleanup()
return conn
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
discard await s.internalConnect(peerId, addrs)
proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} =
trace "Attempting to select remote", proto = proto,
streamOid = $stream.oid,
oid = $stream.oid
proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} =
trace "Negotiating stream", proto = proto, conn
if not await s.ms.select(conn, proto):
await conn.close()
raise newException(DialFailedError, "Unable to select sub-protocol " & proto)
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
return conn
proc dial*(s: Switch,
peerId: PeerID,
proto: string): Future[Connection] {.async.} =
let stream = await s.connmanager.getMuxedStream(peerId)
if stream.isNil:
raise newException(CatchableError, "Couldn't get muxed stream")
raise newException(DialFailedError, "Couldn't get muxed stream")
return await s.negotiateStream(stream, proto)
@ -415,15 +420,15 @@ proc dial*(s: Switch,
try:
if isNil(stream):
await conn.close()
raise newException(CatchableError, "Couldn't get muxed stream")
raise newException(DialFailedError, "Couldn't get muxed stream")
return await s.negotiateStream(stream, proto)
except CancelledError as exc:
trace "dial canceled"
trace "dial canceled", conn
await cleanup()
raise exc
except CatchableError as exc:
trace "error dialing", exc = exc.msg
trace "Error dialing", exc = exc.msg, conn
await cleanup()
raise exc
@ -439,17 +444,19 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
s.ms.addHandler(proto.codecs, proto)
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
trace "starting switch for peer", peerInfo = shortLog(s.peerInfo)
trace "starting switch for peer", peerInfo = s.peerInfo
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
trace "Incoming connection", conn
try:
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Exception occurred in Switch.start", exc = exc.msg
trace "Exception occurred in incoming handler", exc = exc.msg, conn
finally:
await conn.close()
trace "Connection handler done", conn
var startFuts: seq[Future[void]]
for t in s.transports: # for each transport
@ -459,7 +466,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
s.peerInfo.addrs[i] = t.ma # update peer's address
startFuts.add(server)
debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
debug "Started libp2p node", peer = s.peerInfo
result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} =
@ -479,28 +486,62 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped"
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
if muxer.connection.peerInfo.isNil:
let
conn = muxer.connection
if conn.peerInfo.isNil:
warn "This version of nim-libp2p requires secure protocol to negotiate peerid"
await muxer.close()
return
# store incoming connection
s.connManager.storeIncoming(muxer.connection)
s.connManager.storeIncoming(conn)
# store muxer and muxed connection
s.connManager.storeMuxer(muxer)
try:
await s.identify(muxer)
except CatchableError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
# loop
debug "Could not identify connection", err = exc.msg, conn
let peerId = muxer.connection.peerInfo.peerId
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
try:
let peerId = conn.peerInfo.peerId
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true))
proc peerCleanup() {.async.} =
try:
await muxer.connection.closeEvent.wait()
await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Disconnected))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
debug "Unexpected cancellation in switch muxer cleanup", conn
except CatchableError as exc:
debug "Unexpected exception in switch muxer cleanup",
err = exc.msg, conn
proc peerStartup() {.async.} =
try:
await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Connected,
incoming: true))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
debug "Unexpected cancellation in switch muxer startup", conn
except CatchableError as exc:
debug "Unexpected exception in switch muxer startup",
err = exc.msg, conn
# All the errors are handled inside `peerStartup()` procedure.
asyncSpawn peerStartup()
# All the errors are handled inside `peerCleanup()` procedure.
asyncSpawn peerCleanup()
except CancelledError as exc:
await muxer.close()
@ -508,7 +549,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
except CatchableError as exc:
await muxer.close()
libp2p_failed_upgrade.inc()
trace "exception in muxer handler", exc = exc.msg
trace "Exception in muxer handler", exc = exc.msg, conn
proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
@ -529,17 +570,17 @@ proc newSwitch*(peerInfo: PeerInfo,
)
let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
result.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises
trace "Incoming muxed connection", conn
try:
trace "handling connection for", peerInfo = $stream
defer:
if not(isNil(stream)):
await stream.close()
await s.ms.handle(stream) # handle incoming connection
await s.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg
trace "exception in stream handler", exc = exc.msg, conn
finally:
await conn.close()
trace "Muxed connection done", conn
result.mount(identity)
for key, val in muxers:

View File

@ -76,13 +76,16 @@ proc connHandler*(t: TcpTransport,
if not(isNil(conn)):
await conn.close()
t.clients.keepItIf(it != client)
except CancelledError as exc:
raise exc
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in transport's cleanup"
except CatchableError as exc:
trace "error cleaning up client", exc = exc.msg
t.clients.add(client)
asyncCheck cleanup()
# All the errors are handled inside `cleanup()` procedure.
asyncSpawn cleanup()
result = conn
proc connCb(server: StreamServer,

View File

@ -16,6 +16,13 @@ type
proc noop(data: seq[byte]) {.async, gcsafe.} = discard
proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto =
proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
let conn = await p.switch.dial(peerId, GossipSubCodec)
return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
newPubSubPeer(peerId, getConn, GossipSubCodec)
proc randomPeerInfo(): PeerInfo =
PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
@ -50,7 +57,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer)
@ -81,7 +88,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
gossipSub.grafted(peer, topic)
gossipSub.peers[peerInfo.peerId] = peer
@ -103,7 +110,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -116,7 +123,7 @@ suite "GossipSub internal":
conns &= conn
var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
gossipSub.gossipsub[topic].incl(peer)
@ -137,7 +144,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -152,7 +159,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
gossipSub.fanout[topic].incl(peer)
@ -174,7 +181,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic1 = "foobar1"
@ -193,7 +200,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
gossipSub.fanout[topic1].incl(peer)
@ -218,7 +225,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -234,7 +241,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
if i mod 2 == 0:
@ -249,7 +256,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
gossipSub.gossipsub[topic].incl(peer)
@ -287,7 +294,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -300,7 +307,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
if i mod 2 == 0:
@ -334,7 +341,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -347,7 +354,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
if i mod 2 == 0:
@ -382,7 +389,7 @@ suite "GossipSub internal":
proc testRun(): Future[bool] {.async.} =
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
discard
let topic = "foobar"
@ -395,7 +402,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
if i mod 2 == 0:

View File

@ -540,9 +540,9 @@ suite "GossipSub":
closureScope:
var dialerNode = dialer
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
if dialerNode.peerInfo.id notin seen:
seen[dialerNode.peerInfo.id] = 0
seen[dialerNode.peerInfo.id].inc
if $dialerNode.peerInfo.peerId notin seen:
seen[$dialerNode.peerInfo.peerId] = 0
seen[$dialerNode.peerInfo.peerId].inc
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
@ -552,8 +552,8 @@ suite "GossipSub":
await allFuturesThrowing(subs).wait(30.seconds)
tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)),
toBytes("from node " &
$nodes[1].peerInfo.peerId)),
1.minutes), runs, 5.seconds
await wait(seenFut, 2.minutes)
@ -588,7 +588,7 @@ suite "GossipSub":
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int]
var seen: Table[PeerID, int]
var subs: seq[Future[void]]
var seenFut = newFuture[void]()
for dialer in nodes:
@ -597,9 +597,9 @@ suite "GossipSub":
var dialerNode = dialer
handler = proc(topic: string, data: seq[byte])
{.async, gcsafe, closure.} =
if dialerNode.peerInfo.id notin seen:
seen[dialerNode.peerInfo.id] = 0
seen[dialerNode.peerInfo.id].inc
if dialerNode.peerInfo.peerId notin seen:
seen[dialerNode.peerInfo.peerId] = 0
seen[dialerNode.peerInfo.peerId].inc
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
@ -609,8 +609,8 @@ suite "GossipSub":
await allFuturesThrowing(subs)
tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)),
toBytes("from node " &
$nodes[1].peerInfo.peerId)),
1.minutes), 2, 5.seconds
await wait(seenFut, 5.minutes)

View File

@ -15,14 +15,14 @@ proc randomPeerID(): PeerID =
suite "MCache":
test "put/get":
var mCache = newMCache(3, 5)
var mCache = MCache.init(3, 5)
var msg = Message(fromPeer: randomPeerID(), seqno: "12345".toBytes())
let msgId = defaultMsgIdProvider(msg)
mCache.put(msgId, msg)
check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
test "window":
var mCache = newMCache(3, 5)
var mCache = MCache.init(3, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),
@ -43,7 +43,7 @@ suite "MCache":
check mCache.get(id).get().topicIDs[0] == "foo"
test "shift - shift 1 window at a time":
var mCache = newMCache(1, 5)
var mCache = MCache.init(1, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),
@ -73,7 +73,7 @@ suite "MCache":
check mCache.window("baz").len == 0
test "shift - 2 windows at a time":
var mCache = newMCache(1, 5)
var mCache = MCache.init(1, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),

View File

@ -3,4 +3,5 @@
import testfloodsub,
testgossipsub,
testmcache,
testtimedcache,
testmessage

View File

@ -0,0 +1,34 @@
{.used.}
import std/unittest
import chronos/timer
import ../../libp2p/protocols/pubsub/timedcache
suite "TimedCache":
test "put/get":
var cache = TimedCache[int].init(5.seconds)
let now = Moment.now()
check:
not cache.put(1, now)
not cache.put(2, now + 3.seconds)
check:
1 in cache
2 in cache
check: not cache.put(3, now + 6.seconds) # expires 1
check:
1 notin cache
2 in cache
3 in cache
check:
cache.put(2, now + 7.seconds) # refreshes 2
not cache.put(4, now + 12.seconds) # expires 3
check:
2 in cache
3 notin cache
4 in cache

View File

@ -200,10 +200,10 @@ suite "Peer testing suite":
p1 == p2
p1 == p4
p2 == p4
p1.pretty() == PeerIDs[i]
p2.pretty() == PeerIDs[i]
p3.pretty() == PeerIDs[i]
p4.pretty() == PeerIDs[i]
$p1 == PeerIDs[i]
$p2 == PeerIDs[i]
$p3 == PeerIDs[i]
$p4 == PeerIDs[i]
p1.match(seckey) == true
p1.match(pubkey) == true
p1.getBytes() == p2.getBytes()