add DHT storage (addValue/getValue) functionality

this is a minimal implementation, with lots of
work still needed.

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-06-06 11:05:09 +02:00
parent 12af1e626f
commit fc7d04e3bb
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
6 changed files with 264 additions and 5 deletions

View File

@ -1,4 +1,6 @@
import import
./dht/[providers_encoding, providers_messages] ./dht/[providers_encoding, providers_messages],
./dht/[value_encoding, value_messages]
export providers_encoding, providers_messages export providers_encoding, providers_messages
export value_encoding, value_messages

View File

@ -0,0 +1,77 @@
import
../discv5/[node],
libp2p/protobuf/minprotobuf,
./value_messages
func getField(pb: ProtoBuffer, field: int,
nid: var NodeId): ProtoResult[bool] {.inline.} =
## Read ``NodeId`` from ProtoBuf's message and validate it
var buffer: seq[byte]
let res = ? pb.getField(field, buffer)
if not(res):
ok(false)
else:
nid = readUintBE[256](buffer)
ok(true)
func write(pb: var ProtoBuffer, field: int, nid: NodeId) =
## Write NodeId value ``nodeid`` to object ``pb`` using ProtoBuf's encoding.
write(pb, field, nid.toBytesBE())
proc decode*(
T: typedesc[AddValueMessage],
buffer: openArray[byte]): Result[AddValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = AddValueMessage()
? pb.getRequiredField(1, msg.cId)
? pb.getRequiredField(2, msg.value)
ok(msg)
proc encode*(msg: AddValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.cId)
pb.write(2, msg.value)
pb.finish()
pb.buffer
proc decode*(
T: typedesc[GetValueMessage],
buffer: openArray[byte]): Result[GetValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = GetValueMessage()
? pb.getRequiredField(1, msg.cId)
ok(msg)
proc encode*(msg: GetValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.cId)
pb.finish()
pb.buffer
proc decode*(
T: typedesc[ValueMessage],
buffer: openArray[byte]): Result[ValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = ValueMessage()
? pb.getRequiredField(1, msg.value)
ok(msg)
proc encode*(msg: ValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.value)
pb.finish()
pb.buffer

View File

@ -0,0 +1,14 @@
import
../discv5/[node]
type
AddValueMessage* = object
cId*: NodeId
value*: seq[byte]
GetValueMessage* = object
cId*: NodeId
ValueMessage* = object
#total*: uint32
value*: seq[byte]

View File

@ -17,9 +17,11 @@ import
bearssl/rand, bearssl/rand,
./spr, ./spr,
./node, ./node,
../../../../dht/providers_messages ../../../../dht/providers_messages,
../../../../dht/value_messages
export providers_messages export providers_messages
export value_messages
type type
MessageKind* {.pure.} = enum MessageKind* {.pure.} = enum
@ -41,6 +43,9 @@ type
addProvider = 0x0B addProvider = 0x0B
getProviders = 0x0C getProviders = 0x0C
providers = 0x0D providers = 0x0D
addValue = 0x0E
getValue = 0x0F
respValue = 0x10
findNodeFast = 0x83 findNodeFast = 0x83
RequestId* = object RequestId* = object
@ -79,7 +84,8 @@ type
SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or
TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or
ProvidersMessage or FindNodeFastMessage ProvidersMessage or FindNodeFastMessage or
AddValueMessage or GetValueMessage or ValueMessage
Message* = object Message* = object
reqId*: RequestId reqId*: RequestId
@ -112,6 +118,12 @@ type
getProviders*: GetProvidersMessage getProviders*: GetProvidersMessage
of providers: of providers:
provs*: ProvidersMessage provs*: ProvidersMessage
of addValue:
addValue*: AddValueMessage
of getValue:
getValue*: GetValueMessage
of respValue:
value*: ValueMessage
else: else:
discard discard
@ -126,6 +138,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
elif T is AddProviderMessage: MessageKind.addProvider elif T is AddProviderMessage: MessageKind.addProvider
elif T is GetProvidersMessage: MessageKind.getProviders elif T is GetProvidersMessage: MessageKind.getProviders
elif T is ProvidersMessage: MessageKind.providers elif T is ProvidersMessage: MessageKind.providers
elif T is AddValueMessage: MessageKind.addValue
elif T is GetValueMessage: MessageKind.getValue
elif T is ValueMessage: MessageKind.respValue
proc hash*(reqId: RequestId): Hash = proc hash*(reqId: RequestId): Hash =
hash(reqId.id) hash(reqId.id)

View File

@ -14,7 +14,8 @@ import
libp2p/routing_record, libp2p/routing_record,
libp2p/signed_envelope, libp2p/signed_envelope,
"."/[messages, spr, node], "."/[messages, spr, node],
../../../../dht/providers_encoding ../../../../dht/providers_encoding,
../../../../dht/value_encoding
from stew/objects import checkedEnumAssign from stew/objects import checkedEnumAssign
@ -434,6 +435,30 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
else: else:
return err("Unable to decode ProvidersMessage") return err("Unable to decode ProvidersMessage")
of addValue:
let res = AddValueMessage.decode(encoded)
if res.isOk:
message.addValue = res.get
return ok(message)
else:
return err "Unable to decode AddValueMessage"
of getValue:
let res = GetValueMessage.decode(encoded)
if res.isOk:
message.getValue = res.get
return ok(message)
else:
return err("Unable to decode GetValueMessage")
of respValue:
let res = ValueMessage.decode(encoded)
if res.isOk:
message.value = res.get
return ok(message)
else:
return err("Unable to decode ValueMessage")
of regTopic, ticket, regConfirmation, topicQuery: of regTopic, ticket, regConfirmation, topicQuery:
# We just pass the empty type of this message without attempting to # We just pass the empty type of this message without attempting to
# decode, so that the protocol knows what was received. # decode, so that the protocol knows what was received.

View File

@ -172,6 +172,7 @@ type
talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
rng*: ref HmacDrbgContext rng*: ref HmacDrbgContext
providers: ProvidersManager providers: ProvidersManager
valueStore: Table[NodeId, seq[byte]]
TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
{.gcsafe, raises: [Defect].} {.gcsafe, raises: [Defect].}
@ -397,6 +398,38 @@ proc handleGetProviders(
let response = ProvidersMessage(total: 1, provs: provs.get) let response = ProvidersMessage(total: 1, provs: provs.get)
d.sendResponse(fromId, fromAddr, response, reqId) d.sendResponse(fromId, fromAddr, response, reqId)
proc addValueLocal(p: Protocol, cId: NodeId, value: seq[byte]) {.async.} =
trace "adding value to local db", n = p.localNode, cId, value
p.valueStore[cId] = value
proc handleAddValue(
d: Protocol,
fromId: NodeId,
fromAddr: Address,
addValue: AddValueMessage,
reqId: RequestId) =
asyncSpawn d.addValueLocal(addValue.cId, addValue.value)
proc handleGetValue(
d: Protocol,
fromId: NodeId,
fromAddr: Address,
getValue: GetValueMessage,
reqId: RequestId) {.async.} =
try:
let value = d.valueStore[getValue.cId]
trace "retrieved value from local db", n = d.localNode, cID = getValue.cId, value
##TODO: handle multiple messages?
let response = ValueMessage(value: value)
d.sendResponse(fromId, fromAddr, response, reqId)
except KeyError:
# should we respond here? I would say so
trace "no value in local db", n = d.localNode, cID = getValue.cId
# TODO: add noValue response
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) = message: Message) =
case message.kind case message.kind
@ -419,6 +452,13 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
of getProviders: of getProviders:
discovery_message_requests_incoming.inc() discovery_message_requests_incoming.inc()
asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of addValue:
discovery_message_requests_incoming.inc()
#discovery_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddValue(srcId, fromAddr, message.addValue, message.reqId)
of getValue:
discovery_message_requests_incoming.inc()
asyncSpawn d.handleGetValue(srcId, fromAddr, message.getValue, message.reqId)
of regTopic, topicQuery: of regTopic, topicQuery:
discovery_message_requests_incoming.inc() discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"]) discovery_message_requests_incoming.inc(labelValues = ["no_response"])
@ -807,6 +847,92 @@ proc getProviders*(
return ok res return ok res
proc addValue*(
d: Protocol,
cId: NodeId,
value: seq[byte]): Future[seq[Node]] {.async.} =
var res = await d.lookup(cId)
trace "lookup returned:", res
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
if res.len == 0:
res.add(d.localNode)
for toNode in res:
if toNode != d.localNode:
let reqId = RequestId.init(d.rng[])
d.sendRequest(toNode, AddValueMessage(cId: cId, value: value), reqId)
else:
asyncSpawn d.addValueLocal(cId, value)
return res
proc sendGetValue(d: Protocol, toNode: Node,
cId: NodeId): Future[DiscResult[ValueMessage]]
{.async.} =
let msg = GetValueMessage(cId: cId)
trace "sendGetValue", toNode, msg
let
resp = await d.waitResponse(toNode, msg)
if resp.isSome():
if resp.get().kind == MessageKind.respValue:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().value)
else:
# TODO: do we need to do something when there is an invalid response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetValue message")
else:
# TODO: do we need to do something when there is no response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetValue response message not received in time")
proc getValue*(
d: Protocol,
cId: NodeId,
timeout: Duration = 5000.milliseconds # TODO: not used?
): Future[DiscResult[seq[byte]]] {.async.} =
# # What value do we know about?
# var res = await d.getProvidersLocal(cId, maxitems)
# trace "local providers:", prov = res.mapIt(it)
let nodesNearby = await d.lookup(cId)
trace "nearby:", nodesNearby
var providersFut: seq[Future[DiscResult[ValueMessage]]]
for n in nodesNearby:
if n != d.localNode:
providersFut.add(d.sendGetValue(n, cId))
while providersFut.len > 0:
let providersMsg = await one(providersFut)
# trace "Got providers response", providersMsg
let index = providersFut.find(providersMsg)
if index != -1:
providersFut.del(index)
let providersMsg2 = await providersMsg
let providersMsgRes = providersMsg.read
if providersMsgRes.isOk:
let value = providersMsgRes.get.value
var res = value
# TODO: validate result before accepting as the right one
# TODO: cancel pending futures!
return ok res
else:
error "Sending of GetValue message failed", error = providersMsgRes.error
# TODO: should we consider this as an error result if all GetProviders
# requests fail??
trace "getValue returned no result", cId
return err "getValue failed"
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async.} = {.async.} =
## Query k nodes for the given target, returns all nodes found, including the ## Query k nodes for the given target, returns all nodes found, including the