nim-libp2p-experimental/libp2p/protocols/pubsub/pubsubpeer.nim

302 lines
10 KiB
Nim

# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# Turn on exception tracking for the declarations that follow in this module.
# Compilers older than 1.4 get `Defect` listed explicitly in the allowed set;
# newer compilers use an empty `raises` list.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
import std/[sequtils, strutils, tables, hashes]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
../../peerid,
../../peerinfo,
../../stream/connection,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
export peerid, connection
# Attach the topic below to the chronicles log statements of this module.
logScope:
  topics = "libp2p pubsubpeer"
# Counters labelled per peer id (and, for sent/received, per topic). They are
# only declared when the `libp2p_expensive_metrics` symbol is defined.
when defined(libp2p_expensive_metrics):
  declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
  declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
  declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
  declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
type
  PubSubObserver* = ref object
    ## Pair of hooks called with the RPC message being received (`onRecv`,
    ## see `recvObservers`) or sent (`onSend`, see `sendObservers`). The
    ## message is passed as `var`, so a hook is able to modify it.
    onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
    onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}

  PubSubPeerEventKind* {.pure.} = enum
    ## Connectivity notifications emitted by `connectOnce`.
    Connected     # a send connection has just been obtained
    Disconnected  # the send connection attempt/session has ended

  PubSubPeerEvent* = object
    ## Payload handed to the `OnEvent` callback.
    kind*: PubSubPeerEventKind

  # Callback types; all of them are `gcsafe` and may raise nothing but `Defect`.
  GetConn* = proc(): Future[Connection] {.gcsafe, raises: [Defect].}
  DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [Defect].} # have to pass peer as it's unknown during init
  OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [Defect].}

  PubSubPeer* = ref object of RootObj
    ## State kept for one remote pubsub peer.
    getConn*: GetConn                   # callback to establish a new send connection
    dropConn*: DropConn                 # Function pointer to use to drop connections
    onEvent*: OnEvent                   # Connectivity updates for peer
    codec*: string                      # the protocol that this peer joined from
    sendConn*: Connection               # cached send connection
    address*: Option[MultiAddress]      # set from `sendConn.observedAddr` in `connectOnce`; never reset there
    peerId*: PeerId                     # identity; the only input to `hash` and `==`
    handler*: RPCHandler                # awaited by `handle` for each decoded RPC message
    observers*: ref seq[PubSubObserver] # ref as in smart_ptr
    # NOTE(review): `score`, `iWantBudget` and `iHaveBudget` are not referenced
    # in the code visible here; presumably maintained by the gossipsub logic -
    # confirm against the callers.
    score*: float64
    iWantBudget*: int
    iHaveBudget*: int
    maxMessageSize: int                 # limit passed to `readLp` in `handle` and checked in `sendEncoded`
    appScore*: float64                  # application specific score
    behaviourPenalty*: float64          # the eventual penalty score
    when defined(libp2p_agents_metrics):
      shortAgent*: string               # only present with `libp2p_agents_metrics`

  RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
    {.gcsafe, raises: [Defect].}
func hash*(p: PubSubPeer): Hash =
  ## Hash of a peer, derived solely from its `peerId` so that it agrees with
  ## the `peerId`-based `==`.
  hash(p.peerId)
func `==`*(a, b: PubSubPeer): bool =
  ## Two peers are equal when their `peerId`s are equal.
  ##
  ## This overload replaces the built-in reference comparison for
  ## `PubSubPeer`, so expressions such as `peer == nil` / `peer != nil` end up
  ## here as well. To keep those from dereferencing a nil reference, nil
  ## operands are handled explicitly: nil equals only nil.
  if a.isNil or b.isNil:
    # `isNil` must be used here; `a == nil` would call this very overload.
    a.isNil and b.isNil
  else:
    a.peerId == b.peerId
func shortLog*(p: PubSubPeer): string =
  ## Compact text form used in logs: the `shortLog` of the peer id, or the
  ## fixed placeholder `PubSubPeer(nil)` for a nil peer.
  if not p.isNil:
    p.peerId.shortLog
  else:
    "PubSubPeer(nil)"
# Have chronicles render `PubSubPeer` values in log output through `shortLog`.
chronicles.formatIt(PubSubPeer): shortLog(it)
proc connected*(p: PubSubPeer): bool =
  ## `true` only while a send connection is cached and that connection is
  ## neither closed nor at EOF.
  let conn = p.sendConn
  if conn.isNil:
    false
  else:
    not conn.closed and not conn.atEof
proc hasObservers*(p: PubSubPeer): bool =
  ## `true` when the observer list exists and contains at least one non-nil
  ## observer.
  if p.observers.isNil:
    false
  else:
    p.observers[].anyIt(not it.isNil)
func outbound*(p: PubSubPeer): bool =
  ## Returns `true` when `p` has a cached send connection whose `transportDir`
  ## is `Direction.Out`; `false` otherwise, including when there is no send
  ## connection at all.
  ##
  ## The gossipsub 1.1 spec requires us to know if the transport is outgoing
  ## in order to give priority to connections we make:
  ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#outbound-mesh-quotas
  ## This behaviour is prescribed to counter sybil attacks and ensures that a
  ## coordinated inbound attack can never fully take over the mesh.
  not p.sendConn.isNil and p.sendConn.transportDir == Direction.Out
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
  ## Runs the `onRecv` hook of every registered observer on `msg`, in list
  ## order. `msg` is passed on as `var`, so hooks may alter it.
  if p.observers.isNil:
    return

  for observer in p.observers[]:
    # TODO: entries should never be nil, but skip them defensively
    if not observer.isNil:
      observer.onRecv(p, msg)
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
  ## Runs the `onSend` hook of every registered observer on `msg`, in list
  ## order. `msg` is passed on as `var`, so hooks may alter it.
  if p.observers.isNil:
    return

  for observer in p.observers[]:
    # TODO: entries should never be nil, but skip them defensively
    if not observer.isNil:
      observer.onSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
  ## Read loop for `conn`: repeatedly reads a message with `readLp` (bounded
  ## by `p.maxMessageSize`), decodes it as an RPC message, runs the receive
  ## observers on it and awaits `p.handler` with the result.
  ##
  ## The loop stops when `conn` reaches EOF, when a message fails to decode,
  ## or when an exception is raised. `conn` is closed on every exit path.
  ## Neither `CancelledError` nor any other `CatchableError` is propagated to
  ## the caller; both are only logged.
  debug "starting pubsub read loop",
    conn, peer = p, closed = conn.closed
  try:
    # Inner try: guarantees the connection is closed before the outer
    # handlers log the outcome.
    try:
      while not conn.atEof:
        trace "waiting for data", conn, peer = p, closed = conn.closed

        var data = await conn.readLp(p.maxMessageSize)
        trace "read data from peer",
          conn, peer = p, closed = conn.closed,
          data = data.shortLog

        var rmsg = decodeRpcMsg(data)
        data = newSeq[byte]() # Release memory

        if rmsg.isErr():
          # An undecodable message ends the read loop for this connection.
          notice "failed to decode msg from peer",
            conn, peer = p, closed = conn.closed,
            err = rmsg.error()
          break

        trace "decoded msg from peer",
          conn, peer = p, closed = conn.closed,
          msg = rmsg.get().shortLog
        # trigger hooks
        p.recvObservers(rmsg.get())

        when defined(libp2p_expensive_metrics):
          for m in rmsg.get().messages:
            for t in m.topicIDs:
              # metrics
              libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

        await p.handler(p, rmsg.get())
    finally:
      await conn.close()
  except CancelledError:
    # This is a top-level procedure which will work as a separate task, so it
    # does not need to propagate CancelledError.
    trace "Unexpected cancellation in PubSubPeer.handle"
  except CatchableError as exc:
    trace "Exception occurred in PubSubPeer.handle",
      conn, peer = p, closed = conn.closed, exc = exc.msg
  finally:
    debug "exiting pubsub read loop",
      conn, peer = p, closed = conn.closed
proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
  ## One connect/serve cycle: obtains a send connection through `p.getConn`,
  ## caches it in `p.sendConn`, records its observed address, signals
  ## `Connected`, and then runs `handle` on it until that returns.
  ##
  ## Raises `LPError` when `getConn` yields nil. On every exit path the cached
  ## send connection (if any) is closed and cleared and `Disconnected` is
  ## signalled.
  try:
    let newConn = await p.getConn()
    if newConn.isNil:
      raise (ref LPError)(msg: "Cannot establish send connection")

    # When the send channel goes up, subscriptions need to be sent to the
    # remote peer - if we had multiple channels up and one goes down, all
    # stop working so we make an effort to only keep a single channel alive
    trace "Get new send connection", p, newConn

    p.sendConn = newConn
    p.address = some(newConn.observedAddr)

    if not p.onEvent.isNil:
      p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected))

    await handle(p, newConn)
  finally:
    if not p.sendConn.isNil:
      trace "Removing send connection", p, conn = p.sendConn
      await p.sendConn.close()
      p.sendConn = nil

    try:
      if not p.onEvent.isNil:
        p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
    except CancelledError as exc:
      raise exc
    except CatchableError as exc:
      debug "Errors during diconnection events", error = exc.msg

    # don't cleanup p.address else we leak some gossip stat table
proc connectImpl(p: PubSubPeer) {.async.} =
  ## Runs `connectOnce` in an endless loop until it raises. The error is
  ## logged, and in every case `p.dropConn` (when set) is called afterwards.
  try:
    # Keep trying to establish a connection while it's possible to do so - the
    # send connection might get disconnected due to a timeout or an unrelated
    # issue, so we try to get a new one
    while true:
      await p.connectOnce()
  except CatchableError as exc: # never cancelled
    debug "Could not establish send connection", msg = exc.msg
  finally:
    # drop the connection, else we end up with ghost peers
    if not p.dropConn.isNil:
      p.dropConn(p)
proc connect*(p: PubSubPeer) =
  ## Launches the reconnecting send-connection loop (`connectImpl`) as a
  ## detached task and returns without waiting for it.
  asyncSpawn p.connectImpl()
proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} =
  ## Starts writing `encoded` to `conn` with `writeLp` and returns a future
  ## that waits for that write. A failed write is logged and answered by
  ## closing `conn`.
  trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)

  # The write is started here, outside the async helper, to avoid copying
  # `encoded` into the helper's future.
  let writeFut = conn.writeLp(encoded)

  proc awaitWrite(): Future[void] {.async.} =
    try:
      await writeFut
      trace "sent pubsub message to remote", conn
    except CatchableError as exc: # never cancelled
      # Because we detach the send call from the currently executing task using
      # asyncSpawn, no exceptions may leak out of it
      trace "Unable to send to remote", conn, msg = exc.msg

      # Closing cleans up the send connection: the next time sendConn is used
      # it will have its close flag set and thus will be recycled
      await conn.close()

  awaitWrite()
template sendMetrics(msg: RPCMsg): untyped =
  ## Increments `libp2p_pubsub_sent_messages` once per (message, topic) pair
  ## found in `msg`. Expands to nothing unless `libp2p_expensive_metrics` is
  ## defined. Note that the body refers to `p`, which must be visible at the
  ## place where the template is used.
  when defined(libp2p_expensive_metrics):
    for entry in msg.messages:
      for topicName in entry.topicIDs:
        # metrics
        libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, topicName])
proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
  ## Fire-and-forget send of an already encoded payload.
  ##
  ## The payload is dropped (with a log line) when it is empty, when it is
  ## larger than `p.maxMessageSize`, or when there is no open send connection.
  ## Otherwise the write is spawned as a detached task via `sendImpl`.
  ## Asserts that `p` is not nil.
  doAssert(not p.isNil, "pubsubpeer nil!")

  let conn = p.sendConn

  if msg.len <= 0:
    debug "empty message, skipping", p, msg = shortLog(msg)
  elif msg.len > p.maxMessageSize:
    info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
  elif conn.isNil or conn.closed:
    trace "No send connection, skipping message", p, msg = shortLog(msg)
  else:
    # To limit the size of the closure, only the encoded message and the
    # connection are handed to the spawned send task
    asyncSpawn conn.sendImpl(msg)
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
  ## Encodes `msg` (honouring `anonymize`) and passes the bytes to
  ## `sendEncoded`. When the peer has observers, their `onSend` hooks run on
  ## a private copy of the message first and that copy is what gets encoded.
  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

  # When sending messages, we take care to re-encode them with the right
  # anonymization flag to ensure that we're not penalized for sending invalid
  # or malicious data on the wire - in particular, re-encoding protects against
  # some forms of valid but redundantly encoded protobufs with unknown or
  # duplicated fields
  let encoded =
    if p.hasObservers():
      # Hooks take the message as `var`; work on a copy so the caller's
      # message is left untouched.
      var hooked = msg
      p.sendObservers(hooked)
      sendMetrics(hooked)
      encodeRpcMsg(hooked, anonymize)
    else:
      # If there are no send hooks, we redundantly re-encode the message to
      # protobuf for every peer - this could easily be improved!
      sendMetrics(msg)
      encodeRpcMsg(msg, anonymize)

  sendEncoded(p, encoded)
proc new*(
  T: typedesc[PubSubPeer],
  peerId: PeerId,
  getConn: GetConn,
  dropConn: DropConn,
  onEvent: OnEvent,
  codec: string,
  maxMessageSize: int): T =
  ## Creates a peer with the given identity, callbacks, codec and message
  ## size limit. Every field not listed here keeps its default value; in
  ## particular no send connection exists until `connect` is called.
  result = T(
    peerId: peerId,
    codec: codec,
    maxMessageSize: maxMessageSize,
    getConn: getConn,
    dropConn: dropConn,
    onEvent: onEvent
  )