add DHT storage (addValue/getValue) functionality

this is a minimal implementation, with lots of
work still needed.

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-06-06 11:05:09 +02:00
parent 6aa27072e1
commit c2bb22ab5c
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
6 changed files with 264 additions and 5 deletions

View File

@ -1,4 +1,6 @@
import
./dht/[providers_encoding, providers_messages]
./dht/[providers_encoding, providers_messages],
./dht/[value_encoding, value_messages]
export providers_encoding, providers_messages
export providers_encoding, providers_messages
export value_encoding, value_messages

View File

@ -0,0 +1,77 @@
import
../discv5/[node],
libp2p/protobuf/minprotobuf,
./value_messages
func getField(pb: ProtoBuffer, field: int,
nid: var NodeId): ProtoResult[bool] {.inline.} =
## Read ``NodeId`` from ProtoBuf's message and validate it
var buffer: seq[byte]
let res = ? pb.getField(field, buffer)
if not(res):
ok(false)
else:
nid = readUintBE[256](buffer)
ok(true)
func write(pb: var ProtoBuffer, field: int, nid: NodeId) =
## Write NodeId value ``nodeid`` to object ``pb`` using ProtoBuf's encoding.
write(pb, field, nid.toBytesBE())
proc decode*(
T: typedesc[AddValueMessage],
buffer: openArray[byte]): Result[AddValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = AddValueMessage()
? pb.getRequiredField(1, msg.cId)
? pb.getRequiredField(2, msg.value)
ok(msg)
proc encode*(msg: AddValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.cId)
pb.write(2, msg.value)
pb.finish()
pb.buffer
proc decode*(
T: typedesc[GetValueMessage],
buffer: openArray[byte]): Result[GetValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = GetValueMessage()
? pb.getRequiredField(1, msg.cId)
ok(msg)
proc encode*(msg: GetValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.cId)
pb.finish()
pb.buffer
proc decode*(
T: typedesc[ValueMessage],
buffer: openArray[byte]): Result[ValueMessage, ProtoError] =
let pb = initProtoBuffer(buffer)
var msg = ValueMessage()
? pb.getRequiredField(1, msg.value)
ok(msg)
proc encode*(msg: ValueMessage): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, msg.value)
pb.finish()
pb.buffer

View File

@ -0,0 +1,14 @@
import
../discv5/[node]
type
AddValueMessage* = object
cId*: NodeId
value*: seq[byte]
GetValueMessage* = object
cId*: NodeId
ValueMessage* = object
#total*: uint32
value*: seq[byte]

View File

@ -17,9 +17,11 @@ import
bearssl/rand,
./spr,
./node,
../../../../dht/providers_messages
../../../../dht/providers_messages,
../../../../dht/value_messages
export providers_messages
export value_messages
type
MessageKind* {.pure.} = enum
@ -41,6 +43,9 @@ type
addProvider = 0x0B
getProviders = 0x0C
providers = 0x0D
addValue = 0x0E
getValue = 0x0F
respValue = 0x10
findNodeFast = 0x83
RequestId* = object
@ -79,7 +84,8 @@ type
SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or
TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or
ProvidersMessage or FindNodeFastMessage
ProvidersMessage or FindNodeFastMessage or
AddValueMessage or GetValueMessage or ValueMessage
Message* = object
reqId*: RequestId
@ -112,6 +118,12 @@ type
getProviders*: GetProvidersMessage
of providers:
provs*: ProvidersMessage
of addValue:
addValue*: AddValueMessage
of getValue:
getValue*: GetValueMessage
of respValue:
value*: ValueMessage
else:
discard
@ -126,6 +138,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
elif T is AddProviderMessage: MessageKind.addProvider
elif T is GetProvidersMessage: MessageKind.getProviders
elif T is ProvidersMessage: MessageKind.providers
elif T is AddValueMessage: MessageKind.addValue
elif T is GetValueMessage: MessageKind.getValue
elif T is ValueMessage: MessageKind.respValue
proc hash*(reqId: RequestId): Hash =
hash(reqId.id)

View File

@ -14,7 +14,8 @@ import
libp2p/routing_record,
libp2p/signed_envelope,
"."/[messages, spr, node],
../../../../dht/providers_encoding
../../../../dht/providers_encoding,
../../../../dht/value_encoding
from stew/objects import checkedEnumAssign
@ -434,6 +435,30 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
else:
return err("Unable to decode ProvidersMessage")
of addValue:
let res = AddValueMessage.decode(encoded)
if res.isOk:
message.addValue = res.get
return ok(message)
else:
return err "Unable to decode AddValueMessage"
of getValue:
let res = GetValueMessage.decode(encoded)
if res.isOk:
message.getValue = res.get
return ok(message)
else:
return err("Unable to decode GetValueMessage")
of respValue:
let res = ValueMessage.decode(encoded)
if res.isOk:
message.value = res.get
return ok(message)
else:
return err("Unable to decode ValueMessage")
of regTopic, ticket, regConfirmation, topicQuery:
# We just pass the empty type of this message without attempting to
# decode, so that the protocol knows what was received.

View File

@ -174,6 +174,7 @@ type
talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
rng*: ref HmacDrbgContext
providers: ProvidersManager
valueStore: Table[NodeId, seq[byte]]
TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
{.gcsafe, raises: [Defect].}
@ -399,6 +400,38 @@ proc handleGetProviders(
let response = ProvidersMessage(total: 1, provs: provs.get)
d.sendResponse(fromId, fromAddr, response, reqId)
proc addValueLocal(p: Protocol, cId: NodeId, value: seq[byte]) {.async.} =
trace "adding value to local db", n = p.localNode, cId, value
p.valueStore[cId] = value
proc handleAddValue(
d: Protocol,
fromId: NodeId,
fromAddr: Address,
addValue: AddValueMessage,
reqId: RequestId) =
asyncSpawn d.addValueLocal(addValue.cId, addValue.value)
proc handleGetValue(
d: Protocol,
fromId: NodeId,
fromAddr: Address,
getValue: GetValueMessage,
reqId: RequestId) {.async.} =
try:
let value = d.valueStore[getValue.cId]
trace "retrieved value from local db", n = d.localNode, cID = getValue.cId, value
##TODO: handle multiple messages?
let response = ValueMessage(value: value)
d.sendResponse(fromId, fromAddr, response, reqId)
except KeyError:
# should we respond here? I would say so
trace "no value in local db", n = d.localNode, cID = getValue.cId
# TODO: add noValue response
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) =
case message.kind
@ -421,6 +454,13 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
of getProviders:
discovery_message_requests_incoming.inc()
asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of addValue:
discovery_message_requests_incoming.inc()
#discovery_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddValue(srcId, fromAddr, message.addValue, message.reqId)
of getValue:
discovery_message_requests_incoming.inc()
asyncSpawn d.handleGetValue(srcId, fromAddr, message.getValue, message.reqId)
of regTopic, topicQuery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
@ -820,6 +860,92 @@ proc getProviders*(
return ok res
proc addValue*(
d: Protocol,
cId: NodeId,
value: seq[byte]): Future[seq[Node]] {.async.} =
var res = await d.lookup(cId)
trace "lookup returned:", res
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
if res.len == 0:
res.add(d.localNode)
for toNode in res:
if toNode != d.localNode:
let reqId = RequestId.init(d.rng[])
d.sendRequest(toNode, AddValueMessage(cId: cId, value: value), reqId)
else:
asyncSpawn d.addValueLocal(cId, value)
return res
proc sendGetValue(d: Protocol, toNode: Node,
cId: NodeId): Future[DiscResult[ValueMessage]]
{.async.} =
let msg = GetValueMessage(cId: cId)
trace "sendGetValue", toNode, msg
let
resp = await d.waitResponse(toNode, msg)
if resp.isSome():
if resp.get().kind == MessageKind.respValue:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().value)
else:
# TODO: do we need to do something when there is an invalid response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetValue message")
else:
# TODO: do we need to do something when there is no response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetValue response message not received in time")
proc getValue*(
d: Protocol,
cId: NodeId,
timeout: Duration = 5000.milliseconds # TODO: not used?
): Future[DiscResult[seq[byte]]] {.async.} =
# # What value do we know about?
# var res = await d.getProvidersLocal(cId, maxitems)
# trace "local providers:", prov = res.mapIt(it)
let nodesNearby = await d.lookup(cId)
trace "nearby:", nodesNearby
var providersFut: seq[Future[DiscResult[ValueMessage]]]
for n in nodesNearby:
if n != d.localNode:
providersFut.add(d.sendGetValue(n, cId))
while providersFut.len > 0:
let providersMsg = await one(providersFut)
# trace "Got providers response", providersMsg
let index = providersFut.find(providersMsg)
if index != -1:
providersFut.del(index)
let providersMsg2 = await providersMsg
let providersMsgRes = providersMsg.read
if providersMsgRes.isOk:
let value = providersMsgRes.get.value
var res = value
# TODO: validate result before accepting as the right one
# TODO: cancel pending futures!
return ok res
else:
error "Sending of GetValue message failed", error = providersMsgRes.error
# TODO: should we consider this as an error result if all GetProviders
# requests fail??
trace "getValue returned no result", cId
return err "getValue failed"
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async.} =
## Query k nodes for the given target, returns all nodes found, including the