mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-04 19:14:47 +00:00
fix(lightpush): waku lightpush rpc codec support optional fields
This commit is contained in:
parent
af4fb5f5a7
commit
9396f64489
@ -42,7 +42,7 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
|
||||
return err(dialFailure)
|
||||
let connection = connOpt.get()
|
||||
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
|
||||
await connection.writeLP(rpc.encode().buffer)
|
||||
|
||||
var buffer = await connection.readLp(MaxRpcSize.int)
|
||||
@ -53,14 +53,14 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
|
||||
return err(decodeRpcFailure)
|
||||
|
||||
let pushResponseRes = decodeRespRes.get()
|
||||
if pushResponseRes.response == PushResponse():
|
||||
if pushResponseRes.response.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
|
||||
return err(emptyResponseBodyFailure)
|
||||
|
||||
let response = pushResponseRes.response
|
||||
let response = pushResponseRes.response.get()
|
||||
if not response.isSuccess:
|
||||
if response.info != "":
|
||||
return err(response.info)
|
||||
if response.info.isSome():
|
||||
return err(response.info.get())
|
||||
else:
|
||||
return err("unknown failure")
|
||||
|
||||
|
@ -45,27 +45,27 @@ proc initProtocolHandler*(wl: WakuLightPush) =
|
||||
return
|
||||
|
||||
let req = reqDecodeRes.get()
|
||||
if req.request == PushRequest():
|
||||
if req.request.isNone():
|
||||
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
|
||||
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
|
||||
return
|
||||
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
let
|
||||
pubSubTopic = req.request.pubSubTopic
|
||||
message = req.request.message
|
||||
pubSubTopic = req.request.get().pubSubTopic
|
||||
message = req.request.get().message
|
||||
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic
|
||||
|
||||
var response: PushResponse
|
||||
let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
|
||||
if handleRes.isOk():
|
||||
response = PushResponse(is_success: true, info: "OK")
|
||||
response = PushResponse(is_success: true, info: some("OK"))
|
||||
else:
|
||||
response = PushResponse(is_success: false, info: handleRes.error)
|
||||
response = PushResponse(is_success: false, info: some(handleRes.error))
|
||||
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
|
||||
error "pushed message handling failed", error=handleRes.error
|
||||
|
||||
let rpc = PushRPC(requestId: req.requestId, response: response)
|
||||
let rpc = PushRPC(requestId: req.requestId, response: some(response))
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
wl.handler = handle
|
||||
|
@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/options
|
||||
import
|
||||
../waku_message
|
||||
|
||||
@ -13,9 +15,9 @@ type
|
||||
|
||||
PushResponse* = object
|
||||
isSuccess*: bool
|
||||
info*: string
|
||||
info*: Option[string]
|
||||
|
||||
PushRPC* = object
|
||||
requestId*: string
|
||||
request*: PushRequest
|
||||
response*: PushResponse
|
||||
request*: Option[PushRequest]
|
||||
response*: Option[PushResponse]
|
||||
|
@ -4,6 +4,8 @@ else:
|
||||
{.push raises: [].}
|
||||
|
||||
|
||||
import
|
||||
std/options
|
||||
import
|
||||
../../../common/protobuf,
|
||||
../waku_message,
|
||||
@ -27,12 +29,16 @@ proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = PushRequest()
|
||||
|
||||
var pubSubTopic: PubsubTopic
|
||||
discard ?pb.getField(1, pubSubTopic)
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
if not ?pb.getField(1, pubSubTopic):
|
||||
return err(ProtoError.RequiredFieldMissing)
|
||||
else:
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
|
||||
var buf: seq[byte]
|
||||
discard ?pb.getField(2, buf)
|
||||
rpc.message = ?WakuMessage.decode(buf)
|
||||
var messageBuf: seq[byte]
|
||||
if not ?pb.getField(2, messageBuf):
|
||||
return err(ProtoError.RequiredFieldMissing)
|
||||
else:
|
||||
rpc.message = ?WakuMessage.decode(messageBuf)
|
||||
|
||||
ok(rpc)
|
||||
|
||||
@ -48,15 +54,19 @@ proc encode*(rpc: PushResponse): ProtoBuffer =
|
||||
|
||||
proc decode*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var rpc = PushResponse(isSuccess: false, info: "")
|
||||
var rpc = PushResponse()
|
||||
|
||||
var isSuccess: uint64
|
||||
if ?pb.getField(1, isSuccess):
|
||||
if not ?pb.getField(1, isSuccess):
|
||||
return err(ProtoError.RequiredFieldMissing)
|
||||
else:
|
||||
rpc.isSuccess = bool(isSuccess)
|
||||
|
||||
var info: string
|
||||
discard ?pb.getField(2, info)
|
||||
rpc.info = info
|
||||
if not ?pb.getField(2, info):
|
||||
rpc.info = none(string)
|
||||
else:
|
||||
rpc.info = some(info)
|
||||
|
||||
ok(rpc)
|
||||
|
||||
@ -65,8 +75,8 @@ proc encode*(rpc: PushRPC): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
pb.write3(1, rpc.requestId)
|
||||
pb.write3(2, rpc.request.encode())
|
||||
pb.write3(3, rpc.response.encode())
|
||||
pb.write3(2, rpc.request.map(encode))
|
||||
pb.write3(3, rpc.response.map(encode))
|
||||
pb.finish3()
|
||||
|
||||
pb
|
||||
@ -76,15 +86,23 @@ proc decode*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = PushRPC()
|
||||
|
||||
var requestId: string
|
||||
discard ?pb.getField(1, requestId)
|
||||
rpc.requestId = requestId
|
||||
if not ?pb.getField(1, requestId):
|
||||
return err(ProtoError.RequiredFieldMissing)
|
||||
else:
|
||||
rpc.requestId = requestId
|
||||
|
||||
var requestBuffer: seq[byte]
|
||||
discard ?pb.getField(2, requestBuffer)
|
||||
rpc.request = ?PushRequest.decode(requestBuffer)
|
||||
if not ?pb.getField(2, requestBuffer):
|
||||
rpc.request = none(PushRequest)
|
||||
else:
|
||||
let request = ?PushRequest.decode(requestBuffer)
|
||||
rpc.request = some(request)
|
||||
|
||||
var pushBuffer: seq[byte]
|
||||
discard ?pb.getField(3, pushBuffer)
|
||||
rpc.response = ?PushResponse.decode(pushBuffer)
|
||||
var responseBuffer: seq[byte]
|
||||
if not ?pb.getField(3, responseBuffer):
|
||||
rpc.response = none(PushResponse)
|
||||
else:
|
||||
let response = ?PushResponse.decode(responseBuffer)
|
||||
rpc.response = some(response)
|
||||
|
||||
ok(rpc)
|
Loading…
x
Reference in New Issue
Block a user