diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index b36343813..6cf1a831b 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -88,6 +88,13 @@ steps:
       [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=2
       echo "Found ${ncpu} cores"
 
+      if [[ $PLATFORM == "x86" ]]; then
+        choco --version
+        choco install --x86 openssl
+        export PATH="/c/Program Files (x86)/OpenSSL-Win32/bin:${PATH}"
+        echo "PATH=${PATH}"
+      fi
+
       # build nim from our own branch - this to avoid the day-to-day churn and
       # regressions of the fast-paced Nim development while maintaining the
       # flexibility to apply patches
diff --git a/examples/directchat.nim b/examples/directchat.nim
index 744409c3a..c83733c14 100644
--- a/examples/directchat.nim
+++ b/examples/directchat.nim
@@ -190,7 +190,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
   let libp2pFuts = await switch.start()
   chatProto.started = true
 
-  let id = peerInfo.peerId.pretty
+  let id = $peerInfo.peerId
   echo "PeerID: " & id
   echo "listening on: "
   for a in peerInfo.addrs:
diff --git a/libp2p.nimble b/libp2p.nimble
index fbdb3f410..33885b133 100644
--- a/libp2p.nimble
+++ b/libp2p.nimble
@@ -11,7 +11,7 @@ requires "nim >= 1.2.0",
          "nimcrypto >= 0.4.1",
          "bearssl >= 0.1.4",
          "chronicles >= 0.7.2",
-         "chronos >= 2.3.8",
+         "chronos >= 2.5.2",
          "metrics",
          "secp256k1",
          "stew >= 0.1.0"
diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim
index 8d101146a..b17bbf46a 100644
--- a/libp2p/connmanager.nim
+++ b/libp2p/connmanager.nim
@@ -76,11 +76,12 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
   return muxer == c.muxed[conn].muxer
 
 proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
-  trace "cleaning up muxer for peer"
+  trace "Cleaning up muxer", m = muxerHolder.muxer
 
   await muxerHolder.muxer.close()
   if not(isNil(muxerHolder.handle)):
     await muxerHolder.handle # TODO noraises?
+  trace "Cleaned up muxer", m = muxerHolder.muxer
 
 proc delConn(c: ConnManager, conn: Connection) =
   let peerId = conn.peerInfo.peerId
@@ -91,6 +92,8 @@ proc delConn(c: ConnManager, conn: Connection) =
       c.conns.del(peerId)
 
   libp2p_peers.set(c.conns.len.int64)
 
+  trace "Removed connection", conn
+
 proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
   ## clean connection's resources such as muxers and streams
@@ -113,17 +116,24 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
   finally:
     await conn.close()
 
-  trace "connection cleaned up", peer = $conn.peerInfo
+  trace "Connection cleaned up", conn
 
 proc onClose(c: ConnManager, conn: Connection) {.async.} =
   ## connection close even handler
   ##
   ## triggers the connections resource cleanup
   ##
-
-  await conn.join()
-  trace "triggering connection cleanup", peer = $conn.peerInfo
-  await c.cleanupConn(conn)
+  try:
+    await conn.join()
+    trace "Connection closed, cleaning up", conn
+    await c.cleanupConn(conn)
+  except CancelledError:
+    # This is a top-level procedure which will work as a separate task, so it
+    # does not need to propagate CancelledError.
+ debug "Unexpected cancellation in connection manager's cleanup", conn + except CatchableError as exc: + debug "Unexpected exception in connection manager's cleanup", + errMsg = exc.msg, conn proc selectConn*(c: ConnManager, peerId: PeerID, @@ -160,7 +170,7 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer = if conn in c.muxed: return c.muxed[conn].muxer else: - debug "no muxer for connection", conn = $conn + debug "no muxer for connection", conn proc storeConn*(c: ConnManager, conn: Connection) = ## store a connection @@ -174,7 +184,7 @@ proc storeConn*(c: ConnManager, conn: Connection) = let peerId = conn.peerInfo.peerId if c.conns.getOrDefault(peerId).len > c.maxConns: - trace "too many connections", peer = $peerId, + debug "too many connections", peer = conn, conns = c.conns.getOrDefault(peerId).len raise newTooManyConnections() @@ -184,11 +194,13 @@ proc storeConn*(c: ConnManager, conn: Connection) = c.conns[peerId].incl(conn) - # launch on close listener - asyncCheck c.onClose(conn) + # Launch on close listener + # All the errors are handled inside `onClose()` procedure. + asyncSpawn c.onClose(conn) libp2p_peers.set(c.conns.len.int64) - trace "stored connection", connections = c.conns.len, peer = peerId + trace "Stored connection", + connections = c.conns.len, conn, direction = $conn.dir proc storeOutgoing*(c: ConnManager, conn: Connection) = conn.dir = Direction.Out @@ -214,7 +226,7 @@ proc storeMuxer*(c: ConnManager, muxer: muxer, handle: handle) - trace "stored muxer", connections = c.conns.len + trace "Stored muxer", connections = c.conns.len, muxer proc getMuxedStream*(c: ConnManager, peerId: PeerID, @@ -248,8 +260,10 @@ proc getMuxedStream*(c: ConnManager, proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = ## drop connections and cleanup resources for peer ## + trace "Dropping peer", peerId let conns = c.conns.getOrDefault(peerId) for conn in conns: + trace "Removing connection", conn delConn(c, conn) var muxers: seq[MuxerHolder] @@ -263,6 +277,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = for conn in conns: await conn.close() + trace "Dropped peer", peerId proc close*(c: ConnManager) {.async.} = ## cleanup resources for the connection diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index e610fd384..8aee77875 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -1304,20 +1304,3 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, except Exception as exc: await api.closeConnection(transp) raise exc - -proc `$`*(pinfo: PeerInfo): string = - ## Get string representation of ``PeerInfo`` object. - result = newStringOfCap(128) - result.add("{PeerID: '") - result.add($pinfo.peer.pretty()) - result.add("' Addresses: [") - let length = len(pinfo.addresses) - for i in 0.. 
0: - result = result diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 3b7ecbe4d..558e266e0 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -31,10 +31,10 @@ proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType = proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = let header = await conn.readVarint() - trace "read header varint", varint = header + trace "read header varint", varint = header, conn let data = await conn.readLp(MaxMsgSize) - trace "read data", dataLen = data.len, data = shortLog(data) + trace "read data", dataLen = data.len, data = shortLog(data), conn let msgType = header and 0x7 if msgType.int > ord(MessageType.ResetOut): @@ -46,7 +46,7 @@ proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, data: seq[byte] = @[]) {.async, gcsafe.} = - trace "sending data over mplex", oid = $conn.oid, + trace "sending data over mplex", conn, id, msgType, data = data.len diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 48eadb97d..54c2a8fe1 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import oids, deques +import std/[oids, strformat] import chronos, chronicles, metrics import types, coder, @@ -66,87 +66,60 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped = if not(isNil(lock)) and lock.locked: lock.release() -template withEOFExceptions(body: untyped): untyped = - try: - body - except CancelledError as exc: - raise exc - except LPStreamEOFError as exc: - trace "muxed connection EOF", exc = exc.msg - except LPStreamClosedError as exc: - trace "muxed connection closed", exc = exc.msg - except LPStreamIncompleteError as exc: - trace "incomplete message", exc = exc.msg +func shortLog*(s: LPChannel): auto = + if s.isNil: "LPChannel(nil)" + elif s.conn.peerInfo.isNil: $s.oid + elif s.name != $s.oid: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}" + else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}" +chronicles.formatIt(LPChannel): shortLog(it) proc closeMessage(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - ## send close message - this will not raise ## on EOF or Closed withWriteLock(s.writeLock): - trace "sending close message" + trace "sending close message", s await s.conn.writeMsg(s.id, s.closeCode) # write close proc resetMessage(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - ## send reset message - this will not raise - withEOFExceptions: + try: withWriteLock(s.writeLock): - trace "sending reset message" - + trace "sending reset message", s await s.conn.writeMsg(s.id, s.resetCode) # write reset + except CancelledError: + # This procedure is called from one place and never awaited, so there no + # need to re-raise CancelledError. 
+ debug "Unexpected cancellation while resetting channel", s + except LPStreamEOFError as exc: + trace "muxed connection EOF", exc = exc.msg, s + except LPStreamClosedError as exc: + trace "muxed connection closed", exc = exc.msg, s + except LPStreamIncompleteError as exc: + trace "incomplete message", exc = exc.msg, s + except CatchableError as exc: + debug "Unhandled exception leak", exc = exc.msg, s proc open*(s: LPChannel) {.async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - - ## NOTE: Don't call withExcAndLock or withWriteLock, - ## because this already gets called from writeHandler - ## which is locked await s.conn.writeMsg(s.id, MessageType.New, s.name) - trace "opened channel" + trace "opened channel", s s.isOpen = true proc closeRemote*(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - - trace "got EOF, closing channel" + trace "closing remote", s try: await s.drainBuffer() s.isEof = true # set EOF immediately to prevent further reads # close parent bufferstream to prevent further reads await procCall BufferStream(s).close() - trace "channel closed on EOF" + trace "channel closed on EOF", s except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception closing remote channel", exc = exc.msg + trace "exception closing remote channel", exc = exc.msg, s + + trace "closed remote", s method closed*(s: LPChannel): bool = ## this emulates half-closed behavior @@ -156,24 +129,13 @@ method closed*(s: LPChannel): bool = s.closedLocal method reset*(s: LPChannel) {.base, async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - if s.closedLocal and s.isEof: - trace "channel already closed or reset" + trace "channel already closed or reset", s return - trace "resetting channel" + trace "resetting channel", s - # we asyncCheck here because the other end - # might be dead already - reset is always - # optimistic - asyncCheck s.resetMessage() + asyncSpawn s.resetMessage() try: # drain the buffer before closing @@ -186,48 +148,43 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in reset", exc = exc.msg + trace "exception in reset", exc = exc.msg, s - trace "channel reset" + trace "channel reset", s method close*(s: LPChannel) {.async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - if s.closedLocal: - trace "channel already closed" + trace "channel already closed", s return - trace "closing local lpchannel" + trace "closing local lpchannel", s proc closeInternal() {.async.} = try: await s.closeMessage().wait(2.minutes) if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() - except CancelledError as exc: + except CancelledError: + trace "Unexpected cancellation while closing channel", s await s.reset() - raise exc + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. 
except CatchableError as exc: - trace "exception closing channel", exc = exc.msg + trace "exception closing channel", exc = exc.msg, s await s.reset() - trace "lpchannel closed local" + trace "lpchannel closed local", s s.closedLocal = true - asyncCheck closeInternal() + # All the errors are handled inside `closeInternal()` procedure. + asyncSpawn closeInternal() method initStream*(s: LPChannel) = if s.objName.len == 0: s.objName = "LPChannel" s.timeoutHandler = proc() {.async, gcsafe.} = - trace "idle timeout expired, resetting LPChannel" + trace "idle timeout expired, resetting LPChannel", s await s.reset() procCall BufferStream(s).initStream() @@ -254,34 +211,26 @@ proc init*( resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, dir: if initiator: Direction.Out else: Direction.In) - logScope: - id = chann.id - initiator = chann.initiator - name = chann.name - oid = $chann.oid - peer = $chann.conn.peerInfo - # stack = getStackTrace() - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = try: if chann.isLazy and not(chann.isOpen): await chann.open() # writes should happen in sequence - trace "sending data", len = data.len + trace "sending data", len = data.len, conn, chann await conn.writeMsg(chann.id, chann.msgCode, data) except CatchableError as exc: - trace "exception in lpchannel write handler", exc = exc.msg - await chann.reset() + trace "exception in lpchannel write handler", exc = exc.msg, chann + asyncSpawn conn.close() raise exc chann.initBufferStream(writeHandler, size) when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - trace "created new lpchannel" + trace "created new lpchannel", chann return chann diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 5915f0042..71d24c0be 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -32,6 +32,7 @@ when defined(libp2p_expensive_metrics): type TooManyChannels* = object of CatchableError + InvalidChannelIdError* = object of CatchableError Mplex* = ref object of Muxer channels: array[bool, Table[uint64, LPChannel]] @@ -42,20 +43,34 @@ type oid*: Oid maxChannCount: int +func shortLog*(m: MPlex): auto = shortLog(m.connection) +chronicles.formatIt(Mplex): shortLog(it) + proc newTooManyChannels(): ref TooManyChannels = newException(TooManyChannels, "max allowed channel count exceeded") +proc newInvalidChannelIdError(): ref InvalidChannelIdError = + newException(InvalidChannelIdError, "max allowed channel count exceeded") + proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = ## remove the local channel from the internal tables ## - await chann.join() - m.channels[chann.initiator].del(chann.id) - trace "cleaned up channel", id = chann.id, oid = $chann.oid + try: + await chann.join() + m.channels[chann.initiator].del(chann.id) + debug "cleaned up channel", m, chann - when defined(libp2p_expensive_metrics): - libp2p_mplex_channels.set( - m.channels[chann.initiator].len.int64, - labelValues = [$chann.initiator, $m.connection.peerInfo]) + when defined(libp2p_expensive_metrics): + libp2p_mplex_channels.set( + m.channels[chann.initiator].len.int64, + labelValues = [$chann.initiator, $m.connection.peerInfo.peerId]) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. 
+ debug "Unexpected cancellation in mplex channel cleanup", + m, chann + except CatchableError as exc: + debug "error cleaning up mplex channel", exc = exc.msg, m, chann proc newStreamInternal*(m: Mplex, initiator: bool = true, @@ -70,10 +85,9 @@ proc newStreamInternal*(m: Mplex, m.currentId.inc(); m.currentId else: chanId - trace "creating new channel", channelId = id, - initiator = initiator, - name = name, - oid = $m.oid + if id in m.channels[initiator]: + raise newInvalidChannelIdError() + result = LPChannel.init( id, m.connection, @@ -85,44 +99,40 @@ proc newStreamInternal*(m: Mplex, result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr - doAssert(id notin m.channels[initiator], - "channel slot already taken!") + trace "Creating new channel", id, initiator, name, m, channel = result m.channels[initiator][id] = result - asyncCheck m.cleanupChann(result) + # All the errors are handled inside `cleanupChann()` procedure. + asyncSpawn m.cleanupChann(result) when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[initiator].len.int64, - labelValues = [$initiator, $m.connection.peerInfo]) + labelValues = [$initiator, $m.connection.peerInfo.peerId]) proc handleStream(m: Mplex, chann: LPChannel) {.async.} = ## call the muxer stream handler for this channel ## try: await m.streamHandler(chann) - trace "finished handling stream" + trace "finished handling stream", m, chann doAssert(chann.closed, "connection not closed by handler!") - except CancelledError as exc: - trace "cancelling stream handler", exc = exc.msg + except CancelledError: + trace "Unexpected cancellation in stream handler", m, chann await chann.reset() - raise exc + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg + trace "Exception in mplex stream handler", + exc = exc.msg, m, chann await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = - logScope: moid = $m.oid - - trace "starting mplex main loop" + trace "Starting mplex main loop", m try: - defer: - trace "stopping mplex main loop" - await m.close() - while not m.connection.atEof: - trace "waiting for data" + trace "waiting for data", m let (id, msgType, data) = await m.connection.readMsg() initiator = bool(ord(msgType) and 1) @@ -133,57 +143,57 @@ method handle*(m: Mplex) {.async, gcsafe.} = msgType = msgType size = data.len - trace "read message from connection", data = data.shortLog + trace "read message from connection", m, data = data.shortLog var channel = if MessageType(msgType) != MessageType.New: let tmp = m.channels[initiator].getOrDefault(id, nil) if tmp == nil: - trace "Channel not found, skipping" + trace "Channel not found, skipping", m continue tmp else: if m.channels[false].len > m.maxChannCount - 1: - warn "too many channels created by remote peer", allowedMax = MaxChannelCount + warn "too many channels created by remote peer", + allowedMax = MaxChannelCount, m raise newTooManyChannels() let name = string.fromBytes(data) m.newStreamInternal(false, id, name, timeout = m.outChannTimeout) - logScope: - name = channel.name - oid = $channel.oid - case msgType: of MessageType.New: - trace "created channel" + trace "created channel", m, channel if not isNil(m.streamHandler): - # launch handler task - asyncCheck m.handleStream(channel) + # Launch handler task + # All the errors are handled inside `handleStream()` procedure. 
+ asyncSpawn m.handleStream(channel) of MessageType.MsgIn, MessageType.MsgOut: if data.len > MaxMsgSize: - warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize + warn "attempting to send a packet larger than allowed", + allowed = MaxMsgSize, channel raise newLPStreamLimitError() - trace "pushing data to channel" + trace "pushing data to channel", m, channel await channel.pushTo(data) - trace "pushed data to channel" + trace "pushed data to channel", m, channel of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel" await channel.closeRemote() - trace "closed channel" of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel" await channel.reset() - trace "reset channel" - except CancelledError as exc: - raise exc + except CancelledError: + # This procedure is spawned as task and it is not part of public API, so + # there no way for this procedure to be cancelled implicitely. + debug "Unexpected cancellation in mplex handler", m except CatchableError as exc: - trace "Exception occurred", exception = exc.msg, oid = $m.oid + trace "Exception occurred", exception = exc.msg, m + finally: + trace "stopping mplex main loop", m + await m.close() proc init*(M: type Mplex, conn: Connection, @@ -210,7 +220,7 @@ method close*(m: Mplex) {.async, gcsafe.} = if m.isClosed: return - trace "closing mplex muxer", moid = $m.oid + trace "closing mplex muxer", m m.isClosed = true diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index baa76a22d..870a80a79 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -35,6 +35,9 @@ type streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created +func shortLog*(m: Muxer): auto = shortLog(m.connection) +chronicles.formatIt(Muxer): shortLog(it) + # muxer interface method newStream*(m: Muxer, name: string = "", lazy: bool = false): Future[Connection] {.base, async, gcsafe.} = discard @@ -49,7 +52,7 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - trace "starting muxer handler", proto=proto, peer = $conn + trace "starting muxer handler", proto=proto, conn try: let muxer = c.newMuxer(conn) @@ -68,11 +71,8 @@ method init(c: MuxerProvider) = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in muxer handler", exc = exc.msg, peer = $conn, proto=proto + trace "exception in muxer handler", exc = exc.msg, conn, proto finally: await conn.close() c.handler = handler - -proc `$`*(m: Muxer): string = - $m.connection diff --git a/libp2p/peerid.nim b/libp2p/peerid.nim index 9eeb46791..cd01ec149 100644 --- a/libp2p/peerid.nim +++ b/libp2p/peerid.nim @@ -11,11 +11,13 @@ {.push raises: [Defect].} -import hashes +import std/hashes +import chronicles import nimcrypto/utils, stew/base58 import crypto/crypto, multicodec, multihash, vbuffer import protobuf/minprotobuf import stew/results + export results const @@ -27,11 +29,24 @@ type PeerID* = object data*: seq[byte] - PeerIDError* = object of CatchableError - -proc pretty*(pid: PeerID): string {.inline.} = +func `$`*(pid: PeerID): string = ## Return base58 encoded ``pid`` representation. - result = Base58.encode(pid.data) + Base58.encode(pid.data) + +func shortLog*(pid: PeerId): string = + ## Returns compact string representation of ``pid``. 
+ var spid = $pid + if len(spid) <= 10: + result = spid + else: + result = newStringOfCap(10) + for i in 0..<2: + result.add(spid[i]) + result.add("*") + for i in (len(spid) - 6)..spid.high: + result.add(spid[i]) + +chronicles.formatIt(PeerID): shortLog(it) proc toBytes*(pid: PeerID, data: var openarray[byte]): int = ## Store PeerID ``pid`` to array of bytes ``data``. @@ -112,19 +127,6 @@ proc extractPublicKey*(pid: PeerID, pubkey: var PublicKey): bool = let length = len(mh.data.buffer) result = pubkey.init(mh.data.buffer.toOpenArray(mh.dpos, length - 1)) -proc `$`*(pid: PeerID): string = - ## Returns compact string representation of ``pid``. - var spid = pid.pretty() - if len(spid) <= 10: - result = spid - else: - result = newStringOfCap(10) - for i in 0..<2: - result.add(spid[i]) - result.add("*") - for i in (len(spid) - 6)..spid.high: - result.add(spid[i]) - proc init*(pid: var PeerID, data: openarray[byte]): bool = ## Initialize peer id from raw binary representation ``data``. ## diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index f43905696..3fac63a3b 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -41,20 +41,15 @@ type of HasPublic: key: Option[PublicKey] -proc id*(p: PeerInfo): string = - if not(isNil(p)): - return p.peerId.pretty() - -proc `$`*(p: PeerInfo): string = p.id - -proc shortLog*(p: PeerInfo): auto = +func shortLog*(p: PeerInfo): auto = ( - id: p.id(), + peerId: $p.peerId, addrs: mapIt(p.addrs, $it), protocols: mapIt(p.protocols, $it), protoVersion: p.protoVersion, agentVersion: p.agentVersion, ) +chronicles.formatIt(PeerInfo): shortLog(it) template postInit(peerinfo: PeerInfo, addrs: openarray[MultiAddress], diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 611469a66..3c7ce2381 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -106,16 +106,16 @@ method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = try: defer: - trace "exiting identify handler", oid = $conn.oid + trace "exiting identify handler", conn await conn.close() - trace "handling identify request", oid = $conn.oid + trace "handling identify request", conn var pb = encodeMsg(p.peerInfo, conn.observedAddr) await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in identify handler", exc = exc.msg + trace "exception in identify handler", exc = exc.msg, conn p.handler = handle p.codec = IdentifyCodec @@ -123,10 +123,10 @@ method init*(p: Identify) = proc identify*(p: Identify, conn: Connection, remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = - trace "initiating identify", peer = $conn + trace "initiating identify", conn var message = await conn.readLp(64*1024) if len(message) == 0: - trace "identify: Empty message received!" 
+ trace "identify: Empty message received!", conn raise newException(IdentityInvalidMsgError, "Empty message received!") let infoOpt = decodeMsg(message) @@ -144,8 +144,8 @@ proc identify*(p: Identify, # have in most cases if peer.get() != remotePeerInfo.peerId: trace "Peer ids don't match", - remote = peer.get().pretty(), - local = remotePeerInfo.id + remote = peer, + local = remotePeerInfo.peerId raise newException(IdentityNoMatchError, "Peer ids don't match") diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 382db1fd0..11c480251 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,16 +7,17 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, sets, strutils +import std/[sequtils, sets, tables] import chronos, chronicles, metrics -import pubsub, - pubsubpeer, - timedcache, - peertable, - rpc/[messages, message], +import ./pubsub, + ./pubsubpeer, + ./timedcache, + ./peertable, + ./rpc/[message, messages], ../../stream/connection, ../../peerid, - ../../peerinfo + ../../peerinfo, + ../../utility logScope: topics = "floodsub" @@ -38,75 +39,57 @@ method subscribeTopic*(f: FloodSub, f.floodsub[topic] = initHashSet[PubSubPeer]() if subscribe: - trace "adding subscription for topic", peer = peer.id, name = topic + trace "adding subscription for topic", peer, topic # subscribe the peer to the topic f.floodsub[topic].incl(peer) else: - trace "removing subscription for topic", peer = peer.id, name = topic + trace "removing subscription for topic", peer, topic # unsubscribe the peer from the topic f.floodsub[topic].excl(peer) method unsubscribePeer*(f: FloodSub, peer: PeerID) = ## handle peer disconnects ## - - trace "unsubscribing floodsub peer", peer = $peer + trace "unsubscribing floodsub peer", peer let pubSubPeer = f.peers.getOrDefault(peer) if pubSubPeer.isNil: return - for t in toSeq(f.floodsub.keys): - if t in f.floodsub: - f.floodsub[t].excl(pubSubPeer) + for _, v in f.floodsub.mpairs(): + v.excl(pubSubPeer) procCall PubSub(f).unsubscribePeer(peer) method rpcHandler*(f: FloodSub, peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async.} = - await procCall PubSub(f).rpcHandler(peer, rpcMsgs) + rpcMsg: RPCMsg) {.async.} = + await procCall PubSub(f).rpcHandler(peer, rpcMsg) - for m in rpcMsgs: # for all RPC messages - if m.messages.len > 0: # if there are any messages - var toSendPeers = initHashSet[PubSubPeer]() - for msg in m.messages: # for every message - let msgId = f.msgIdProvider(msg) - logScope: msgId + for msg in rpcMsg.messages: # for every message + let msgId = f.msgIdProvider(msg) - if msgId notin f.seen: - f.seen.put(msgId) # add the message to the seen cache + if f.seen.put(msgId): + trace "Dropping already-seen message", msgId, peer + continue - if f.verifySignature and not msg.verify(peer.peerId): - trace "dropping message due to failed signature verification" - continue + if f.verifySignature and not msg.verify(peer.peerId): + debug "Dropping message due to failed signature verification", msgId, peer + continue - if not (await f.validate(msg)): - trace "dropping message due to failed validation" - continue + if not (await f.validate(msg)): + trace "Dropping message due to failed validation", msgId, peer + continue - for t in msg.topicIDs: # for every topic in the message - if t in f.floodsub: - toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic - if t in f.topics: # check that we're subscribed to it - 
for h in f.topics[t].handler:
-              trace "calling handler for message", topicId = t,
-                                                   localPeer = f.peerInfo.id,
-                                                   fromPeer = msg.fromPeer.pretty
+    var toSendPeers = initHashSet[PubSubPeer]()
+    for t in msg.topicIDs: # for every topic in the message
+      f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
 
-              try:
-                await h(t, msg.data) # trigger user provided handler
-              except CancelledError as exc:
-                raise exc
-              except CatchableError as exc:
-                trace "exception in message handler", exc = exc.msg
+      await handleData(f, t, msg.data)
 
-      # forward the message to all peers interested in it
-      let published = await f.broadcast(
-        toSeq(toSendPeers),
-        RPCMsg(messages: m.messages),
-        DefaultSendTimeout)
-
-      trace "forwared message to peers", peers = published
+    # In theory, if topics are the same in all messages, we could batch - we'd
+    # also have to be careful to only include validated messages
+    f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
+    trace "Forwarded message to peers", peers = toSendPeers.len
 
 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
     ## main protocol handler that gets triggered on every
     ## connection for a protocol string
     ## e.g. ``/floodsub/1.0.0``, etc...
     ##
-
-    await f.handleConn(conn, proto)
+    try:
+      await f.handleConn(conn, proto)
+    except CancelledError:
+      # This is a top-level procedure which will work as a separate task, so it
+      # does not need to propagate CancelledError.
+      trace "Unexpected cancellation in floodsub handler", conn
+    except CatchableError as exc:
+      trace "FloodSub handler leaks an error", exc = exc.msg, conn
 
   f.handler = handler
   f.codec = FloodSubCodec
 
 method publish*(f: FloodSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
+                data: seq[byte]): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(f).publish(topic, data, timeout)
+  discard await procCall PubSub(f).publish(topic, data)
 
-  if data.len <= 0 or topic.len <= 0:
-    trace "topic or data missing, skipping publish"
-    return
+  trace "Publishing message on topic", data = data.shortLog, topic
 
-  if topic notin f.floodsub:
-    trace "missing peers for topic, skipping publish"
-    return
+  if topic.len <= 0: # data could be 0/empty
+    debug "Empty topic, skipping publish", topic
+    return 0
+
+  let peers = toSeq(f.floodsub.getOrDefault(topic))
+
+  if peers.len == 0:
+    debug "No peers for topic, skipping publish", topic
+    return 0
 
-  trace "publishing on topic", name = topic
   inc f.msgSeqno
-  let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+  let
+    msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+    msgId = f.msgIdProvider(msg)
 
-  # start the future but do not wait yet
-  let published = await f.broadcast(
-    toSeq(f.floodsub.getOrDefault(topic)),
-    RPCMsg(messages: @[msg]),
-    timeout)
+  trace "Created new message",
+    msg = shortLog(msg), peers = peers.len, topic, msgId
+
+  if f.seen.put(msgId):
+    # custom msgid providers might cause this
+    trace "Dropping already-seen message", msgId, topic
+    return 0
+
+  # Try to send to all peers that are known to be interested
+  f.broadcast(peers, RPCMsg(messages: @[msg]))
 
   when defined(libp2p_expensive_metrics):
     libp2p_pubsub_messages_published.inc(labelValues = [topic])
 
-  trace "published message to peers", peers = published,
-                                      msg = msg.shortLog()
-  return published
+  trace "Published message to peers", msgId, topic
+
+  return peers.len
 
 method unsubscribe*(f: FloodSub,
                     topics: seq[TopicPair]) {.async.} =
   await procCall PubSub(f).unsubscribe(topics)
 
   for p in f.peers.values:
-    discard await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
+    f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
 
 method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
   await procCall PubSub(f).unsubscribeAll(topic)
 
   for p in f.peers.values:
-    discard await f.sendSubs(p, @[topic], false)
+    f.sendSubs(p, @[topic], false)
 
 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()
   f.floodsub = initTable[string, HashSet[PubSubPeer]]()
-  f.seen = newTimedCache[string](2.minutes)
+  f.seen = TimedCache[string].init(2.minutes)
   f.init()
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 949faeaae..c3acbf970 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -9,18 +9,17 @@
 
 import std/[tables, sets, options, sequtils, random, algorithm]
 import chronos, chronicles, metrics
-import pubsub,
-       floodsub,
-       pubsubpeer,
-       peertable,
-       mcache,
-       timedcache,
-       rpc/[messages, message],
+import ./pubsub,
+       ./floodsub,
+       ./pubsubpeer,
+       ./peertable,
+       ./mcache,
+       ./timedcache,
+       ./rpc/[messages, message],
        ../protocol,
-       ../../peerinfo,
        ../../stream/connection,
+       ../../peerinfo,
        ../../peerid,
-       ../../errors,
        ../../utility
 import stew/results
 export results
@@ -141,7 +140,6 @@ type
     mcache*: MCache # messages cache
     heartbeatFut: Future[void] # cancellation future for heartbeat interval
     heartbeatRunning: bool
-    heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
 
     peerStats: Table[PubSubPeer, PeerStats]
     parameters*: GossipSubParams
@@ -268,12 +266,14 @@ method init*(g: GossipSub) =
     ## connection for a protocol string
     ## e.g. ``/floodsub/1.0.0``, etc...
     ##
-
-    # TODO
-    # if conn.peerInfo.maintain:
-    #   g.explicitPeers.incl(conn.peerInfo.id)
-
-    await g.handleConn(conn, proto)
+    try:
+      await g.handleConn(conn, proto)
+    except CancelledError:
+      # This is a top-level procedure which will work as a separate task, so it
+      # does not need to propagate CancelledError.
+ trace "Unexpected cancellation in gossipsub handler", conn + except CatchableError as exc: + trace "GossipSub handler leaks an error", exc = exc.msg, conn g.handler = handler g.codecs &= GossipSubCodec @@ -346,6 +346,7 @@ proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] = proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic + logScope: topic trace "about to replenish fanout" if g.fanout.peers(topic) < GossipSubDLo: @@ -550,14 +551,14 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # Send changes to peers after table updates to avoid stale state if grafts.len > 0: let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) - discard await g.broadcast(grafts, graft, DefaultSendTimeout) + g.broadcast(grafts, graft) if prunes.len > 0: let prune = RPCMsg(control: some(ControlMessage( prune: @[ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - discard await g.broadcast(prunes, prune, DefaultSendTimeout) + g.broadcast(prunes, prune) proc dropFanoutPeers(g: GossipSub) = # drop peers that we haven't published to in @@ -622,11 +623,12 @@ func `/`(a, b: Duration): float64 = fa / fb proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = - if peer.sendConn == nil: + if peer.connections.len == 0: 0.0 else: let - address = peer.sendConn.observedAddr + # TODO, we are just using the first connections for now + address = peer.connections[0].observedAddr ipPeers = g.peersInIP.getOrDefault(address) len = ipPeers.len.float64 if len > g.parameters.ipColocationFactorThreshold: @@ -771,7 +773,7 @@ proc heartbeat(g: GossipSub) {.async.} = topicID: t, peers: g.peerExchangeList(t), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - discard await g.broadcast(prunes, prune, DefaultSendTimeout) + g.broadcast(prunes, prune) await g.rebalanceMesh(t) @@ -782,14 +784,11 @@ proc heartbeat(g: GossipSub) {.async.} = g.replenishFanout(t) let peers = g.getGossipPeers() - var sent: seq[Future[bool]] for peer, control in peers: g.peers.withValue(peer.peerId, pubsubPeer) do: - sent &= g.send( + g.send( pubsubPeer[], - RPCMsg(control: some(control)), - DefaultSendTimeout) - checkFutures(await allFinished(sent)) + RPCMsg(control: some(control))) g.mcache.shift() # shift the cache except CancelledError as exc: @@ -808,14 +807,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = ## handle peer disconnects ## - trace "unsubscribing gossipsub peer", peer = $peer + trace "unsubscribing gossipsub peer", peer let pubSubPeer = g.peers.getOrDefault(peer) if pubSubPeer.isNil: + trace "no peer to unsubscribe", peer return # remove from peer IPs collection too - if pubSubPeer.sendConn != nil: - g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s) do: + if pubSubPeer.connections.len > 0: + # TODO, we are just using the first connections for now + g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s) do: s[].excl(pubSubPeer) for t in toSeq(g.gossipsub.keys): @@ -858,10 +859,11 @@ method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, peer: PubSubPeer) {.gcsafe.} = - procCall FloodSub(g).subscribeTopic(topic, subscribe, peer) + # Skip floodsub - we don't want it to add the peer to `g.floodsub` + procCall PubSub(g).subscribeTopic(topic, subscribe, peer) logScope: - peer = $peer.id + peer topic g.onNewPeer(peer) @@ -899,7 +901,7 @@ proc handleGraft(g: GossipSub, for graft in grafts: let topic = graft.topicID logScope: - peer = 
peer.id + peer topic trace "peer grafted topic" @@ -958,7 +960,7 @@ proc handleGraft(g: GossipSub, proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "peer pruned topic", peer = peer.id, topic = prune.topicID + trace "peer pruned topic", peer, topic = prune.topicID # add peer backoff if prune.backoff > 0: @@ -985,7 +987,7 @@ proc handleIHave(g: GossipSub, dec peer.iHaveBudget for ihave in ihaves: trace "peer sent ihave", - peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs + peer, topic = ihave.topicID, msgs = ihave.messageIDs if ihave.topicID in g.mesh: for m in ihave.messageIDs: @@ -1000,7 +1002,7 @@ proc handleIWant(g: GossipSub, else: for iwant in iwants: for mid in iwant.messageIDs: - trace "peer sent iwant", peer = peer.id, messageID = mid + trace "peer sent iwant", peer, messageID = mid let msg = g.mcache.get(mid) if msg.isSome: # avoid spam @@ -1021,124 +1023,87 @@ proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) = method rpcHandler*(g: GossipSub, peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async.} = - await procCall PubSub(g).rpcHandler(peer, rpcMsgs) + rpcMsg: RPCMsg) {.async.} = + await procCall PubSub(g).rpcHandler(peer, rpcMsg) - var userHandlers: seq[Future[void]] + for msg in rpcMsg.messages: # for every message + let msgId = g.msgIdProvider(msg) - for m in rpcMsgs: # for all RPC messages - if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[PubSubPeer] - for msg in m.messages: # for every message - let msgId = g.msgIdProvider(msg) - logScope: msgId + if g.seen.put(msgId): + trace "Dropping already-seen message", msgId, peer - if msgId in g.seen: - trace "message already processed, skipping" + # make sure to update score tho before continuing + for t in msg.topicIDs: # for every topic in the message + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + # if in mesh add more delivery score + var stats = g.peerStats[peer].topicInfos.getOrDefault(t) + if stats.inMesh: + stats.meshMessageDeliveries += 1 + if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - # make sure to update score tho before continuing - for t in msg.topicIDs: # for every topic in the message - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - # if in mesh add more delivery score - var stats = g.peerStats[peer].topicInfos.getOrDefault(t) - if stats.inMesh: - stats.meshMessageDeliveries += 1 - if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + # commit back to the table + g.peerStats[peer].topicInfos[t] = stats + + continue - # commit back to the table - g.peerStats[peer].topicInfos[t] = stats - continue + g.mcache.put(msgId, msg) - trace "processing message" + if g.verifySignature and not msg.verify(peer.peerId): + debug "Dropping message due to failed signature verification", msgId, peer + g.punishPeer(peer, msg) + continue - g.seen.put(msgId) # add the message to the seen cache + if not (await g.validate(msg)): + trace "Dropping message due to failed validation", msgId, peer + g.punishPeer(peer, msg) + continue - if g.verifySignature and not msg.verify(peer.peerId): - trace "dropping message due to failed signature verification" - g.punishPeer(peer, msg) - continue + var toSendPeers = initHashSet[PubSubPeer]() + for t in msg.topicIDs: # for every topic in the message + let topicParams = 
g.topicParams.mgetOrPut(t, TopicParams.init()) - if not (await g.validate(msg)): - trace "dropping message due to failed validation", peer - g.punishPeer(peer, msg) - continue + # contribute to peer score first delivery + var stats = g.peerStats[peer].topicInfos.getOrDefault(t) + stats.firstMessageDeliveries += 1 + if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: + stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap - # this shouldn't happen - if g.peerInfo.peerId == msg.fromPeer: - trace "skipping messages from self" - continue + # if in mesh add more delivery score + if stats.inMesh: + stats.meshMessageDeliveries += 1 + if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - for t in msg.topicIDs: # for every topic in the message - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + # commit back to the table + g.peerStats[peer].topicInfos[t] = stats - # contribute to peer score first delivery - var stats = g.peerStats[peer].topicInfos.getOrDefault(t) - stats.firstMessageDeliveries += 1 - if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: - stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - # if in mesh add more delivery score - if stats.inMesh: - stats.meshMessageDeliveries += 1 - if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + await handleData(g, t, msg.data) - # commit back to the table - g.peerStats[peer].topicInfos[t] = stats + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer - if t in g.floodsub: - toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic + if rpcMsg.control.isSome: + let control = rpcMsg.control.get() + g.handlePrune(peer, control.prune) - if t in g.mesh: - toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic - - if t in g.explicit: - toSendPeers.incl(g.explicit[t]) # always forward to explicit peers - - if t in g.topics: # if we're subscribed to the topic - for h in g.topics[t].handler: - trace "calling handler for message", topicId = t, - localPeer = g.peerInfo.id, - fromPeer = msg.fromPeer.pretty - userHandlers &= h(t, msg.data) # enqueue user provided handler - - # forward the message to all peers interested in it - let published = await g.broadcast( - toSeq(toSendPeers), - RPCMsg(messages: m.messages), - DefaultSendTimeout) - - trace "forwared message to peers", peers = published - var respControl: ControlMessage - if m.control.isSome: - let control = m.control.get() - g.handlePrune(peer, control.prune) + respControl.iwant.add(g.handleIHave(peer, control.ihave)) + respControl.prune.add(g.handleGraft(peer, control.graft)) + let messages = g.handleIWant(peer, control.iwant) - respControl.iwant.add(g.handleIHave(peer, control.ihave)) - respControl.prune.add(g.handleGraft(peer, control.graft)) - let messages = g.handleIWant(peer, control.iwant) + if respControl.graft.len > 0 or respControl.prune.len > 0 or + respControl.ihave.len > 0 or messages.len > 0: - if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 
0: - try: - info "sending control message", msg = respControl - let sent = await g.send( - peer, - RPCMsg(control: some(respControl), messages: messages), - DefaultSendTimeout) - - if not sent: - g.unsubscribePeer(peer.peerId) - - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception forwarding control messages", exc = exc.msg - - # await user tasks at the very end - checkFutures(await allFinished(userHandlers)); + debug "sending control message", msg = shortLog(respControl), peer + g.send( + peer, + RPCMsg(control: some(respControl), messages: messages)) method subscribe*(g: GossipSub, topic: string, @@ -1157,9 +1122,9 @@ method unsubscribe*(g: GossipSub, for (topic, handler) in topics: # delete from mesh only if no handlers are left - if g.topics[topic].handler.len <= 0: + if topic notin g.topics: if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) + let peers = g.mesh[topic] g.mesh.del(topic) for peer in peers: g.pruned(peer, topic) @@ -1168,7 +1133,7 @@ method unsubscribe*(g: GossipSub, topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout) + g.broadcast(toSeq(peers), prune) method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = await procCall PubSub(g).unsubscribeAll(topic) @@ -1183,17 +1148,19 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout) + g.broadcast(toSeq(peers), prune) method publish*(g: GossipSub, topic: string, - data: seq[byte], - timeout: Duration = InfiniteDuration): Future[int] {.async.} = + data: seq[byte]): Future[int] {.async.} = # base returns always 0 - discard await procCall PubSub(g).publish(topic, data, timeout) - trace "publishing message on topic", topic, data = data.shortLog + discard await procCall PubSub(g).publish(topic, data) + + logScope: topic + trace "Publishing message on topic", data = data.shortLog if topic.len <= 0: # data could be 0/empty + debug "Empty topic, skipping publish" return 0 var peers: HashSet[PubSubPeer] @@ -1226,29 +1193,35 @@ method publish*(g: GossipSub, # time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + if peers.len == 0: + debug "No peers for topic, skipping publish" + return 0 + inc g.msgSeqno let msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msgId = g.msgIdProvider(msg) - trace "created new message", msg, topic, peers = peers.len + logScope: msgId - if msgId notin g.mcache: - g.mcache.put(msgId, msg) + trace "Created new message", msg = shortLog(msg), peers = peers.len - if peers.len > 0: - let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout) - when defined(libp2p_expensive_metrics): - if published > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) - - trace "published message to peers", peers = published, - msg = msg.shortLog() - return published - else: - debug "No peers for gossip message", topic, msg = msg.shortLog() + if g.seen.put(msgId): + # custom msgid providers might cause this + trace "Dropping already-seen message" return 0 + g.mcache.put(msgId, msg) + + g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) + when defined(libp2p_expensive_metrics): + if peers.len > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + + trace "Published message to peers" + + 
return peers.len + proc maintainDirectPeers(g: GossipSub) {.async.} = while g.heartbeatRunning: for id in g.parameters.directPeers: @@ -1264,29 +1237,28 @@ proc maintainDirectPeers(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} = trace "gossipsub start" - ## start pubsub - ## start long running/repeating procedures + if not g.heartbeatFut.isNil: + warn "Starting gossipsub twice" + return - withLock g.heartbeatLock: - # setup the heartbeat interval - g.heartbeatRunning = true - g.heartbeatFut = g.heartbeat() - g.directPeersLoop = g.maintainDirectPeers() + g.heartbeatRunning = true + g.heartbeatFut = g.heartbeat() + g.directPeersLoop = g.maintainDirectPeers() method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" + if g.heartbeatFut.isNil: + warn "Stopping gossipsub without starting it" + return - ## stop pubsub - ## stop long running tasks - - withLock g.heartbeatLock: - # stop heartbeat interval - g.heartbeatRunning = false - if not g.heartbeatFut.finished: - trace "awaiting last heartbeat" - await g.heartbeatFut - await g.directPeersLoop.cancelAndWait() - + # stop heartbeat interval + g.heartbeatRunning = false + g.directPeersLoop.cancel() + if not g.heartbeatFut.finished: + trace "awaiting last heartbeat" + await g.heartbeatFut + trace "heartbeat stopped" + g.heartbeatFut = nil method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub() @@ -1297,11 +1269,10 @@ method initPubSub*(g: GossipSub) = g.parameters.validateParameters().tryGet() randomize() - g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) + g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength) g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages - g.heartbeatLock = newAsyncLock() diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index 82231f550..172e8bc62 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -7,69 +7,57 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import chronos, chronicles -import tables, options, sets, sequtils -import rpc/[messages], timedcache +import std/[sets, tables, options] +import rpc/[messages] + +export sets, tables, messages, options type CacheEntry* = object mid*: string - msg*: Message + topicIDs*: seq[string] - MCache* = ref object of RootObj - msgs*: TimedCache[Message] + MCache* = object of RootObj + msgs*: Table[string, Message] history*: seq[seq[CacheEntry]] historySize*: Natural windowSize*: Natural -proc get*(c: MCache, mid: string): Option[Message] = +func get*(c: MCache, mid: string): Option[Message] = result = none(Message) if mid in c.msgs: result = some(c.msgs[mid]) -proc contains*(c: MCache, mid: string): bool = +func contains*(c: MCache, mid: string): bool = c.get(mid).isSome -proc put*(c: MCache, msgId: string, msg: Message) = - proc handler(key: string, val: Message) {.gcsafe.} = - ## make sure we remove the message from history - ## to keep things consisten - c.history.applyIt( - it.filterIt(it.mid != msgId) - ) - +func put*(c: var MCache, msgId: string, msg: Message) = if msgId notin c.msgs: - c.msgs.put(msgId, msg, handler = handler) - c.history[0].add(CacheEntry(mid: msgId, msg: msg)) + c.msgs[msgId] = msg + c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs)) -proc window*(c: MCache, topic: string): HashSet[string] = +func window*(c: MCache, topic: string): HashSet[string] = result = initHashSet[string]() - let len = - if c.windowSize > c.history.len: - c.history.len - else: - c.windowSize + let + len = min(c.windowSize, c.history.len) - if c.history.len > 0: - for slot in c.history[0.. c.historySize: - for entry in c.history.pop(): - c.msgs.del(entry.mid) +func shift*(c: var MCache) = + for entry in c.history.pop(): + c.msgs.del(entry.mid) c.history.insert(@[]) -proc newMCache*(window: Natural, history: Natural): MCache = - new result - result.historySize = history - result.windowSize = window - result.history = newSeq[seq[CacheEntry]]() - result.history.add(@[]) # initialize with empty slot - result.msgs = newTimedCache[Message](2.minutes) +func init*(T: type MCache, window, history: Natural): T = + T( + history: newSeq[seq[CacheEntry]](history), + historySize: history, + windowSize: window + ) diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim index d294c0155..15a1bf5d5 100644 --- a/libp2p/protocols/pubsub/peertable.nim +++ b/libp2p/protocols/pubsub/peertable.nim @@ -14,9 +14,11 @@ type PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool = - let peers = toSeq(t.getOrDefault(topic)) - peers.any do (peer: PubSubPeer) -> bool: - peer.peerId == peerId + if topic in t: + for peer in t[topic]: + if peer.peerId == peerId: + return true + false func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = # returns true if the peer was added, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 9d68cd275..158b1e2da 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -72,56 +72,34 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} = ## trace "unsubscribing pubsub peer", peer = $peerId - if peerId in p.peers: - p.peers.del(peerId) + p.peers.del(peerId) libp2p_pubsub_peers.set(p.peers.len.int64) -proc send*( - p: PubSub, - peer: PubSubPeer, - msg: RPCMsg, - timeout: Duration): Future[bool] {.async.} = - ## send to remote peer +proc send*(p: PubSub, peer: 
PubSubPeer, msg: RPCMsg) = + ## Attempt to send `msg` to remote peer ## trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg) - try: - await peer.send(msg, timeout) - return true - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception sending pubsub message to peer", - peer = $peer, msg = shortLog(msg) - # do not unsub during internal testing (no networking) - when not defined(pubsub_internal_testing): - p.unsubscribePeer(peer.peerId) + peer.send(msg) proc broadcast*( p: PubSub, - sendPeers: seq[PubSubPeer], - msg: RPCMsg, - timeout: Duration): Future[int] {.async.} = - ## send messages and cleanup failed peers - ## + sendPeers: openArray[PubSubPeer], + msg: RPCMsg) = # raises: [Defect] + ## Attempt to send `msg` to the given peers trace "broadcasting messages to peers", - peers = sendPeers.len, message = shortLog(msg) - let sent = await allFinished( - sendPeers.mapIt( p.send(it, msg, timeout) )) - return sent.filterIt( it.finished and it.read ).len + peers = sendPeers.len, msg = shortLog(msg) + for peer in sendPeers: + p.send(peer, msg) proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], - subscribe: bool): Future[bool] = + subscribe: bool) = ## send subscriptions to remote peer - p.send( - peer, - RPCMsg( - subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), - DefaultSendTimeout) + p.send(peer, RPCMsg.withSubs(topics, subscribe)) method subscribeTopic*(p: PubSub, topic: string, @@ -132,16 +110,12 @@ method subscribeTopic*(p: PubSub, method rpcHandler*(p: PubSub, peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async, base.} = + rpcMsg: RPCMsg) {.async, base.} = ## handle rpc messages - trace "processing RPC message", peer = peer.id, msgs = rpcMsgs.len - - for m in rpcMsgs: # for all RPC messages - trace "processing messages", msg = m.shortLog - if m.subscriptions.len > 0: # if there are any subscriptions - for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic - trace "about to subscribe to topic", topicId = s.topic - p.subscribeTopic(s.topic, s.subscribe, peer) + trace "processing RPC message", msg = rpcMsg.shortLog, peer + for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic + trace "about to subscribe to topic", topicId = s.topic, peer + p.subscribeTopic(s.topic, s.subscribe, peer) method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard @@ -152,8 +126,12 @@ proc getOrCreatePeer*( if peer in p.peers: return p.peers[peer] + proc getConn(): Future[(Connection, RPCMsg)] {.async.} = + let conn = await p.switch.dial(peer, proto) + return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true)) + # create new pubsub peer - let pubSubPeer = newPubSubPeer(peer, p.switch, proto) + let pubSubPeer = newPubSubPeer(peer, getConn, proto) trace "created new pubsub peer", peerId = $peer p.peers[peer] = pubSubPeer @@ -163,8 +141,24 @@ proc getOrCreatePeer*( # metrics libp2p_pubsub_peers.set(p.peers.len.int64) + + pubsubPeer.connect() + return pubSubPeer +proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} = + if topic notin p.topics: return # Not subscribed + + for h in p.topics[topic].handler: + trace "triggering handler", topicID = topic + try: + await h(topic, data) + except CancelledError as exc: + raise exc + except CatchableError as exc: + # Handlers should never raise exceptions + warn "Error in topic handler", msg = exc.msg + method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = @@ -184,22 +178,20 @@ 
method handleConn*(p: PubSub, await conn.close() return - proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] = # call pubsub rpc handler - await p.rpcHandler(peer, msgs) + p.rpcHandler(peer, msg) + + let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) try: - let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) - if p.topics.len > 0: - discard await p.sendSubs(peer, toSeq(p.topics.keys), true) - peer.handler = handler await peer.handle(conn) # spawn peer read loop - trace "pubsub peer handler ended", peer = peer.id + trace "pubsub peer handler ended", conn except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception ocurred in pubsub handle", exc = exc.msg + trace "exception ocurred in pubsub handle", exc = exc.msg, conn finally: await conn.close() @@ -208,31 +200,22 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = ## messages ## - let pubsubPeer = p.getOrCreatePeer(peer, p.codec) - pubsubPeer.outbound = true # flag as outbound - if p.topics.len > 0: - # TODO sendSubs may raise, but doing asyncCheck here causes the exception - # to escape to the poll loop. - # With a bit of luck, it may be harmless to ignore exceptions here - - # some cleanup is eventually done in PubSubPeer.send - asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true) - - pubsubPeer.subscribed = true + let peer = p.getOrCreatePeer(peer, p.codec) + peer.outbound = true # flag as outbound method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - for i, h in p.topics[t.topic].handler: - if h == t.handler: - p.topics[t.topic].handler.del(i) + p.topics.withValue(t.topic, topic): + topic[].handler.keepIf(proc (x: auto): bool = x != t.handler) - # make sure we delete the topic if - # no more handlers are left - if p.topics[t.topic].handler.len <= 0: - p.topics.del(t.topic) - # metrics - libp2p_pubsub_topics.set(p.topics.len.int64) + if topic[].handler.len == 0: + # make sure we delete the topic if + # no more handlers are left + p.topics.del(t.topic) + + libp2p_pubsub_topics.set(p.topics.len.int64) proc unsubscribe*(p: PubSub, topic: string, @@ -263,32 +246,22 @@ method subscribe*(p: PubSub, p.topics[topic].handler.add(handler) - var sent: seq[Future[bool]] - for peer in toSeq(p.peers.values): - sent.add(p.sendSubs(peer, @[topic], true)) - - checkFutures(await allFinished(sent)) + for _, peer in p.peers: + p.sendSubs(peer, @[topic], true) # metrics libp2p_pubsub_topics.set(p.topics.len.int64) method publish*(p: PubSub, topic: string, - data: seq[byte], - timeout: Duration = InfiniteDuration): Future[int] {.base, async.} = + data: seq[byte]): Future[int] {.base, async.} = ## publish to a ``topic`` - if p.triggerSelf and topic in p.topics: - for h in p.topics[topic].handler: - trace "triggering handler", topicID = topic - try: - await h(topic, data) - except CancelledError as exc: - raise exc - except CatchableError as exc: - # TODO these exceptions are ignored since it's likely that if writes are - # are failing, the underlying connection is already closed - this needs - # more cleanup though - debug "Could not write to pubsub connection", msg = exc.msg + ## The return value is the number of neighbours that we attempted to send the + ## message to, excluding self. Note that this is an optimistic number of + ## attempts - the number of peers that actually receive the message might + ## be lower. 
+ if p.triggerSelf: + await handleData(p, topic, data) return 0 diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 503e9e8a4..285ed22ff 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -10,8 +10,6 @@ import std/[sequtils, strutils, tables, hashes, sets] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], - timedcache, - ../../switch, ../../peerid, ../../peerinfo, ../../stream/connection, @@ -28,24 +26,21 @@ when defined(libp2p_expensive_metrics): declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) -const - DefaultSendTimeout* = 10.seconds - type PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} + GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.} + PubSubPeer* = ref object of RootObj - switch*: Switch # switch instance to dial peers + getConn*: GetConn # callback to establish a new send connection codec*: string # the protocol that this peer joined from - sendConn*: Connection + sendConn: Connection # cached send connection + connections*: seq[Connection] # connections to this peer peerId*: PeerID handler*: RPCHandler - sentRpcCache: TimedCache[string] # cache for already sent messages - recvdRpcCache: TimedCache[string] # cache for already received messages observers*: ref seq[PubSubObserver] # ref as in smart_ptr - subscribed*: bool # are we subscribed to this peer dialLock: AsyncLock score*: float64 @@ -55,7 +50,7 @@ type appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} + RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.} chronicles.formatIt(PubSubPeer): $it.peerId @@ -63,9 +58,10 @@ func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash -proc id*(p: PubSubPeer): string = - doAssert(not p.isNil, "nil pubsubpeer") - p.peerId.pretty +func shortLog*(p: PubSubPeer): string = + if p.isNil: "PubSubPeer(nil)" + else: shortLog(p.peerId) +chronicles.formatIt(PubSubPeer): shortLog(it) proc connected*(p: PubSubPeer): bool = not p.sendConn.isNil and not @@ -86,63 +82,68 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = obs.onSend(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async.} = - logScope: - peer = p.id - debug "starting pubsub read loop for peer", closed = conn.closed + debug "starting pubsub read loop", + conn, peer = p, closed = conn.closed try: try: while not conn.atEof: - trace "waiting for data", closed = conn.closed + trace "waiting for data", conn, peer = p, closed = conn.closed + let data = await conn.readLp(64 * 1024) - let digest = $(sha256.digest(data)) - trace "read data from peer", data = data.shortLog - if digest in p.recvdRpcCache: - when defined(libp2p_expensive_metrics): - libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id]) - trace "message already received, skipping" - continue + trace "read data from peer", + conn, peer = p, closed = conn.closed, + data = data.shortLog var rmsg = decodeRpcMsg(data) if rmsg.isErr(): - notice "failed to decode msg from peer" + notice "failed to 
decode msg from peer", + conn, peer = p, closed = conn.closed, + err = rmsg.error() break - var msg = rmsg.get() - - trace "decoded msg from peer", msg = msg.shortLog + trace "decoded msg from peer", + conn, peer = p, closed = conn.closed, + msg = rmsg.get().shortLog # trigger hooks - p.recvObservers(msg) + p.recvObservers(rmsg.get()) when defined(libp2p_expensive_metrics): - for m in msg.messages: + for m in rmsg.get().messages: for t in m.topicIDs: # metrics - libp2p_pubsub_received_messages.inc(labelValues = [p.id, t]) + libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t]) - await p.handler(p, @[msg]) - p.recvdRpcCache.put(digest) + await p.handler(p, rmsg.get()) finally: - debug "exiting pubsub peer read loop" await conn.close() if p.sendConn == conn: p.sendConn = nil - except CancelledError as exc: - raise exc + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in PubSubPeer.handle" except CatchableError as exc: - trace "Exception occurred in PubSubPeer.handle", exc = exc.msg + trace "Exception occurred in PubSubPeer.handle", + conn, peer = p, closed = conn.closed, exc = exc.msg + finally: + debug "exiting pubsub read loop", + conn, peer = p, closed = conn.closed proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = - # get a cached send connection or create a new one + ## get a cached send connection or create a new one - will return nil if + ## getting a new connection fails + ## + block: # check if there's an existing connection that can be reused let current = p.sendConn if not current.isNil: if not (current.closed() or current.atEof): # The existing send connection looks like it might work - reuse it - trace "Reusing existing connection", oid = $current.oid + trace "Reusing existing connection", current return current # Send connection is set but broken - get rid of it @@ -156,7 +157,8 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = # and later close one of them, other implementations such as rust-libp2p # become deaf to our messages (potentially due to the clean-up associated # with closing connections). To prevent this, we use a lock that ensures - # that only a single dial will be performed for each peer. + # that only a single dial will be performed for each peer and send the + # subscription table every time we reconnect. # # Nevertheless, this approach is still quite problematic because the gossip # sends and their respective dials may be started from the mplex read loop. @@ -181,31 +183,36 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = return current # Grab a new send connection - let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here + let (newConn, handshake) = await p.getConn() # ...and here if newConn.isNil: return nil - trace "Caching new send connection", oid = $newConn.oid - p.sendConn = newConn - asyncCheck p.handle(newConn) # start a read loop on the new connection - return newConn + trace "Sending handshake", newConn, handshake = shortLog(handshake) + await newConn.writeLp(encodeRpcMsg(handshake)) + trace "Caching new send connection", newConn + p.sendConn = newConn + # Start a read loop on the new connection. + # All the errors are handled inside `handle()` procedure. 
+ asyncSpawn p.handle(newConn) + return newConn finally: if p.dialLock.locked: p.dialLock.release() -proc send*( - p: PubSubPeer, - msg: RPCMsg, - timeout: Duration = DefaultSendTimeout) {.async.} = +proc connectImpl*(p: PubSubPeer) {.async.} = + try: + discard await getSendConn(p) + except CatchableError as exc: + debug "Could not connect to pubsub peer", err = exc.msg +proc connect*(p: PubSubPeer) = + asyncCheck(connectImpl(p)) + +proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = doAssert(not isNil(p), "pubsubpeer nil!") - logScope: - peer = p.id - rpcMsg = shortLog(msg) - - trace "sending msg to peer" + trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) # trigger send hooks var mm = msg # hooks can modify the message @@ -216,55 +223,50 @@ proc send*( info "empty message, skipping" return - logScope: - encoded = shortLog(encoded) - - let digest = $(sha256.digest(encoded)) - if digest in p.sentRpcCache: - trace "message already sent to peer, skipping" - when defined(libp2p_expensive_metrics): - libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) - return - var conn: Connection try: - trace "about to send message" conn = await p.getSendConn() - if conn == nil: - debug "Couldn't get send connection, dropping message" + debug "Couldn't get send connection, dropping message", peer = p return - trace "sending encoded msgs to peer", connId = $conn.oid - await conn.writeLp(encoded).wait(timeout) - p.sentRpcCache.put(digest) - trace "sent pubsub message to remote", connId = $conn.oid + trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) + await conn.writeLp(encoded) + trace "sent pubsub message to remote", conn when defined(libp2p_expensive_metrics): for x in mm.messages: for t in x.topicIDs: # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg + # Because we detach the send call from the currently executing task using + # asyncCheck, no exceptions may leak out of it + debug "unable to send to remote", exc = exc.msg, peer = p # Next time sendConn is used, it will be have its close flag set and thus # will be recycled if not isNil(conn): - await conn.close() + await conn.close() # This will clean up the send connection - raise exc + if exc is CancelledError: # TODO not handled + debug "Send cancelled", peer = p + + # We'll ask for a new send connection whenever possible + if p.sendConn == conn: + p.sendConn = nil + +proc send*(p: PubSubPeer, msg: RPCMsg) = + asyncCheck sendImpl(p, msg) proc `$`*(p: PubSubPeer): string = - p.id + $p.peerId proc newPubSubPeer*(peerId: PeerID, - switch: Switch, + getConn: GetConn, codec: string): PubSubPeer = new result - result.switch = switch + result.getConn = getConn result.codec = codec result.peerId = peerId - result.sentRpcCache = newTimedCache[string](2.minutes) - result.recvdRpcCache = newTimedCache[string](2.minutes) result.dialLock = newAsyncLock() diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index e49b54d9e..090bd374d 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -28,10 +28,10 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") func defaultMsgIdProvider*(m: Message): string = - byteutils.toHex(m.seqno) & m.fromPeer.pretty + 
byteutils.toHex(m.seqno) & $m.fromPeer -proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] = - ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) +proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = + ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) proc verify*(m: Message, p: PeerID): bool = if m.signature.len > 0 and m.key.len > 0: @@ -63,6 +63,9 @@ proc init*( seqno: @(seqno.toBytesBE), # unefficient, fine for now topicIDs: @[topic]) - if sign and peer.publicKey.isSome: - result.signature = sign(result, peer).tryGet() - result.key = peer.publicKey.get().getBytes().tryGet() + if sign: + if peer.keyType != KeyType.HasPrivate: + raise (ref CatchableError)(msg: "Cannot sign message without private key") + + result.signature = sign(result, peer.privateKey).tryGet() + result.key = peer.privateKey.getKey().tryGet().getBytes().tryGet() diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index ea6f8918c..301b8a33b 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -56,6 +56,11 @@ type messages*: seq[Message] control*: Option[ControlMessage] +func withSubs*( + T: type RPCMsg, topics: openArray[string], subscribe: bool): T = + T( + subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))) + func shortLog*(s: ControlIHave): auto = ( topicID: s.topicID.shortLog, @@ -87,7 +92,7 @@ func shortLog*(c: ControlMessage): auto = func shortLog*(msg: Message): auto = ( - fromPeer: msg.fromPeer, + fromPeer: msg.fromPeer.shortLog, data: msg.data.shortLog, seqno: msg.seqno.shortLog, topicIDs: $msg.topicIDs, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index bfc2a3c84..9b4388b33 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -197,7 +197,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} = trace "decodeMessage: decoding message" var msg: Message if ? pb.getField(1, msg.fromPeer): - trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer.pretty() + trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer else: trace "decodeMessage: fromPeer is missing" if ? pb.getField(2, msg.data): diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index e7d08f3b2..35a12aed3 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -7,73 +7,59 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables -import chronos, chronicles +import std/[heapqueue, sets] -logScope: - topics = "timedcache" +import chronos/timer const Timeout* = 10.seconds # default timeout in ms type - ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.} - TimedEntry*[V] = object of RootObj - val: V - handler: ExpireHandler[V] + TimedEntry*[K] = ref object of RootObj + key: K + expiresAt: Moment - TimedCache*[V] = ref object of RootObj - cache*: Table[string, TimedEntry[V]] - onExpire*: ExpireHandler[V] - timeout*: Duration + TimedCache*[K] = object of RootObj + expiries: HeapQueue[TimedEntry[K]] + entries: HashSet[K] + timeout: Duration -# TODO: This belong in chronos, temporary left here until chronos is updated -proc addTimer*(at: Duration, cb: CallbackFunc, udata: pointer = nil) = - ## Arrange for the callback ``cb`` to be called at the given absolute - ## timestamp ``at``. 
You can also pass ``udata`` to callback. - addTimer(Moment.fromNow(at), cb, udata) +func `<`*(a, b: TimedEntry): bool = + a.expiresAt < b.expiresAt -proc put*[V](t: TimedCache[V], - key: string, - val: V = "", - timeout: Duration, - handler: ExpireHandler[V] = nil) = - trace "adding entry to timed cache", key = key - t.cache[key] = TimedEntry[V](val: val, handler: handler) +func expire*(t: var TimedCache, now: Moment = Moment.now()) = + while t.expiries.len() > 0 and t.expiries[0].expiresAt < now: + t.entries.excl(t.expiries.pop().key) - addTimer( - timeout, - proc (arg: pointer = nil) {.gcsafe.} = - trace "deleting expired entry from timed cache", key = key - if key in t.cache: - let entry = t.cache[key] - t.cache.del(key) - if not isNil(entry.handler): - entry.handler(key, entry.val) +func del*[K](t: var TimedCache[K], key: K): bool = + # Removes existing key from cache, returning false if it was not present + if not t.entries.missingOrExcl(key): + for i in 0.. 0: var buffer = newSeq[byte](size) await sconn.stream.readExactly(addr buffer[0], buffer.len) return sconn.readCs.decryptWithAd([], buffer) else: - trace "Received 0-length message", conn = $sconn + trace "Received 0-length message", sconn method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} = if message.len == 0: @@ -418,14 +425,14 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async. lesize = cipher.len.uint16 besize = lesize.toBytesBE outbuf = newSeqOfCap[byte](cipher.len + 2) - trace "sendEncryptedMessage", size = lesize, peer = $sconn, left, offset + trace "sendEncryptedMessage", sconn, size = lesize, left, offset outbuf &= besize outbuf &= cipher await sconn.stream.write(outbuf) sconn.activity = true method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} = - trace "Starting Noise handshake", initiator, peer = $conn + trace "Starting Noise handshake", conn, initiator # https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages let @@ -469,7 +476,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon if not remoteSig.verify(verifyPayload, remotePubKey): raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.") else: - trace "Remote signature verified", peer = $conn + trace "Remote signature verified", conn if initiator and not isNil(conn.peerInfo): let pid = PeerID.init(remotePubKey) @@ -480,7 +487,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon failedKey: PublicKey discard extractPublicKey(conn.peerInfo.peerId, failedKey) debug "Noise handshake, peer infos don't match!", - initiator, dealt_peer = $conn.peerInfo.id, + initiator, dealt_peer = conn, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index e40ab2d00..71bcb4dc2 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -6,7 +6,8 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import chronos, chronicles, oids, stew/endians2, bearssl +import std/[oids, strformat] +import chronos, chronicles, stew/endians2, bearssl import nimcrypto/[hmac, sha2, sha, hash, rijndael, twofish, bcmode] import secure, ../../stream/connection, @@ -69,6 +70,12 @@ type SecioError* = object of CatchableError +func shortLog*(conn: SecioConn): auto = + if conn.isNil: "SecioConn(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(SecioConn): shortLog(it) + proc init(mac: var SecureMac, hash: string, key: openarray[byte]) = if hash == "SHA256": mac = SecureMac(kind: SecureMacType.Sha256) @@ -184,17 +191,17 @@ proc readRawMessage(conn: Connection): Future[seq[byte]] {.async.} = trace "Recieved message header", header = lengthBuf.shortLog, length = length if length > SecioMaxMessageSize: # Verify length before casting! - trace "Received size of message exceed limits", conn = $conn, length = length + trace "Received size of message exceed limits", conn, length = length raise (ref SecioError)(msg: "Message exceeds maximum length") if length > 0: var buf = newSeq[byte](int(length)) await conn.readExactly(addr buf[0], buf.len) trace "Received message body", - conn = $conn, length = buf.len, buff = buf.shortLog + conn, length = buf.len, buff = buf.shortLog return buf - trace "Discarding 0-length payload", conn = $conn + trace "Discarding 0-length payload", conn method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} = ## Read message from channel secure connection ``sconn``. @@ -312,12 +319,12 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S var answer = await transactMessage(conn, request) if len(answer) == 0: - trace "Proposal exchange failed", conn = $conn + trace "Proposal exchange failed", conn raise (ref SecioError)(msg: "Proposal exchange failed") if not decodeProposal(answer, remoteNonce, remoteBytesPubkey, remoteExchanges, remoteCiphers, remoteHashes): - trace "Remote proposal decoding failed", conn = $conn + trace "Remote proposal decoding failed", conn raise (ref SecioError)(msg: "Remote proposal decoding failed") if not remotePubkey.init(remoteBytesPubkey): @@ -354,11 +361,11 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S var localExchange = createExchange(epubkey, signature.getBytes()) var remoteExchange = await transactMessage(conn, localExchange) if len(remoteExchange) == 0: - trace "Corpus exchange failed", conn = $conn + trace "Corpus exchange failed", conn raise (ref SecioError)(msg: "Corpus exchange failed") if not decodeExchange(remoteExchange, remoteEBytesPubkey, remoteEBytesSig): - trace "Remote exchange decoding failed", conn = $conn + trace "Remote exchange decoding failed", conn raise (ref SecioError)(msg: "Remote exchange decoding failed") if not remoteESignature.init(remoteEBytesSig): diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 9d7fb9dc9..294632c96 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import options +import std/[options, strformat] import chronos, chronicles, bearssl import ../protocol, ../../stream/streamseq, @@ -27,12 +27,18 @@ type stream*: Connection buf: StreamSeq -proc init*[T: SecureConn](C: type T, - conn: Connection, - peerInfo: PeerInfo, - observedAddr: Multiaddress, - timeout: Duration = DefaultConnectionTimeout): T = - result = C(stream: conn, +func shortLog*(conn: SecureConn): auto = + if conn.isNil: "SecureConn(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(SecureConn): shortLog(it) + +proc init*(T: type SecureConn, + conn: Connection, + peerInfo: PeerInfo, + observedAddr: Multiaddress, + timeout: Duration = DefaultConnectionTimeout): T = + result = T(stream: conn, peerInfo: peerInfo, observedAddr: observedAddr, closeEvent: conn.closeEvent, @@ -63,10 +69,21 @@ proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} = var sconn = await s.handshake(conn, initiator) + + proc cleanup() {.async.} = + try: + await conn.join() + await sconn.close() + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + discard + except CatchableError as exc: + trace "error cleaning up secure connection", err = exc.msg, sconn + if not isNil(sconn): - conn.join() - .addCallback do(udata: pointer = nil): - asyncCheck sconn.close() + # All the errors are handled inside `cleanup()` procedure. + asyncSpawn cleanup() return sconn @@ -74,18 +91,18 @@ method init*(s: Secure) {.gcsafe.} = procCall LPProtocol(s).init() proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - trace "handling connection upgrade", proto + trace "handling connection upgrade", proto, conn try: # We don't need the result but we # definitely need to await the handshake discard await s.handleConn(conn, false) - trace "connection secured" + trace "connection secured", conn except CancelledError as exc: - warn "securing connection canceled" + warn "securing connection canceled", conn await conn.close() raise exc except CatchableError as exc: - warn "securing connection failed", msg = exc.msg + warn "securing connection failed", err = exc.msg, conn await conn.close() s.handler = handle diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 5cd33c1e6..9ca59d744 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -30,7 +30,7 @@ ## will suspend until either the amount of elements in the ## buffer goes below ``maxSize`` or more data becomes available. 
-import deques, math +import std/[deques, math, strformat] import chronos, chronicles, metrics import ../stream/connection @@ -43,7 +43,7 @@ logScope: topics = "bufferstream" const - DefaultBufferSize* = 102400 + DefaultBufferSize* = 128 const BufferStreamTrackerName* = "libp2p.bufferstream" @@ -100,6 +100,12 @@ proc newAlreadyPipedError*(): ref CatchableError {.inline.} = proc newNotWritableError*(): ref CatchableError {.inline.} = result = newException(NotWritableError, "stream is not writable") +func shortLog*(s: BufferStream): auto = + if s.isNil: "BufferStream(nil)" + elif s.peerInfo.isNil: $s.oid + else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}" +chronicles.formatIt(BufferStream): shortLog(it) + proc requestReadBytes(s: BufferStream): Future[void] = ## create a future that will complete when more ## data becomes available in the read buffer @@ -142,7 +148,7 @@ proc initBufferStream*(s: BufferStream, await s.writeLock.acquire() await handler(data) - trace "created bufferstream", oid = $s.oid + trace "created bufferstream", s proc newBufferStream*(handler: WriteHandler = nil, size: int = DefaultBufferSize, @@ -206,7 +212,7 @@ proc drainBuffer*(s: BufferStream) {.async.} = ## wait for all data in the buffer to be consumed ## - trace "draining buffer", len = s.len, oid = $s.oid + trace "draining buffer", len = s.len, s while s.len > 0: await s.dataReadEvent.wait() s.dataReadEvent.clear() @@ -296,7 +302,7 @@ method close*(s: BufferStream) {.async, gcsafe.} = try: ## close the stream and clear the buffer if not s.isClosed: - trace "closing bufferstream", oid = $s.oid + trace "closing bufferstream", s s.isEof = true for r in s.readReqs: if not(isNil(r)) and not(r.finished()): @@ -306,11 +312,11 @@ method close*(s: BufferStream) {.async, gcsafe.} = await procCall Connection(s).close() inc getBufferStreamTracker().closed - trace "bufferstream closed", oid = $s.oid + trace "bufferstream closed", s else: trace "attempt to close an already closed bufferstream", - trace = getStackTrace(), oid = $s.oid + trace = getStackTrace(), s except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing buffer stream", exc = exc.msg + trace "error closing buffer stream", exc = exc.msg, s diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index e6d28cf26..b6bb99d73 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import oids +import std/[oids, strformat] import chronos, chronicles import connection @@ -21,6 +21,12 @@ type ChronosStream* = ref object of Connection client: StreamTransport +func shortLog*(conn: ChronosStream): string = + if conn.isNil: "ChronosStream(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(ChronosStream): shortLog(it) + method initStream*(s: ChronosStream) = if s.objName.len == 0: s.objName = "ChronosStream" @@ -88,7 +94,7 @@ method close*(s: ChronosStream) {.async.} = try: if not s.isClosed: trace "shutting down chronos stream", address = $s.client.remoteAddress(), - oid = $s.oid + s if not s.client.closed(): await s.client.closeWait() @@ -96,4 +102,4 @@ method close*(s: ChronosStream) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing chronosstream", exc = exc.msg + trace "error closing chronosstream", exc = exc.msg, s diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 3359e3344..20343ea81 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -7,13 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import hashes, oids +import std/[hashes, oids, strformat] import chronicles, chronos, metrics import lpstream, ../multiaddress, ../peerinfo -export lpstream +export lpstream, peerinfo logScope: topics = "connection" @@ -66,6 +66,12 @@ proc setupConnectionTracker(): ConnectionTracker = result.isLeaked = leakTransport addTracker(ConnectionTrackerName, result) +func shortLog*(conn: Connection): string = + if conn.isNil: "Connection(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(Connection): shortLog(it) + method initStream*(s: Connection) = if s.objName.len == 0: s.objName = "Connection" @@ -77,7 +83,7 @@ method initStream*(s: Connection) = s.timeoutHandler = proc() {.async.} = await s.close() - trace "timeout set at", timeout = s.timeout.millis + trace "timeout set at", timeout = s.timeout.millis, s doAssert(isNil(s.timerTaskFut)) # doAssert(s.timeout > 0.millis) if s.timeout > 0.millis: @@ -94,10 +100,6 @@ method close*(s: Connection) {.async.} = await procCall LPStream(s).close() inc getConnectionTracker().closed -proc `$`*(conn: Connection): string = - if not isNil(conn.peerInfo): - result = conn.peerInfo.id - func hash*(p: Connection): Hash = cast[pointer](p).hash @@ -110,9 +112,6 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = ## be reset ## - logScope: - oid = $s.oid - try: while true: await sleepAsync(s.timeout) @@ -127,14 +126,14 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = break # reset channel on innactivity timeout - trace "Connection timed out" + trace "Connection timed out", s if not(isNil(s.timeoutHandler)): await s.timeoutHandler() except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in timeout", exc = exc.msg + trace "exception in timeout", exc = exc.msg, s proc init*(C: type Connection, peerInfo: PeerInfo, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 03d4c2022..6fffc8640 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -30,9 +30,6 @@ import stream/connection, peerid, errors -chronicles.formatIt(PeerInfo): $it -chronicles.formatIt(PeerID): $it - logScope: topics = "switch" @@ -47,7 +44,8 @@ declareCounter(libp2p_failed_dials, "failed dials") declareCounter(libp2p_failed_upgrade, "peers 
failed upgrade") type - NoPubSubException* = object of CatchableError + UpgradeFailedError* = object of CatchableError + DialFailedError* = object of CatchableError ConnEventKind* {.pure.} = enum Connected, # A connection was made and securely upgraded - there may be @@ -101,7 +99,7 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa except CancelledError as exc: raise exc except CatchableError as exc: # handlers should not raise! - warn "exception in trigger ConnEvents", exc = exc.msg + warn "exception in trigger ConnEvents", exc = exc.msg, peerId proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} @@ -114,13 +112,13 @@ proc isConnected*(s: Switch, peerId: PeerID): bool = proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: - raise newException(CatchableError, "No secure managers registered!") + raise newException(UpgradeFailedError, "No secure managers registered!") let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) if manager.len == 0: - raise newException(CatchableError, "Unable to negotiate a secure channel!") + raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") - trace "securing connection", codec = manager + trace "Securing connection", codec = manager, conn let secureProtocol = s.secureManagers.filterIt(it.codec == manager) # ms.select should deal with the correctness of this @@ -136,7 +134,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = let info = await s.identity.identify(conn, conn.peerInfo) if info.pubKey.isNone and isNil(conn): - raise newException(CatchableError, + raise newException(UpgradeFailedError, "no public key provided and no existing peer identity found") if isNil(conn.peerInfo): @@ -154,7 +152,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = if info.protos.len > 0: conn.peerInfo.protocols = info.protos - trace "identify: identified remote peer", peer = $conn.peerInfo + trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo) proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = # new stream for identify @@ -171,14 +169,14 @@ proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection - trace "muxing connection", peer = $conn + trace "Muxing connection", conn if s.muxers.len == 0: - warn "no muxers registered, skipping upgrade flow" + warn "no muxers registered, skipping upgrade flow", conn return let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys())) if muxerName.len == 0 or muxerName == "na": - debug "no muxer available, early exit", peer = $conn + debug "no muxer available, early exit", conn return # create new muxer for connection @@ -187,16 +185,17 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = # install stream handler muxer.streamHandler = s.streamHandler - s.connManager.storeOutgoing(muxer.connection) + s.connManager.storeOutgoing(conn) + trace "Storing muxer", conn s.connManager.storeMuxer(muxer) - trace "found a muxer", name = muxerName, peer = $conn + trace "found a muxer", name = muxerName, conn # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() # store it in muxed connections if we have a peer for it - trace "adding muxer for peer", peer = conn.peerInfo.id + trace "Storing muxer with handler", conn s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler 
return muxer @@ -205,52 +204,53 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = - logScope: - conn = $conn - oid = $conn.oid + trace "Upgrading outgoing connection", conn let sconn = await s.secure(conn) # secure the connection if isNil(sconn): - raise newException(CatchableError, + raise newException(UpgradeFailedError, "unable to secure connection, stopping upgrade") if sconn.peerInfo.isNil: - raise newException(CatchableError, + raise newException(UpgradeFailedError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - trace "upgrading connection" let muxer = await s.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future - raise newException(CatchableError, + raise newException(UpgradeFailedError, "a muxer is required for outgoing connections") - await s.identify(muxer) + try: + await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", err = exc.msg, conn if isNil(sconn.peerInfo): await sconn.close() - raise newException(CatchableError, - "unable to identify connection, stopping upgrade") + raise newException(UpgradeFailedError, + "No peerInfo for connection, stopping upgrade") - trace "successfully upgraded outgoing connection", oid = sconn.oid + trace "Upgraded outgoing connection", conn, sconn return sconn proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = - trace "upgrading incoming connection", conn = $conn, oid = $conn.oid + trace "Upgrading incoming connection", conn let ms = newMultistream() # secure incoming connections proc securedHandler (conn: Connection, proto: string) {.async, gcsafe, closure.} = - - var sconn: Connection - trace "Securing connection", oid = $conn.oid + trace "Securing connection", conn let secure = s.secureManagers.filterIt(it.codec == proto)[0] try: - sconn = await secure.secure(conn, false) + var sconn = await secure.secure(conn, false) if isNil(sconn): return @@ -272,7 +272,9 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending secured handler", err = exc.msg + debug "Exception in secure handler", err = exc.msg, conn + + trace "Ending secured handler", conn if (await ms.select(conn)): # just handshake # add the secure handlers @@ -286,9 +288,6 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc internalConnect(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]): Future[Connection] {.async.} = - logScope: - peer = peerId - if s.peerInfo.peerId == peerId: raise newException(CatchableError, "can't dial self!") @@ -305,27 +304,26 @@ proc internalConnect(s: Switch, # This connection should already have been removed from the connection # manager - it's essentially a bug that we end up here - we'll fail # for now, hoping that this will clean themselves up later... 
- warn "dead connection in connection manager" + warn "dead connection in connection manager", conn await conn.close() - raise newException(CatchableError, "Zombie connection encountered") + raise newException(DialFailedError, "Zombie connection encountered") - trace "Reusing existing connection", oid = $conn.oid, - direction = $conn.dir + trace "Reusing existing connection", conn, direction = $conn.dir return conn - trace "Dialing peer" + trace "Dialing peer", peerId for t in s.transports: # for each transport for a in addrs: # for each address if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a + trace "Dialing address", address = $a, peerId let dialed = try: await t.dial(a) except CancelledError as exc: - trace "dialing canceled", exc = exc.msg + trace "Dialing canceled", exc = exc.msg, peerId raise exc except CatchableError as exc: - trace "dialing failed", exc = exc.msg + trace "Dialing failed", exc = exc.msg, peerId libp2p_failed_dials.inc() continue # Try the next address @@ -340,7 +338,7 @@ proc internalConnect(s: Switch, # If we failed to establish the connection through one transport, # we won't succeeed through another - no use in trying again await dialed.close() - debug "upgrade failed", exc = exc.msg + debug "Upgrade failed", exc = exc.msg, peerId if exc isnot CancelledError: libp2p_failed_upgrade.inc() raise exc @@ -348,9 +346,7 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" conn = upgraded - trace "dial successful", - oid = $upgraded.oid, - peerInfo = shortLog(upgraded.peerInfo) + trace "Dial successful", conn, peerInfo = conn.peerInfo break finally: if lock.locked(): @@ -359,41 +355,50 @@ proc internalConnect(s: Switch, if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") - conn.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + if conn.closed(): + # This can happen if one of the peer event handlers deems the peer + # unworthy and disconnects it + raise newLPStreamClosedError() await s.triggerConnEvent( peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) - if conn.closed(): - # This can happen if one of the peer event handlers deems the peer - # unworthy and disconnects it - raise newException(CatchableError, "Connection closed during handshake") + proc peerCleanup() {.async.} = + try: + await conn.closeEvent.wait() + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Disconnected)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in switch peer connect cleanup", + conn + except CatchableError as exc: + trace "Unexpected exception in switch peer connect cleanup", + errMsg = exc.msg, conn + + # All the errors are handled inside `cleanup()` procedure. 
+ asyncSpawn peerCleanup() return conn proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = discard await s.internalConnect(peerId, addrs) -proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} = - trace "Attempting to select remote", proto = proto, - streamOid = $stream.oid, - oid = $stream.oid +proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} = + trace "Negotiating stream", proto = proto, conn + if not await s.ms.select(conn, proto): + await conn.close() + raise newException(DialFailedError, "Unable to select sub-protocol " & proto) - if not await s.ms.select(stream, proto): - await stream.close() - raise newException(CatchableError, "Unable to select sub-protocol" & proto) - - return stream + return conn proc dial*(s: Switch, peerId: PeerID, proto: string): Future[Connection] {.async.} = let stream = await s.connmanager.getMuxedStream(peerId) if stream.isNil: - raise newException(CatchableError, "Couldn't get muxed stream") + raise newException(DialFailedError, "Couldn't get muxed stream") return await s.negotiateStream(stream, proto) @@ -415,15 +420,15 @@ proc dial*(s: Switch, try: if isNil(stream): await conn.close() - raise newException(CatchableError, "Couldn't get muxed stream") + raise newException(DialFailedError, "Couldn't get muxed stream") return await s.negotiateStream(stream, proto) except CancelledError as exc: - trace "dial canceled" + trace "dial canceled", conn await cleanup() raise exc except CatchableError as exc: - trace "error dialing", exc = exc.msg + trace "Error dialing", exc = exc.msg, conn await cleanup() raise exc @@ -439,17 +444,19 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = s.ms.addHandler(proto.codecs, proto) proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = - trace "starting switch for peer", peerInfo = shortLog(s.peerInfo) + trace "starting switch for peer", peerInfo = s.peerInfo proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = + trace "Incoming connection", conn try: await s.upgradeIncoming(conn) # perform upgrade on incoming connection except CancelledError as exc: raise exc except CatchableError as exc: - trace "Exception occurred in Switch.start", exc = exc.msg + trace "Exception occurred in incoming handler", exc = exc.msg, conn finally: await conn.close() + trace "Connection handler done", conn var startFuts: seq[Future[void]] for t in s.transports: # for each transport @@ -459,7 +466,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = s.peerInfo.addrs[i] = t.ma # update peer's address startFuts.add(server) - debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs + debug "Started libp2p node", peer = s.peerInfo result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = @@ -479,28 +486,62 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - if muxer.connection.peerInfo.isNil: + let + conn = muxer.connection + + if conn.peerInfo.isNil: warn "This version of nim-libp2p requires secure protocol to negotiate peerid" await muxer.close() return # store incoming connection - s.connManager.storeIncoming(muxer.connection) + s.connManager.storeIncoming(conn) # store muxer and muxed connection s.connManager.storeMuxer(muxer) try: await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might 
indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", err = exc.msg, conn - let peerId = muxer.connection.peerInfo.peerId - muxer.connection.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + try: + let peerId = conn.peerInfo.peerId - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) + proc peerCleanup() {.async.} = + try: + await muxer.connection.closeEvent.wait() + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Disconnected)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + debug "Unexpected cancellation in switch muxer cleanup", conn + except CatchableError as exc: + debug "Unexpected exception in switch muxer cleanup", + err = exc.msg, conn + + proc peerStartup() {.async.} = + try: + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Connected, + incoming: true)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + debug "Unexpected cancellation in switch muxer startup", conn + except CatchableError as exc: + debug "Unexpected exception in switch muxer startup", + err = exc.msg, conn + + # All the errors are handled inside `peerStartup()` procedure. + asyncSpawn peerStartup() + + # All the errors are handled inside `peerCleanup()` procedure. + asyncSpawn peerCleanup() except CancelledError as exc: await muxer.close() @@ -508,7 +549,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CatchableError as exc: await muxer.close() libp2p_failed_upgrade.inc() - trace "exception in muxer handler", exc = exc.msg + trace "Exception in muxer handler", exc = exc.msg, conn proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], @@ -529,17 +570,17 @@ proc newSwitch*(peerInfo: PeerInfo, ) let s = result # can't capture result - result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = + result.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises + trace "Incoming muxed connection", conn try: - trace "handling connection for", peerInfo = $stream - defer: - if not(isNil(stream)): - await stream.close() - await s.ms.handle(stream) # handle incoming connection + await s.ms.handle(conn) # handle incoming connection except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg + trace "exception in stream handler", exc = exc.msg, conn + finally: + await conn.close() + trace "Muxed connection done", conn result.mount(identity) for key, val in muxers: diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 4d0963b82..3e62e5b2a 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -76,13 +76,16 @@ proc connHandler*(t: TcpTransport, if not(isNil(conn)): await conn.close() t.clients.keepItIf(it != client) - except CancelledError as exc: - raise exc + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. 
+ trace "Unexpected cancellation in transport's cleanup" except CatchableError as exc: trace "error cleaning up client", exc = exc.msg t.clients.add(client) - asyncCheck cleanup() + # All the errors are handled inside `cleanup()` procedure. + asyncSpawn cleanup() result = conn proc connCb(server: StreamServer, diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 0005f2cff..987263ebf 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -16,6 +16,13 @@ type proc noop(data: seq[byte]) {.async, gcsafe.} = discard +proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto = + proc getConn(): Future[(Connection, RPCMsg)] {.async.} = + let conn = await p.switch.dial(peerId, GossipSubCodec) + return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true)) + + newPubSubPeer(peerId, getConn, GossipSubCodec) + proc randomPeerInfo(): PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) @@ -50,7 +57,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) gossipSub.peers[peerInfo.peerId] = peer gossipSub.gossipsub[topic].incl(peer) @@ -81,7 +88,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) gossipSub.grafted(peer, topic) gossipSub.peers[peerInfo.peerId] = peer @@ -103,7 +110,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -116,7 +123,7 @@ suite "GossipSub internal": conns &= conn var peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -137,7 +144,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -152,7 +159,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.fanout[topic].incl(peer) @@ -174,7 +181,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic1 = "foobar1" @@ -193,7 +200,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = 
gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.fanout[topic1].incl(peer) @@ -218,7 +225,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -234,7 +241,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: @@ -249,7 +256,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -287,7 +294,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -300,7 +307,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: @@ -334,7 +341,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -347,7 +354,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: @@ -382,7 +389,7 @@ suite "GossipSub internal": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard let topic = "foobar" @@ -395,7 +402,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec) + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index cf4988c7a..9488b9359 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -540,9 +540,9 @@ suite "GossipSub": closureScope: var dialerNode = dialer handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if dialerNode.peerInfo.id notin seen: - seen[dialerNode.peerInfo.id] = 0 - seen[dialerNode.peerInfo.id].inc + if $dialerNode.peerInfo.peerId notin seen: + seen[$dialerNode.peerInfo.peerId] = 0 + seen[$dialerNode.peerInfo.peerId].inc check topic == "foobar" if not seenFut.finished() and seen.len >= 
runs: seenFut.complete() @@ -552,8 +552,8 @@ suite "GossipSub": await allFuturesThrowing(subs).wait(30.seconds) tryPublish await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), + toBytes("from node " & + $nodes[1].peerInfo.peerId)), 1.minutes), runs, 5.seconds await wait(seenFut, 2.minutes) @@ -588,7 +588,7 @@ suite "GossipSub": await allFuturesThrowing(nodes.mapIt(it.start())) await subscribeNodes(nodes) - var seen: Table[string, int] + var seen: Table[PeerID, int] var subs: seq[Future[void]] var seenFut = newFuture[void]() for dialer in nodes: @@ -597,9 +597,9 @@ suite "GossipSub": var dialerNode = dialer handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if dialerNode.peerInfo.id notin seen: - seen[dialerNode.peerInfo.id] = 0 - seen[dialerNode.peerInfo.id].inc + if dialerNode.peerInfo.peerId notin seen: + seen[dialerNode.peerInfo.peerId] = 0 + seen[dialerNode.peerInfo.peerId].inc check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -609,8 +609,8 @@ suite "GossipSub": await allFuturesThrowing(subs) tryPublish await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), + toBytes("from node " & + $nodes[1].peerInfo.peerId)), 1.minutes), 2, 5.seconds await wait(seenFut, 5.minutes) diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index 2301e570a..ad10ab3b8 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -15,14 +15,14 @@ proc randomPeerID(): PeerID = suite "MCache": test "put/get": - var mCache = newMCache(3, 5) + var mCache = MCache.init(3, 5) var msg = Message(fromPeer: randomPeerID(), seqno: "12345".toBytes()) let msgId = defaultMsgIdProvider(msg) mCache.put(msgId, msg) check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg test "window": - var mCache = newMCache(3, 5) + var mCache = MCache.init(3, 5) for i in 0..<3: var msg = Message(fromPeer: randomPeerID(), @@ -43,7 +43,7 @@ suite "MCache": check mCache.get(id).get().topicIDs[0] == "foo" test "shift - shift 1 window at a time": - var mCache = newMCache(1, 5) + var mCache = MCache.init(1, 5) for i in 0..<3: var msg = Message(fromPeer: randomPeerID(), @@ -73,7 +73,7 @@ suite "MCache": check mCache.window("baz").len == 0 test "shift - 2 windows at a time": - var mCache = newMCache(1, 5) + var mCache = MCache.init(1, 5) for i in 0..<3: var msg = Message(fromPeer: randomPeerID(), diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim index a4f72dd1b..e578e6254 100644 --- a/tests/pubsub/testpubsub.nim +++ b/tests/pubsub/testpubsub.nim @@ -3,4 +3,5 @@ import testfloodsub, testgossipsub, testmcache, + testtimedcache, testmessage diff --git a/tests/pubsub/testtimedcache.nim b/tests/pubsub/testtimedcache.nim new file mode 100644 index 000000000..4f3acef9f --- /dev/null +++ b/tests/pubsub/testtimedcache.nim @@ -0,0 +1,34 @@ +{.used.} + +import std/unittest +import chronos/timer +import ../../libp2p/protocols/pubsub/timedcache + +suite "TimedCache": + test "put/get": + var cache = TimedCache[int].init(5.seconds) + + let now = Moment.now() + check: + not cache.put(1, now) + not cache.put(2, now + 3.seconds) + + check: + 1 in cache + 2 in cache + + check: not cache.put(3, now + 6.seconds) # expires 1 + + check: + 1 notin cache + 2 in cache + 3 in cache + + check: + cache.put(2, now + 7.seconds) # refreshes 2 + not cache.put(4, now + 12.seconds) # expires 3 + + check: + 2 in cache + 3 notin cache + 4 in cache diff --git 
a/tests/testpeer.nim b/tests/testpeer.nim index c9425f0cf..3418e2091 100644 --- a/tests/testpeer.nim +++ b/tests/testpeer.nim @@ -200,10 +200,10 @@ suite "Peer testing suite": p1 == p2 p1 == p4 p2 == p4 - p1.pretty() == PeerIDs[i] - p2.pretty() == PeerIDs[i] - p3.pretty() == PeerIDs[i] - p4.pretty() == PeerIDs[i] + $p1 == PeerIDs[i] + $p2 == PeerIDs[i] + $p3 == PeerIDs[i] + $p4 == PeerIDs[i] p1.match(seckey) == true p1.match(pubkey) == true p1.getBytes() == p2.getBytes()
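
Note for reviewers: a recurring change in this patch is replacing `asyncCheck` with `asyncSpawn` for detached, top-level tasks (connection cleanup, `peerCleanup`, `peerStartup`, the transport client cleanup, `PubSubPeer.handle`) and moving all error handling inside those tasks. Below is a minimal, self-contained Nim sketch of that pattern; it is not part of the patch, and `work`, `supervised` and the sleep durations are illustrative names and values — only the `asyncSpawn` call and the `CancelledError`/`CatchableError` handling mirror what the diff itself does.

import chronos, chronicles

proc work() {.async.} =
  # Stand-in for the real awaited work (conn.join(), cleanupConn(), ...).
  await sleepAsync(10.milliseconds)
  raise newException(ValueError, "simulated failure")

proc supervised() {.async.} =
  # Top-level detached task: nothing may escape to the poll loop, so both
  # cancellation and ordinary errors are handled right here.
  try:
    await work()
  except CancelledError:
    trace "Unexpected cancellation in supervised task"
  except CatchableError as exc:
    debug "Unexpected exception in supervised task", errMsg = exc.msg

when isMainModule:
  # All errors are handled inside `supervised()`, so spawning it detached is safe.
  asyncSpawn supervised()
  waitFor sleepAsync(50.milliseconds)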