remove snappy RPC support (#1477)

removed in 0.12.2 - the flow, in particular when the other peer doesn't
support snappy, is hard to follow because of the trial-and-error
approach - removing it simplifies things and removes some of the
hard-to-read parts of the thunking etc
This commit is contained in:
Jacek Sieka 2020-08-10 15:18:17 +02:00 committed by GitHub
parent 936440fccd
commit 280e72f3c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 133 deletions

View File

@ -11,7 +11,7 @@ import
json_serialization, json_serialization/std/[net, options],
chronos, chronicles, metrics,
# TODO: create simpler to use libp2p modules that use re-exports
libp2p/[switch, standard_setup, peerinfo, errors,
libp2p/[switch, standard_setup, peerinfo,
multiaddress, multicodec, crypto/crypto, crypto/secp,
protocols/identify, protocols/protocol],
libp2p/protocols/secure/[secure, secio],
@ -92,7 +92,6 @@ type
maxInactivityAllowed*: Duration
netThroughput: AverageThroughput
score*: int
lacksSnappy: bool
connections*: int
disconnectedFut: Future[void]
@ -110,7 +109,6 @@ type
UntypedResponse = ref object
peer*: Peer
stream*: Connection
noSnappy*: bool
writtenChunks*: int
SingleChunkResponse*[MsgType] = distinct UntypedResponse
@ -279,20 +277,17 @@ template remote*(peer: Peer): untyped =
proc openStream(node: Eth2Node,
peer: Peer,
protocolId: string): Future[Connection] {.async.} =
let protocolId = protocolId & (if peer.lacksSnappy: "ssz" else: "ssz_snappy")
try:
result = await dial(node.switch, peer.info.peerId, peer.info.addrs, protocolId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
# TODO: LibP2P should raise a more specific exception here
if peer.lacksSnappy == false:
peer.lacksSnappy = true
trace "Snappy connection failed. Trying without Snappy",
peer, protocolId
return await openStream(node, peer, protocolId)
else:
raise exc
let
protocolId = protocolId & "ssz_snappy"
conn = await dial(
node.switch, peer.info.peerId, peer.info.addrs, protocolId)
# libp2p may replace peerinfo ref sometimes, so make sure we have a recent
# one
if conn.peerInfo != nil:
peer.info = conn.peerInfo
return conn
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
@ -303,7 +298,7 @@ proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
let peer = Peer.init(node, PeerInfo.init(peerId))
return node.peers.mGetOrPut(peerId, peer)
proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} =
proc peerFromStream(network: Eth2Node, conn: Connection): Peer =
# TODO: Can this be `nil`?
return network.getPeer(conn.peerInfo.peerId)
@ -427,19 +422,14 @@ proc disconnectAndRaise(peer: Peer,
proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: Bytes,
noSnappy: bool) {.async.} =
payload: Bytes) {.async.} =
var output = memoryOutput()
if responseCode.isSome:
output.write byte(responseCode.get)
output.write varintBytes(payload.lenu64)
if noSnappy:
output.write(payload)
else:
output.write(framingFormatCompress payload)
output.write(framingFormatCompress payload)
await conn.write(output.getOutput)
@ -458,12 +448,11 @@ proc formatErrorMsg(msg: ErrorMSg): string =
proc sendErrorResponse(peer: Peer,
conn: Connection,
noSnappy: bool,
responseCode: ResponseCode,
errMsg: ErrorMsg) {.async.} =
debug "Error processing request",
peer, responseCode, errMsg = formatErrorMsg(errMsg)
await conn.writeChunk(some responseCode, SSZ.encode(errMsg), noSnappy)
await conn.writeChunk(some responseCode, SSZ.encode(errMsg))
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var
@ -478,28 +467,26 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
let stream = streamFut.read
try:
await stream.writeChunk(none ResponseCode, requestBytes, peer.lacksSnappy)
await stream.writeChunk(none ResponseCode, requestBytes)
finally:
await safeClose(stream)
proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes) {.async.} =
inc response.writtenChunks
await response.stream.writeChunk(some Success, payload, response.noSnappy)
await response.stream.writeChunk(some Success, payload)
proc sendResponseChunkObj(response: UntypedResponse, val: auto) {.async.} =
inc response.writtenChunks
await response.stream.writeChunk(some Success, SSZ.encode(val), response.noSnappy)
await response.stream.writeChunk(some Success, SSZ.encode(val))
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
noSnappy: bool,
handlerResultFut: Future): untyped =
let handlerRes = await handlerResultFut
writeChunk(stream, some Success, SSZ.encode(handlerRes), noSnappy)
writeChunk(stream, some Success, SSZ.encode(handlerRes))
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
noSnappy: bool,
handlerResult: auto): untyped =
writeChunk(stream, some Success, SSZ.encode(handlerResult), noSnappy)
writeChunk(stream, some Success, SSZ.encode(handlerResult))
when useNativeSnappy:
include faststreams_backend
@ -516,23 +503,23 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
deadline): return neterr StreamOpenTimeout
try:
# Send the request
await stream.writeChunk(none ResponseCode, requestBytes, peer.lacksSnappy)
await stream.writeChunk(none ResponseCode, requestBytes)
# Read the response
return awaitWithTimeout(
readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
peer.lacksSnappy, peer, ResponseMsg),
peer, ResponseMsg),
deadline, neterr(ReadResponseTimeout))
finally:
await safeClose(stream)
proc init*[MsgType](T: type MultipleChunksResponse[MsgType],
peer: Peer, conn: Connection, noSnappy: bool): T =
T(UntypedResponse(peer: peer, stream: conn, noSnappy: noSnappy))
peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
proc init*[MsgType](T: type SingleChunkResponse[MsgType],
peer: Peer, conn: Connection, noSnappy: bool): T =
T(UntypedResponse(peer: peer, stream: conn, noSnappy: noSnappy))
peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
template write*[M](r: MultipleChunksResponse[M], val: auto): untyped =
sendResponseChunkObj(UntypedResponse(r), val)
@ -592,7 +579,6 @@ proc implementSendProcBody(sendProc: SendProc) =
proc handleIncomingStream(network: Eth2Node,
conn: Connection,
noSnappy: bool,
MsgType: type) {.async, gcsafe.} =
mixin callUserHandler, RecType
@ -616,7 +602,7 @@ proc handleIncomingStream(network: Eth2Node,
peer.info = conn.peerInfo
template returnInvalidRequest(msg: ErrorMsg) =
await sendErrorResponse(peer, conn, noSnappy, InvalidRequest, msg)
await sendErrorResponse(peer, conn, InvalidRequest, msg)
return
template returnInvalidRequest(msg: string) =
@ -637,7 +623,7 @@ proc handleIncomingStream(network: Eth2Node,
let msg = if sizeof(MsgRec) > 0:
try:
awaitWithTimeout(readChunkPayload(s, noSnappy, peer, MsgRec), deadline):
awaitWithTimeout(readChunkPayload(s, peer, MsgRec), deadline):
returnInvalidRequest(errorMsgLit "Request full data not sent in time")
except SerializationError as err:
@ -675,14 +661,14 @@ proc handleIncomingStream(network: Eth2Node,
of BrokenConnection:
return
await sendErrorResponse(peer, conn, noSnappy, responseCode, errMsg)
await sendErrorResponse(peer, conn, responseCode, errMsg)
return
try:
logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, noSnappy, msg.get)
await callUserHandler(MsgType, peer, conn, msg.get)
except CatchableError as err:
await sendErrorResponse(peer, conn, noSnappy, ServerError,
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
except CatchableError as err:
@ -992,7 +978,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msgVar = ident "msg"
networkVar = ident "network"
callUserHandler = ident "callUserHandler"
noSnappyVar = ident "noSnappy"
MSG = ident "MSG"
p.useRequestIds = false
@ -1044,7 +1029,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
if msg.kind == msgRequest:
userHandlerCall = newCall(ident"sendUserHandlerResultAsChunkImpl",
streamVar,
noSnappyVar,
userHandlerCall)
else:
if OutputParamType.kind == nnkVarTy:
@ -1060,30 +1044,20 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
userHandlerCall = newStmtList(
newVarStmt(responseVar,
newCall(ident"init", OutputParamType,
peerVar, streamVar, noSnappyVar)),
peerVar, streamVar)),
msg.genUserHandlerCall(msgVar, [peerVar], outputParam = responseVar))
protocol.outRecvProcs.add quote do:
template `callUserHandler`(`MSG`: type `MsgStrongRecName`,
`peerVar`: `Peer`,
`streamVar`: `Connection`,
`noSnappyVar`: bool,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc sszThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`, true,
`MsgStrongRecName`)
mount `networkVar`.switch,
LPProtocol(codec: `codecNameLit` & "ssz",
handler: sszThunk)
proc snappyThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`, false,
return handleIncomingStream(`networkVar`, `streamVar`,
`MsgStrongRecName`)
mount `networkVar`.switch,

View File

@ -90,7 +90,7 @@ proc uncompressFramedStream*(conn: Connection,
return ok output
proc readChunkPayload(conn: Connection, noSnappy: bool, peer: Peer,
proc readChunkPayload(conn: Connection, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
let sm = now(chronos.Moment)
let size =
@ -109,26 +109,18 @@ proc readChunkPayload(conn: Connection, noSnappy: bool, peer: Peer,
if size == 0:
return neterr ZeroSizePrefix
if noSnappy:
var bytes = newSeq[byte](size.int)
await conn.readExactly(addr bytes[0], bytes.len)
let data = await conn.uncompressFramedStream(size.int)
if data.isOk:
# `10` is the maximum size of variable integer on wire, so error could
# not be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm, uint64(10 + len(bytes)))
return ok SSZ.decode(bytes, MsgType)
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
return ok SSZ.decode(data.get(), MsgType)
else:
let data = await conn.uncompressFramedStream(size.int)
if data.isOk:
# `10` is the maximum size of variable integer on wire, so error could
# not be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
return ok SSZ.decode(data.get(), MsgType)
else:
debug "Snappy decompression/read failed", msg = $data.error, conn = $conn
return neterr InvalidSnappyBytes
debug "Snappy decompression/read failed", msg = $data.error, conn = $conn
return neterr InvalidSnappyBytes
proc readResponseChunk(conn: Connection, noSnappy: bool, peer: Peer,
proc readResponseChunk(conn: Connection, peer: Peer,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
try:
var responseCodeByte: byte
@ -144,7 +136,7 @@ proc readResponseChunk(conn: Connection, noSnappy: bool, peer: Peer,
let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError:
let errorMsgChunk = await readChunkPayload(conn, noSnappy, peer, ErrorMsg)
let errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
@ -153,18 +145,18 @@ proc readResponseChunk(conn: Connection, noSnappy: bool, peer: Peer,
of Success:
discard
return await readChunkPayload(conn, noSnappy, peer, MsgType)
return await readChunkPayload(conn, peer, MsgType)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr UnexpectedEOF
proc readResponse(conn: Connection, noSnappy: bool, peer: Peer,
proc readResponse(conn: Connection, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await conn.readResponseChunk(noSnappy, peer, E)
let nextRes = await conn.readResponseChunk(peer, E)
if nextRes.isErr:
if nextRes.error.kind == PotentiallyExpectedEOF:
return ok results
@ -172,4 +164,4 @@ proc readResponse(conn: Connection, noSnappy: bool, peer: Peer,
else:
results.add nextRes.value
else:
return await conn.readResponseChunk(noSnappy, peer, MsgType)
return await conn.readResponseChunk(peer, MsgType)

View File

@ -252,103 +252,69 @@ proc goodbyeUserHandler(peer: Peer; reason: uint64) {.async,
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
template callUserHandler(MSG: type statusObj; peer: Peer; stream: Connection;
noSnappy: bool; msg: StatusMsg): untyped =
var response = init(SingleChunkResponse[StatusMsg], peer, stream, noSnappy)
msg: StatusMsg): untyped =
var response = init(SingleChunkResponse[StatusMsg], peer, stream)
statusUserHandler(peer, msg, response)
proc statusMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, statusObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/status/1/" &
"ssz", handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, statusObj)
return handleIncomingStream(network, stream, statusObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/status/1/" &
"ssz_snappy", handler: snappyThunk)
template callUserHandler(MSG: type pingObj; peer: Peer; stream: Connection;
noSnappy: bool; msg: uint64): untyped =
sendUserHandlerResultAsChunkImpl(stream, noSnappy, pingUserHandler(peer, msg))
template callUserHandler(MSG: type pingObj; peer: Peer; stream: Connection; msg: uint64): untyped =
sendUserHandlerResultAsChunkImpl(stream, pingUserHandler(peer, msg))
proc pingMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, pingObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/ping/1/" & "ssz",
handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, pingObj)
return handleIncomingStream(network, stream, pingObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/ping/1/" &
"ssz_snappy", handler: snappyThunk)
template callUserHandler(MSG: type getMetadataObj; peer: Peer; stream: Connection;
noSnappy: bool; msg: getMetadataObj): untyped =
sendUserHandlerResultAsChunkImpl(stream, noSnappy, getMetadataUserHandler(peer))
msg: getMetadataObj): untyped =
sendUserHandlerResultAsChunkImpl(stream, getMetadataUserHandler(peer))
proc getMetadataMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, getMetadataObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/metadata/1/" &
"ssz", handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, getMetadataObj)
return handleIncomingStream(network, stream, getMetadataObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/metadata/1/" &
"ssz_snappy", handler: snappyThunk)
template callUserHandler(MSG: type beaconBlocksByRangeObj; peer: Peer;
stream: Connection; noSnappy: bool;
msg: beaconBlocksByRangeObj): untyped =
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream,
noSnappy)
stream: Connection; msg: beaconBlocksByRangeObj): untyped =
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream)
beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.count, msg.step, response)
proc beaconBlocksByRangeMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, beaconBlocksByRangeObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/" &
"ssz", handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, beaconBlocksByRangeObj)
return handleIncomingStream(network, stream, beaconBlocksByRangeObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/" &
"ssz_snappy", handler: snappyThunk)
template callUserHandler(MSG: type beaconBlocksByRootObj; peer: Peer;
stream: Connection; noSnappy: bool; msg: BlockRootsList): untyped =
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream,
noSnappy)
stream: Connection; msg: BlockRootsList): untyped =
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream)
beaconBlocksByRootUserHandler(peer, msg, response)
proc beaconBlocksByRootMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, beaconBlocksByRootObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/" &
"ssz", handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, beaconBlocksByRootObj)
return handleIncomingStream(network, stream, beaconBlocksByRootObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/" &
"ssz_snappy", handler: snappyThunk)
template callUserHandler(MSG: type goodbyeObj; peer: Peer; stream: Connection;
noSnappy: bool; msg: uint64): untyped =
msg: uint64): untyped =
goodbyeUserHandler(peer, msg)
proc goodbyeMounter(network: Eth2Node) =
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, true, goodbyeObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/goodbye/1/" &
"ssz", handler: sszThunk)
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
return handleIncomingStream(network, stream, false, goodbyeObj)
return handleIncomingStream(network, stream, goodbyeObj)
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/goodbye/1/" &
"ssz_snappy", handler: snappyThunk)