diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index c03f3f0c8..bdb5d474c 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -68,7 +68,7 @@ type Chat = ref object type PrivateKey* = crypto.PrivateKey - Topic* = waku_node.PubsubTopic + Topic* = waku_message.PubsubTopic ##################### ## chat2 protobufs ## @@ -514,7 +514,7 @@ proc processInput(rfd: AsyncFD) {.async.} = proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = trace "Hit subscribe handler", topic - let decoded = WakuMessage.init(data) + let decoded = WakuMessage.decode(data) if decoded.isOk(): if decoded.get().contentTopic == chat.contentTopic: diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index d0e6e0f71..b51a4d540 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -196,7 +196,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = # Bridging # Handle messages on Waku v2 and bridge to Matterbridge proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): trace "Bridging message from Chat2 to Matterbridge", msg=msg[] cmb.toMatterbridge(msg[]) diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index 37681aee2..ad1a4cb43 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -54,7 +54,7 @@ type WakuBridge* = ref object of RootObj nodev1*: EthereumNode nodev2*: WakuNode - nodev2PubsubTopic: waku_node.PubsubTopic # Pubsub topic to bridge to/from + nodev2PubsubTopic: waku_message.PubsubTopic # Pubsub topic to bridge to/from seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication. rng: ref HmacDrbgContext v1Pool: seq[Node] # Pool of v1 nodes for possible connections @@ -231,7 +231,7 @@ proc new*(T: type WakuBridge, nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nameResolver: NameResolver = nil, # Bridge configuration - nodev2PubsubTopic: waku_node.PubsubTopic, + nodev2PubsubTopic: waku_message.PubsubTopic, v1Pool: seq[Node] = @[], targetV1Peers = 0): T {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} = @@ -303,7 +303,7 @@ proc start*(bridge: WakuBridge) {.async.} = # Handle messages on Waku v2 and bridge to Waku v1 proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk() and msg.get().isBridgeable(): try: trace "Bridging message from V2 to V1", msg=msg.tryGet() diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 968e083e3..da857bb51 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -32,7 +32,7 @@ proc runBackground() {.async.} = # Subscribe to a topic let topic = PubsubTopic("foobar") proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let message = WakuMessage.init(data).value + let message = WakuMessage.decode(data).value let payload = cast[string](message.payload) info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic node.subscribe(topic, handler) diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 153caccf8..7573794d1 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -70,7 +70,7 @@ proc setupAndSubscribe() {.async.} = let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let message = WakuMessage.init(data).value + let message = WakuMessage.decode(data).value let payloadStr = string.fromBytes(message.payload) if message.contentTopic == contentTopic: notice "message received", payload=payloadStr, diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index 23898711b..f848a3813 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -128,7 +128,7 @@ procSuite "WakuBridge": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk() and msg.value().version == 1: check: diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 67d1451c0..2579f090d 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -101,7 +101,7 @@ procSuite "Waku Discovery v5": # Let's see if we can deliver a message end-to-end # var completionFut = newFuture[bool]() # proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # let msg = WakuMessage.init(data) + # let msg = WakuMessage.decode(data) # if msg.isOk(): # let val = msg.value() # check: diff --git a/tests/v2/test_waku_noise.nim b/tests/v2/test_waku_noise.nim index 4aab39891..298437f8c 100644 --- a/tests/v2/test_waku_noise.nim +++ b/tests/v2/test_waku_noise.nim @@ -172,7 +172,7 @@ procSuite "Waku Noise": let pb = msg.get().encode() # We decode the WakuMessage from the ProtoBuffer - let msgFromPb = WakuMessage.init(pb.buffer) + let msgFromPb = WakuMessage.decode(pb.buffer) check: msgFromPb.isOk() diff --git a/tests/v2/test_waku_noise_sessions.nim b/tests/v2/test_waku_noise_sessions.nim index 2aafff622..1e26c7deb 100644 --- a/tests/v2/test_waku_noise_sessions.nim +++ b/tests/v2/test_waku_noise_sessions.nim @@ -128,7 +128,7 @@ procSuite "Waku Noise Sessions": pb = wakuMsg.get().encode() # We decode the WakuMessage from the ProtoBuffer - msgFromPb = WakuMessage.init(pb.buffer) + msgFromPb = WakuMessage.decode(pb.buffer) check: msgFromPb.isOk() @@ -185,7 +185,7 @@ procSuite "Waku Noise Sessions": pb = wakuMsg.get().encode() # We decode the WakuMessage from the ProtoBuffer - msgFromPb = WakuMessage.init(pb.buffer) + msgFromPb = WakuMessage.decode(pb.buffer) check: msgFromPb.isOk() @@ -238,7 +238,7 @@ procSuite "Waku Noise Sessions": pb = wakuMsg.get().encode() # We decode the WakuMessage from the ProtoBuffer - msgFromPb = WakuMessage.init(pb.buffer) + msgFromPb = WakuMessage.decode(pb.buffer) check: msgFromPb.isOk() diff --git a/tests/v2/test_waku_payload.nim b/tests/v2/test_waku_payload.nim index 22833d10c..ff9bf5217 100644 --- a/tests/v2/test_waku_payload.nim +++ b/tests/v2/test_waku_payload.nim @@ -21,7 +21,7 @@ procSuite "Waku Payload": pb = msg.encode() # Decoding - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) check msgDecoded.isOk() let @@ -47,7 +47,7 @@ procSuite "Waku Payload": pb = msg.encode() # Decoding - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) check msgDecoded.isOk() let @@ -73,7 +73,7 @@ procSuite "Waku Payload": pb = msg.encode() # Decoding - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) check msgDecoded.isOk() let @@ -101,7 +101,7 @@ procSuite "Waku Payload": pb = msg.encode() # Decoding - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) check msgDecoded.isOk() let @@ -123,7 +123,7 @@ procSuite "Waku Payload": ## When let pb = msg.encode() - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) ## Then check: @@ -144,7 +144,7 @@ procSuite "Waku Payload": ## When let pb = msg.encode() - let msgDecoded = WakuMessage.init(pb.buffer) + let msgDecoded = WakuMessage.decode(pb.buffer) ## Then check: diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 575cc8612..eaa9f9412 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -45,7 +45,7 @@ procSuite "Waku Peer Exchange": ## When let rpcBuffer: seq[byte] = rpc.encode().buffer - res = PeerExchangeRpc.init(rpcBuffer) + res = PeerExchangeRpc.decode(rpcBuffer) ## Then check: diff --git a/tests/v2/test_waku_store_rpc_codec.nim b/tests/v2/test_waku_store_rpc_codec.nim index 9a0745a67..f575cdfc0 100644 --- a/tests/v2/test_waku_store_rpc_codec.nim +++ b/tests/v2/test_waku_store_rpc_codec.nim @@ -18,7 +18,7 @@ procSuite "Waku Store - RPC codec": ## When let encodedIndex = index.encode() - let decodedIndexRes = PagingIndex.init(encodedIndex.buffer) + let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) ## Then check: @@ -34,7 +34,7 @@ procSuite "Waku Store - RPC codec": let emptyIndex = PagingIndex() let encodedIndex = emptyIndex.encode() - let decodedIndexRes = PagingIndex.init(encodedIndex.buffer) + let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) ## Then check: @@ -53,7 +53,7 @@ procSuite "Waku Store - RPC codec": ## When let pb = pagingInfo.encode() - let decodedPagingInfo = PagingInfo.init(pb.buffer) + let decodedPagingInfo = PagingInfo.decode(pb.buffer) ## Then check: @@ -69,8 +69,8 @@ procSuite "Waku Store - RPC codec": let emptyPagingInfo = PagingInfo() ## When - let epb = emptyPagingInfo.encode() - let decodedEmptyPagingInfo = PagingInfo.init(epb.buffer) + let pb = emptyPagingInfo.encode() + let decodedEmptyPagingInfo = PagingInfo.decode(pb.buffer) ## Then check: @@ -89,7 +89,7 @@ procSuite "Waku Store - RPC codec": ## When let pb = query.encode() - let decodedQuery = HistoryQuery.init(pb.buffer) + let decodedQuery = HistoryQuery.decode(pb.buffer) ## Then check: @@ -104,8 +104,8 @@ procSuite "Waku Store - RPC codec": let emptyQuery = HistoryQuery() ## When - let epb = emptyQuery.encode() - let decodedEmptyQuery = HistoryQuery.init(epb.buffer) + let pb = emptyQuery.encode() + let decodedEmptyQuery = HistoryQuery.decode(pb.buffer) ## Then check: @@ -125,7 +125,7 @@ procSuite "Waku Store - RPC codec": ## When let pb = res.encode() - let decodedRes = HistoryResponse.init(pb.buffer) + let decodedRes = HistoryResponse.decode(pb.buffer) ## Then check: @@ -140,8 +140,8 @@ procSuite "Waku Store - RPC codec": let emptyRes = HistoryResponse() ## When - let epb = emptyRes.encode() - let decodedEmptyRes = HistoryResponse.init(epb.buffer) + let pb = emptyRes.encode() + let decodedEmptyRes = HistoryResponse.decode(pb.buffer) ## Then check: diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index c928d06bc..15d8466a6 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -58,7 +58,7 @@ procSuite "WakuNode": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index e28596b1b..b1cfe1aa5 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -44,7 +44,7 @@ procSuite "WakuNode - Lightpush": var completionFutRelay = newFuture[bool]() proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data).get() + let msg = WakuMessage.decode(data).get() check: pubsubTopic == DefaultPubsubTopic msg == message diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/test_wakunode_relay.nim index 3bc840b2e..3eb30ac27 100644 --- a/tests/v2/test_wakunode_relay.nim +++ b/tests/v2/test_wakunode_relay.nim @@ -89,7 +89,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -158,7 +158,7 @@ procSuite "WakuNode - Relay": ## the validator that only allows messages with contentTopic1 to be relayed check: topic == pubSubTopic - let msg = WakuMessage.init(message.data) + let msg = WakuMessage.decode(message.data) if msg.isOk(): # only relay messages with contentTopic1 if msg.value().contentTopic == contentTopic1: @@ -175,7 +175,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = debug "relayed pubsub topic:", topic - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -228,7 +228,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -272,7 +272,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -320,7 +320,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -363,7 +363,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: @@ -406,7 +406,7 @@ procSuite "WakuNode - Relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let val = msg.value() check: diff --git a/tests/v2/test_wakunode_rln_relay.nim b/tests/v2/test_wakunode_rln_relay.nim index de6bde812..17db361c6 100644 --- a/tests/v2/test_wakunode_rln_relay.nim +++ b/tests/v2/test_wakunode_rln_relay.nim @@ -107,7 +107,7 @@ procSuite "WakuNode - RLN relay": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): debug "The received topic:", topic if topic == rlnRelayPubSubTopic: @@ -212,7 +212,7 @@ procSuite "WakuNode - RLN relay": # define a custom relay handler var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): debug "The received topic:", topic if topic == rlnRelayPubSubTopic: @@ -361,7 +361,7 @@ procSuite "WakuNode - RLN relay": var completionFut3 = newFuture[bool]() var completionFut4 = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): let wm = msg.value() debug "The received topic:", topic diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 703c2627b..17fbcbd4b 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -25,7 +25,7 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} = trace "Topic handler triggered", topic=topic - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isOk(): # Add message to current cache trace "WakuMessage received", msg=msg, topic=topic diff --git a/waku/v2/node/rest/relay/topic_cache.nim b/waku/v2/node/rest/relay/topic_cache.nim index 32e846c9d..aca83626b 100644 --- a/waku/v2/node/rest/relay/topic_cache.nim +++ b/waku/v2/node/rest/relay/topic_cache.nim @@ -37,7 +37,7 @@ proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = trace "Topic handler triggered", topic=topic # Add message to current cache - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isErr(): debug "WakuMessage received but failed to decode", msg=msg, topic=topic # TODO: handle message decode failure diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index dd13e4e9f..9d6b8fb80 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -62,9 +62,6 @@ const git_version* {.strdefine.} = "n/a" # Default clientId const clientId* = "Nimbus Waku v2 node" -# TODO: Unify pubusub topic type and default value -type PubsubTopic* = string - const defaultTopic*: PubsubTopic = "/waku/2/default-waku/proto" # Default Waku Filter Timeout @@ -270,7 +267,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler] # A default handler should be registered for all topics trace "Hit default handler", topic=topic, data=data - let msg = WakuMessage.init(data) + let msg = WakuMessage.decode(data) if msg.isErr(): # TODO: Add metric to track waku message decode errors return diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim index 943e17530..79ee4b604 100644 --- a/waku/v2/protocol/waku_filter/client.nim +++ b/waku/v2/protocol/waku_filter/client.nim @@ -90,7 +90,7 @@ proc initProtocolHandler(wf: WakuFilterClient) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = let buffer = await conn.readLp(MaxRpcSize.int) - let decodeReqRes = FilterRPC.init(buffer) + let decodeReqRes = FilterRPC.decode(buffer) if decodeReqRes.isErr(): waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return diff --git a/waku/v2/protocol/waku_filter/protocol.nim b/waku/v2/protocol/waku_filter/protocol.nim index 379087770..193f89773 100644 --- a/waku/v2/protocol/waku_filter/protocol.nim +++ b/waku/v2/protocol/waku_filter/protocol.nim @@ -90,7 +90,7 @@ proc initProtocolHandler(wf: WakuFilter) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = let buffer = await conn.readLp(MaxRpcSize.int) - let decodeRpcRes = FilterRPC.init(buffer) + let decodeRpcRes = FilterRPC.decode(buffer) if decodeRpcRes.isErr(): waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return diff --git a/waku/v2/protocol/waku_filter/rpc_codec.nim b/waku/v2/protocol/waku_filter/rpc_codec.nim index 2bbda4a27..73fce1b29 100644 --- a/waku/v2/protocol/waku_filter/rpc_codec.nim +++ b/waku/v2/protocol/waku_filter/rpc_codec.nim @@ -7,8 +7,8 @@ import libp2p/protobuf/minprotobuf, libp2p/varint import - ../waku_message, ../../utils/protobuf, + ../waku_message, ./rpc @@ -18,36 +18,37 @@ const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 proc encode*(filter: ContentFilter): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, filter.contentTopic) - output.finish3() + var pb = initProtoBuffer() - return output + pb.write3(1, filter.contentTopic) + pb.finish3() -proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) var contentTopic: ContentTopic discard ?pb.getField(1, contentTopic) - return ok(ContentFilter(contentTopic: contentTopic)) + ok(ContentFilter(contentTopic: contentTopic)) proc encode*(rpc: FilterRequest): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, uint64(rpc.subscribe)) - output.write3(2, rpc.pubSubTopic) + var pb = initProtoBuffer() + + pb.write3(1, uint64(rpc.subscribe)) + pb.write3(2, rpc.pubSubTopic) for filter in rpc.contentFilters: - output.write3(3, filter.encode()) + pb.write3(3, filter.encode()) - output.finish3() + pb.finish3() - return output + pb -proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") var subflag: uint64 @@ -61,45 +62,46 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = var buffs: seq[seq[byte]] discard ?pb.getRepeatedField(3, buffs) for buf in buffs: - rpc.contentFilters.add(?ContentFilter.init(buf)) + rpc.contentFilters.add(?ContentFilter.decode(buf)) - return ok(rpc) + ok(rpc) proc encode*(push: MessagePush): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() + for push in push.messages: - output.write3(1, push.encode()) - output.finish3() + pb.write3(1, push.encode()) - return output + pb.finish3() -proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var push = MessagePush() var messages: seq[seq[byte]] discard ?pb.getRepeatedField(1, messages) for buf in messages: - push.messages.add(?WakuMessage.init(buf)) + push.messages.add(?WakuMessage.decode(buf)) - return ok(push) + ok(push) proc encode*(rpc: FilterRPC): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, rpc.requestId) - output.write3(2, rpc.request.encode()) - output.write3(3, rpc.push.encode()) - output.finish3() + var pb = initProtoBuffer() - return output + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.request.encode()) + pb.write3(3, rpc.push.encode()) + pb.finish3() -proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = FilterRPC() var requestId: string @@ -108,10 +110,10 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = var requestBuffer: seq[byte] discard ?pb.getField(2, requestBuffer) - rpc.request = ?FilterRequest.init(requestBuffer) + rpc.request = ?FilterRequest.decode(requestBuffer) var pushBuffer: seq[byte] discard ?pb.getField(3, pushBuffer) - rpc.push = ?MessagePush.init(pushBuffer) + rpc.push = ?MessagePush.decode(pushBuffer) - return ok(rpc) + ok(rpc) diff --git a/waku/v2/protocol/waku_lightpush/client.nim b/waku/v2/protocol/waku_lightpush/client.nim index ca5d25456..fc187020a 100644 --- a/waku/v2/protocol/waku_lightpush/client.nim +++ b/waku/v2/protocol/waku_lightpush/client.nim @@ -46,7 +46,7 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem await connection.writeLP(rpc.encode().buffer) var buffer = await connection.readLp(MaxRpcSize.int) - let decodeRespRes = PushRPC.init(buffer) + let decodeRespRes = PushRPC.decode(buffer) if decodeRespRes.isErr(): error "failed to decode response" waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim index acb77536a..8939d57ee 100644 --- a/waku/v2/protocol/waku_lightpush/protocol.nim +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -38,7 +38,7 @@ type proc initProtocolHandler*(wl: WakuLightPush) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = let buffer = await conn.readLp(MaxRpcSize.int) - let reqDecodeRes = PushRPC.init(buffer) + let reqDecodeRes = PushRPC.decode(buffer) if reqDecodeRes.isErr(): error "failed to decode rpc" waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) diff --git a/waku/v2/protocol/waku_lightpush/rpc_codec.nim b/waku/v2/protocol/waku_lightpush/rpc_codec.nim index dd57b5650..0017777db 100644 --- a/waku/v2/protocol/waku_lightpush/rpc_codec.nim +++ b/waku/v2/protocol/waku_lightpush/rpc_codec.nim @@ -7,8 +7,8 @@ else: import libp2p/protobuf/minprotobuf import - ../waku_message, ../../utils/protobuf, + ../waku_message, ./rpc @@ -16,16 +16,16 @@ const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer proc encode*(rpc: PushRequest): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, rpc.pubSubTopic) - output.write3(2, rpc.message.encode()) - output.finish3() + var pb = initProtoBuffer() - return output + pb.write3(1, rpc.pubSubTopic) + pb.write3(2, rpc.message.encode()) + pb.finish3() -proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PushRequest() var pubSubTopic: string @@ -34,22 +34,22 @@ proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = var buf: seq[byte] discard ?pb.getField(2, buf) - rpc.message = ?WakuMessage.init(buf) + rpc.message = ?WakuMessage.decode(buf) - return ok(rpc) + ok(rpc) proc encode*(rpc: PushResponse): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, uint64(rpc.isSuccess)) - output.write3(2, rpc.info) - output.finish3() + var pb = initProtoBuffer() - return output + pb.write3(1, uint64(rpc.isSuccess)) + pb.write3(2, rpc.info) + pb.finish3() -proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PushResponse(isSuccess: false, info: "") var isSuccess: uint64 @@ -60,21 +60,21 @@ proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = discard ?pb.getField(2, info) rpc.info = info - return ok(rpc) + ok(rpc) proc encode*(rpc: PushRPC): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, rpc.requestId) - output.write3(2, rpc.request.encode()) - output.write3(3, rpc.response.encode()) - output.finish3() + var pb = initProtoBuffer() + + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.request.encode()) + pb.write3(3, rpc.response.encode()) + pb.finish3() - return output + pb -proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PushRPC() var requestId: string @@ -83,10 +83,10 @@ proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = var requestBuffer: seq[byte] discard ?pb.getField(2, requestBuffer) - rpc.request = ?PushRequest.init(requestBuffer) + rpc.request = ?PushRequest.decode(requestBuffer) var pushBuffer: seq[byte] discard ?pb.getField(3, pushBuffer) - rpc.response = ?PushResponse.init(pushBuffer) + rpc.response = ?PushResponse.decode(pushBuffer) - return ok(rpc) \ No newline at end of file + ok(rpc) \ No newline at end of file diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index 1f1fef66b..3574cbe01 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -5,26 +5,29 @@ ## ## For payload content and encryption, see waku/v2/node/waku_payload.nim - when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: {.push raises: [].} + import libp2p/protobuf/minprotobuf, - libp2p/varint, + libp2p/varint +import ../utils/protobuf, ../utils/time, - waku_rln_relay/waku_rln_relay_types + ./waku_rln_relay/waku_rln_relay_types -const - MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default + +const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default type + PubsubTopic* = string ContentTopic* = string - WakuMessage* = object + +type WakuMessage* = object payload*: seq[byte] contentTopic*: ContentTopic version*: uint32 @@ -38,45 +41,42 @@ type # be stored. bools and uints are # equivalent in serialization of the protobuf ephemeral*: bool - - -# Encoding and decoding ------------------------------------------------------- -proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = + +## Encoding and decoding + +proc encode*(message: WakuMessage): ProtoBuffer = + var buf = initProtoBuffer() + + buf.write3(1, message.payload) + buf.write3(2, message.contentTopic) + buf.write3(3, message.version) + buf.write3(10, zint64(message.timestamp)) + buf.write3(21, message.proof.encode()) + buf.write3(31, uint64(message.ephemeral)) + buf.finish3() + + buf + +proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = var msg = WakuMessage(ephemeral: false) let pb = initProtoBuffer(buffer) - discard ? pb.getField(1, msg.payload) - discard ? pb.getField(2, msg.contentTopic) - discard ? pb.getField(3, msg.version) + discard ?pb.getField(1, msg.payload) + discard ?pb.getField(2, msg.contentTopic) + discard ?pb.getField(3, msg.version) var timestamp: zint64 - discard ? pb.getField(10, timestamp) + discard ?pb.getField(10, timestamp) msg.timestamp = Timestamp(timestamp) # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec var proofBytes: seq[byte] - discard ? pb.getField(21, proofBytes) - msg.proof = ? RateLimitProof.init(proofBytes) + discard ?pb.getField(21, proofBytes) + msg.proof = ?RateLimitProof.init(proofBytes) - # Behaviour of ephemeral with storeTTL to be defined, - # If a message is marked ephemeral, it should not have a storeTTL. - # If a message is not marked ephemeral, it should have a storeTTL. - # How would we handle messages that should be stored permanently? var ephemeral: uint - if ? pb.getField(31, ephemeral): + if ?pb.getField(31, ephemeral): msg.ephemeral = bool(ephemeral) ok(msg) - -proc encode*(message: WakuMessage): ProtoBuffer = - result = initProtoBuffer() - - result.write3(1, message.payload) - result.write3(2, message.contentTopic) - result.write3(3, message.version) - result.write3(10, zint64(message.timestamp)) - result.write3(21, message.proof.encode()) - result.write3(31, uint64(message.ephemeral)) - result.finish3() - diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 6a095aa01..ff33f1ffe 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = let buff = await conn.readLp(MaxRpcSize.int) - let res = PeerExchangeRpc.init(buff) + let res = PeerExchangeRpc.decode(buff) if res.isErr(): waku_px_errors.inc(labelValues = [decodeRpcFailure]) return diff --git a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim index 3432353f3..f94057fff 100644 --- a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim +++ b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim @@ -3,6 +3,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} + import libp2p/protobuf/minprotobuf, libp2p/varint @@ -10,85 +11,86 @@ import ../../utils/protobuf, ./rpc + proc encode*(rpc: PeerExchangeRequest): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() - output.write3(1, rpc.numPeers) - output.finish3() + pb.write3(1, rpc.numPeers) + pb.finish3() - return output + pb -proc init*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PeerExchangeRequest(numPeers: 0) var numPeers: uint64 if ?pb.getField(1, numPeers): rpc.numPeers = numPeers - return ok(rpc) + ok(rpc) + proc encode*(rpc: PeerExchangePeerInfo): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() - output.write3(1, rpc.enr) - output.finish3() + pb.write3(1, rpc.enr) + pb.finish3() - return output + pb -proc init*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PeerExchangePeerInfo(enr: @[]) var peerInfoBuffer: seq[byte] if ?pb.getField(1, peerInfoBuffer): - rpc.enr = peerInfoBuffer + rpc.enr = peerInfoBuffer + + ok(rpc) - return ok(rpc) proc encode*(rpc: PeerExchangeResponse): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() for pi in rpc.peerInfos: - output.write3(1, pi.encode()) - output.finish3() + pb.write3(1, pi.encode()) - return output + pb.finish3() -proc init*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PeerExchangeResponse(peerInfos: @[]) var peerInfoBuffers: seq[seq[byte]] if ?pb.getRepeatedField(1, peerInfoBuffers): for pib in peerInfoBuffers: - rpc.peerInfos.add(?PeerExchangePeerInfo.init(pib)) + rpc.peerInfos.add(?PeerExchangePeerInfo.decode(pib)) + + ok(rpc) - return ok(rpc) proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, rpc.request.encode()) - output.write3(2, rpc.response.encode()) - output.finish3() + var pb = initProtoBuffer() - return output + pb.write3(1, rpc.request.encode()) + pb.write3(2, rpc.response.encode()) + pb.finish3() -proc init*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = + pb + +proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] discard ?pb.getField(1, requestBuffer) - rpc.request = ?PeerExchangeRequest.init(requestBuffer) + rpc.request = ?PeerExchangeRequest.decode(requestBuffer) var responseBuffer: seq[byte] discard ?pb.getField(2, responseBuffer) - rpc.response = ?PeerExchangeResponse.init(responseBuffer) - - return ok(rpc) + rpc.response = ?PeerExchangeResponse.decode(responseBuffer) + ok(rpc) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index f3e9da141..0e002c0c6 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -1062,7 +1062,7 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: Co ## the message validation logic is according to https://rfc.vac.dev/spec/17/ proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" - let msg = WakuMessage.init(message.data) + let msg = WakuMessage.decode(message.data) if msg.isOk(): let wakumessage = msg.value() diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index 312818843..537038783 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -52,7 +52,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future await connection.writeLP(rpc.encode().buffer) var message = await connection.readLp(MaxRpcSize.int) - let response = HistoryRPC.init(message) + let response = HistoryRPC.decode(message) if response.isErr(): error "failed to decode response" diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 90707aa2b..5d04e1483 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -171,7 +171,7 @@ proc initProtocolHandler*(ws: WakuStore) = proc handler(conn: Connection, proto: string) {.async.} = let buf = await conn.readLp(MaxRpcSize.int) - let resReq = HistoryRPC.init(buf) + let resReq = HistoryRPC.decode(buf) if resReq.isErr(): error "failed to decode rpc", peerId=conn.peerId waku_store_errors.inc(labelValues = [decodeRpcFailure]) diff --git a/waku/v2/protocol/waku_store/rpc_codec.nim b/waku/v2/protocol/waku_store/rpc_codec.nim index 226e0e363..478f73039 100644 --- a/waku/v2/protocol/waku_store/rpc_codec.nim +++ b/waku/v2/protocol/waku_store/rpc_codec.nim @@ -21,17 +21,17 @@ const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB s proc encode*(index: PagingIndex): ProtoBuffer = ## Encode an Index object into a ProtoBuffer ## returns the resultant ProtoBuffer + var pb = initProtoBuffer() - var output = initProtoBuffer() - output.write3(1, index.digest.data) - output.write3(2, zint64(index.receiverTime)) - output.write3(3, zint64(index.senderTime)) - output.write3(4, index.pubsubTopic) - output.finish3() + pb.write3(1, index.digest.data) + pb.write3(2, zint64(index.receiverTime)) + pb.write3(3, zint64(index.senderTime)) + pb.write3(4, index.pubsubTopic) + pb.finish3() - return output + pb -proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] = ## creates and returns an Index object out of buffer var index = PagingIndex() let pb = initProtoBuffer(buffer) @@ -57,22 +57,22 @@ proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] = # read the pubsubTopic discard ?pb.getField(4, index.pubsubTopic) - return ok(index) + ok(index) proc encode*(pinfo: PagingInfo): ProtoBuffer = ## Encodes a PagingInfo object into a ProtoBuffer ## returns the resultant ProtoBuffer + var pb = initProtoBuffer() - var output = initProtoBuffer() - output.write3(1, pinfo.pageSize) - output.write3(2, pinfo.cursor.encode()) - output.write3(3, uint32(ord(pinfo.direction))) - output.finish3() + pb.write3(1, pinfo.pageSize) + pb.write3(2, pinfo.cursor.encode()) + pb.write3(3, uint32(ord(pinfo.direction))) + pb.finish3() - return output + pb -proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = ## creates and returns a PagingInfo object out of buffer var pagingInfo = PagingInfo() let pb = initProtoBuffer(buffer) @@ -83,22 +83,24 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = var cursorBuffer: seq[byte] discard ?pb.getField(2, cursorBuffer) - pagingInfo.cursor = ?PagingIndex.init(cursorBuffer) + pagingInfo.cursor = ?PagingIndex.decode(cursorBuffer) var direction: uint32 discard ?pb.getField(3, direction) pagingInfo.direction = PagingDirection(direction) - return ok(pagingInfo) + ok(pagingInfo) proc encode*(filter: HistoryContentFilter): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, filter.contentTopic) - output.finish3() - return output + var pb = initProtoBuffer() -proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = + pb.write3(1, filter.contentTopic) + pb.finish3() + + pb + +proc decode*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) var contentTopic: ContentTopic discard ?pb.getField(1, contentTopic) @@ -107,20 +109,20 @@ proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = proc encode*(query: HistoryQuery): ProtoBuffer = - var output = initProtoBuffer() - output.write3(2, query.pubsubTopic) + var pb = initProtoBuffer() + pb.write3(2, query.pubsubTopic) for filter in query.contentFilters: - output.write3(3, filter.encode()) + pb.write3(3, filter.encode()) - output.write3(4, query.pagingInfo.encode()) - output.write3(5, zint64(query.startTime)) - output.write3(6, zint64(query.endTime)) - output.finish3() + pb.write3(4, query.pagingInfo.encode()) + pb.write3(5, zint64(query.startTime)) + pb.write3(6, zint64(query.endTime)) + pb.finish3() - return output + pb -proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) @@ -129,13 +131,13 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var buffs: seq[seq[byte]] discard ?pb.getRepeatedField(3, buffs) - for buf in buffs: - msg.contentFilters.add(? HistoryContentFilter.init(buf)) + for pb in buffs: + msg.contentFilters.add(? HistoryContentFilter.decode(pb)) var pagingInfoBuffer: seq[byte] discard ?pb.getField(4, pagingInfoBuffer) - msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer) + msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer) var startTime: zint64 discard ?pb.getField(5, startTime) @@ -145,66 +147,64 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = discard ?pb.getField(6, endTime) msg.endTime = Timestamp(endTime) - return ok(msg) + ok(msg) proc encode*(response: HistoryResponse): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() for msg in response.messages: - output.write3(2, msg.encode()) + pb.write3(2, msg.encode()) - output.write3(3, response.pagingInfo.encode()) - output.write3(4, uint32(ord(response.error))) - output.finish3() + pb.write3(3, response.pagingInfo.encode()) + pb.write3(4, uint32(ord(response.error))) + pb.finish3() - return output + pb -proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryResponse() let pb = initProtoBuffer(buffer) var messages: seq[seq[byte]] discard ?pb.getRepeatedField(2, messages) - for buf in messages: - let message = ?WakuMessage.init(buf) + for pb in messages: + let message = ?WakuMessage.decode(pb) msg.messages.add(message) var pagingInfoBuffer: seq[byte] discard ?pb.getField(3, pagingInfoBuffer) - msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer) + msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer) var error: uint32 discard ?pb.getField(4, error) msg.error = HistoryResponseError(error) - return ok(msg) + ok(msg) proc encode*(rpc: HistoryRPC): ProtoBuffer = - var output = initProtoBuffer() + var pb = initProtoBuffer() - output.write3(1, rpc.requestId) - output.write3(2, rpc.query.encode()) - output.write3(3, rpc.response.encode()) + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.query.encode()) + pb.write3(3, rpc.response.encode()) + pb.finish3() - output.finish3() + pb - return output - -proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = var rpc = HistoryRPC() let pb = initProtoBuffer(buffer) discard ?pb.getField(1, rpc.requestId) var queryBuffer: seq[byte] discard ?pb.getField(2, queryBuffer) - rpc.query = ?HistoryQuery.init(queryBuffer) + rpc.query = ?HistoryQuery.decode(queryBuffer) var responseBuffer: seq[byte] discard ?pb.getField(3, responseBuffer) - rpc.response = ?HistoryResponse.init(responseBuffer) - - return ok(rpc) + rpc.response = ?HistoryResponse.decode(responseBuffer) + ok(rpc)