diff --git a/libp2pdht/dht.nim b/libp2pdht/dht.nim index fb62ef8..40e36c9 100644 --- a/libp2pdht/dht.nim +++ b/libp2pdht/dht.nim @@ -1,4 +1,6 @@ import - ./dht/[providers_encoding, providers_messages] + ./dht/[providers_encoding, providers_messages], + ./dht/[value_encoding, value_messages] -export providers_encoding, providers_messages \ No newline at end of file +export providers_encoding, providers_messages +export value_encoding, value_messages \ No newline at end of file diff --git a/libp2pdht/dht/value_encoding.nim b/libp2pdht/dht/value_encoding.nim new file mode 100644 index 0000000..c21d25d --- /dev/null +++ b/libp2pdht/dht/value_encoding.nim @@ -0,0 +1,77 @@ +import + ../discv5/[node], + libp2p/protobuf/minprotobuf, + ./value_messages + +func getField(pb: ProtoBuffer, field: int, + nid: var NodeId): ProtoResult[bool] {.inline.} = + ## Read ``NodeId`` from ProtoBuf's message and validate it + var buffer: seq[byte] + let res = ? pb.getField(field, buffer) + if not(res): + ok(false) + else: + nid = readUintBE[256](buffer) + ok(true) + +func write(pb: var ProtoBuffer, field: int, nid: NodeId) = + ## Write NodeId value ``nodeid`` to object ``pb`` using ProtoBuf's encoding. + write(pb, field, nid.toBytesBE()) + +proc decode*( + T: typedesc[AddValueMessage], + buffer: openArray[byte]): Result[AddValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = AddValueMessage() + + ? pb.getRequiredField(1, msg.cId) + ? pb.getRequiredField(2, msg.value) + + ok(msg) + +proc encode*(msg: AddValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.cId) + pb.write(2, msg.value) + + pb.finish() + pb.buffer + +proc decode*( + T: typedesc[GetValueMessage], + buffer: openArray[byte]): Result[GetValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = GetValueMessage() + + ? pb.getRequiredField(1, msg.cId) + + ok(msg) + +proc encode*(msg: GetValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.cId) + + pb.finish() + pb.buffer + +proc decode*( + T: typedesc[ValueMessage], + buffer: openArray[byte]): Result[ValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = ValueMessage() + ? pb.getRequiredField(1, msg.value) + + ok(msg) + +proc encode*(msg: ValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.value) + + pb.finish() + pb.buffer diff --git a/libp2pdht/dht/value_messages.nim b/libp2pdht/dht/value_messages.nim new file mode 100644 index 0000000..57e6362 --- /dev/null +++ b/libp2pdht/dht/value_messages.nim @@ -0,0 +1,14 @@ +import + ../discv5/[node] + +type + AddValueMessage* = object + cId*: NodeId + value*: seq[byte] + + GetValueMessage* = object + cId*: NodeId + + ValueMessage* = object + #total*: uint32 + value*: seq[byte] diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim index da3cdb0..1bbfa9f 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim @@ -17,9 +17,11 @@ import bearssl/rand, ./spr, ./node, - ../../../../dht/providers_messages + ../../../../dht/providers_messages, + ../../../../dht/value_messages export providers_messages +export value_messages type MessageKind* {.pure.} = enum @@ -41,6 +43,9 @@ type addProvider = 0x0B getProviders = 0x0C providers = 0x0D + addValue = 0x0E + getValue = 0x0F + respValue = 0x10 findNodeFast = 0x83 RequestId* = object @@ -79,7 +84,8 @@ type SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or - ProvidersMessage or FindNodeFastMessage + ProvidersMessage or FindNodeFastMessage or + AddValueMessage or GetValueMessage or ValueMessage Message* = object reqId*: RequestId @@ -112,6 +118,12 @@ type getProviders*: GetProvidersMessage of providers: provs*: ProvidersMessage + of addValue: + addValue*: AddValueMessage + of getValue: + getValue*: GetValueMessage + of respValue: + value*: ValueMessage else: discard @@ -126,6 +138,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind = elif T is AddProviderMessage: MessageKind.addProvider elif T is GetProvidersMessage: MessageKind.getProviders elif T is ProvidersMessage: MessageKind.providers + elif T is AddValueMessage: MessageKind.addValue + elif T is GetValueMessage: MessageKind.getValue + elif T is ValueMessage: MessageKind.respValue proc hash*(reqId: RequestId): Hash = hash(reqId.id) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim index 6de6596..4d6d989 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim @@ -14,7 +14,8 @@ import libp2p/routing_record, libp2p/signed_envelope, "."/[messages, spr, node], - ../../../../dht/providers_encoding + ../../../../dht/providers_encoding, + ../../../../dht/value_encoding from stew/objects import checkedEnumAssign @@ -434,6 +435,30 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] = else: return err("Unable to decode ProvidersMessage") + of addValue: + let res = AddValueMessage.decode(encoded) + if res.isOk: + message.addValue = res.get + return ok(message) + else: + return err "Unable to decode AddValueMessage" + + of getValue: + let res = GetValueMessage.decode(encoded) + if res.isOk: + message.getValue = res.get + return ok(message) + else: + return err("Unable to decode GetValueMessage") + + of respValue: + let res = ValueMessage.decode(encoded) + if res.isOk: + message.value = res.get + return ok(message) + else: + return err("Unable to decode ValueMessage") + of regTopic, ticket, regConfirmation, topicQuery: # We just pass the empty type of this message without attempting to # decode, so that the protocol knows what was received. diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index 43fbf2c..c11578a 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -174,6 +174,7 @@ type talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of rng*: ref HmacDrbgContext providers: ProvidersManager + valueStore: Table[NodeId, seq[byte]] TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] {.gcsafe, raises: [Defect].} @@ -399,6 +400,38 @@ proc handleGetProviders( let response = ProvidersMessage(total: 1, provs: provs.get) d.sendResponse(fromId, fromAddr, response, reqId) +proc addValueLocal(p: Protocol, cId: NodeId, value: seq[byte]) {.async.} = + trace "adding value to local db", n = p.localNode, cId, value + + p.valueStore[cId] = value + +proc handleAddValue( + d: Protocol, + fromId: NodeId, + fromAddr: Address, + addValue: AddValueMessage, + reqId: RequestId) = + asyncSpawn d.addValueLocal(addValue.cId, addValue.value) + +proc handleGetValue( + d: Protocol, + fromId: NodeId, + fromAddr: Address, + getValue: GetValueMessage, + reqId: RequestId) {.async.} = + + try: + let value = d.valueStore[getValue.cId] + trace "retrieved value from local db", n = d.localNode, cID = getValue.cId, value + ##TODO: handle multiple messages? + let response = ValueMessage(value: value) + d.sendResponse(fromId, fromAddr, response, reqId) + + except KeyError: + # should we respond here? I would say so + trace "no value in local db", n = d.localNode, cID = getValue.cId + # TODO: add noValue response + proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) = case message.kind @@ -421,6 +454,13 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, of getProviders: discovery_message_requests_incoming.inc() asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) + of addValue: + discovery_message_requests_incoming.inc() + #discovery_message_requests_incoming.inc(labelValues = ["no_response"]) + d.handleAddValue(srcId, fromAddr, message.addValue, message.reqId) + of getValue: + discovery_message_requests_incoming.inc() + asyncSpawn d.handleGetValue(srcId, fromAddr, message.getValue, message.reqId) of regTopic, topicQuery: discovery_message_requests_incoming.inc() discovery_message_requests_incoming.inc(labelValues = ["no_response"]) @@ -820,6 +860,92 @@ proc getProviders*( return ok res +proc addValue*( + d: Protocol, + cId: NodeId, + value: seq[byte]): Future[seq[Node]] {.async.} = + + var res = await d.lookup(cId) + trace "lookup returned:", res + # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK? + if res.len == 0: + res.add(d.localNode) + for toNode in res: + if toNode != d.localNode: + let reqId = RequestId.init(d.rng[]) + d.sendRequest(toNode, AddValueMessage(cId: cId, value: value), reqId) + else: + asyncSpawn d.addValueLocal(cId, value) + + return res + +proc sendGetValue(d: Protocol, toNode: Node, + cId: NodeId): Future[DiscResult[ValueMessage]] + {.async.} = + let msg = GetValueMessage(cId: cId) + trace "sendGetValue", toNode, msg + + let + resp = await d.waitResponse(toNode, msg) + + if resp.isSome(): + if resp.get().kind == MessageKind.respValue: + d.routingTable.setJustSeen(toNode) + return ok(resp.get().value) + else: + # TODO: do we need to do something when there is an invalid response? + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) + return err("Invalid response to GetValue message") + else: + # TODO: do we need to do something when there is no response? + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) + return err("GetValue response message not received in time") + +proc getValue*( + d: Protocol, + cId: NodeId, + timeout: Duration = 5000.milliseconds # TODO: not used? + ): Future[DiscResult[seq[byte]]] {.async.} = + + # # What value do we know about? + # var res = await d.getProvidersLocal(cId, maxitems) + # trace "local providers:", prov = res.mapIt(it) + + let nodesNearby = await d.lookup(cId) + trace "nearby:", nodesNearby + var providersFut: seq[Future[DiscResult[ValueMessage]]] + for n in nodesNearby: + if n != d.localNode: + providersFut.add(d.sendGetValue(n, cId)) + + while providersFut.len > 0: + let providersMsg = await one(providersFut) + # trace "Got providers response", providersMsg + + let index = providersFut.find(providersMsg) + if index != -1: + providersFut.del(index) + + let providersMsg2 = await providersMsg + + let providersMsgRes = providersMsg.read + if providersMsgRes.isOk: + let value = providersMsgRes.get.value + var res = value + # TODO: validate result before accepting as the right one + # TODO: cancel pending futures! + return ok res + else: + error "Sending of GetValue message failed", error = providersMsgRes.error + # TODO: should we consider this as an error result if all GetProviders + # requests fail?? + + trace "getValue returned no result", cId + + return err "getValue failed" + proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] {.async.} = ## Query k nodes for the given target, returns all nodes found, including the