nim-libp2p/libp2p/protocols/rendezvous.nim

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

791 lines
24 KiB
Nim
Raw Permalink Normal View History

2022-09-30 08:41:04 +00:00
# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
2022-09-30 08:41:04 +00:00
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
2023-06-07 11:12:49 +00:00
{.push raises: [].}
2022-09-30 08:41:04 +00:00
import tables, sequtils, sugar, sets
import metrics except collect
import chronos, chronicles, bearssl/rand, stew/[byteutils, objects, results]
2022-09-30 08:41:04 +00:00
import
./protocol,
../switch,
../routing_record,
../utils/heartbeat,
../stream/connection,
../utils/offsettedseq,
../utils/semaphore
export chronicles
logScope:
topics = "libp2p discovery rendezvous"
declareCounter(libp2p_rendezvous_register, "number of advertise requests")
declareCounter(libp2p_rendezvous_discover, "number of discovery requests")
declareGauge(libp2p_rendezvous_registered, "number of registered peers")
declareGauge(libp2p_rendezvous_namespaces, "number of registered namespaces")
2022-09-30 08:41:04 +00:00
const
RendezVousCodec* = "/rendezvous/1.0.0"
MinimumDuration* = 2.hours
2022-09-30 08:41:04 +00:00
MaximumDuration = 72.hours
RegistrationLimitPerPeer = 1000
DiscoverLimit = 1000'u64
SemaphoreDefaultSize = 5
type
MessageType {.pure.} = enum
Register = 0
RegisterResponse = 1
Unregister = 2
Discover = 3
DiscoverResponse = 4
ResponseStatus = enum
Ok = 0
InvalidNamespace = 100
InvalidSignedPeerRecord = 101
InvalidTTL = 102
InvalidCookie = 103
NotAuthorized = 200
InternalError = 300
Unavailable = 400
Cookie = object
offset: uint64
ns: string
Register = object
ns: string
signedPeerRecord: seq[byte]
ttl: Opt[uint64] # in seconds
2022-09-30 08:41:04 +00:00
RegisterResponse = object
status: ResponseStatus
text: Opt[string]
ttl: Opt[uint64] # in seconds
2022-09-30 08:41:04 +00:00
Unregister = object
ns: string
Discover = object
ns: string
limit: Opt[uint64]
cookie: Opt[seq[byte]]
2022-09-30 08:41:04 +00:00
DiscoverResponse = object
registrations: seq[Register]
cookie: Opt[seq[byte]]
2022-09-30 08:41:04 +00:00
status: ResponseStatus
text: Opt[string]
2022-09-30 08:41:04 +00:00
Message = object
msgType: MessageType
register: Opt[Register]
registerResponse: Opt[RegisterResponse]
unregister: Opt[Unregister]
discover: Opt[Discover]
discoverResponse: Opt[DiscoverResponse]
2022-09-30 08:41:04 +00:00
proc encode(c: Cookie): ProtoBuffer =
result = initProtoBuffer()
result.write(1, c.offset)
result.write(2, c.ns)
result.finish()
proc encode(r: Register): ProtoBuffer =
result = initProtoBuffer()
result.write(1, r.ns)
result.write(2, r.signedPeerRecord)
r.ttl.withValue(ttl):
result.write(3, ttl)
2022-09-30 08:41:04 +00:00
result.finish()
proc encode(rr: RegisterResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rr.status.uint)
rr.text.withValue(text):
result.write(2, text)
rr.ttl.withValue(ttl):
result.write(3, ttl)
2022-09-30 08:41:04 +00:00
result.finish()
proc encode(u: Unregister): ProtoBuffer =
result = initProtoBuffer()
result.write(1, u.ns)
result.finish()
proc encode(d: Discover): ProtoBuffer =
result = initProtoBuffer()
result.write(1, d.ns)
d.limit.withValue(limit):
result.write(2, limit)
d.cookie.withValue(cookie):
result.write(3, cookie)
2022-09-30 08:41:04 +00:00
result.finish()
proc encode(dr: DiscoverResponse): ProtoBuffer =
2022-09-30 08:41:04 +00:00
result = initProtoBuffer()
for reg in dr.registrations:
2022-09-30 08:41:04 +00:00
result.write(1, reg.encode())
dr.cookie.withValue(cookie):
result.write(2, cookie)
result.write(3, dr.status.uint)
dr.text.withValue(text):
result.write(4, text)
2022-09-30 08:41:04 +00:00
result.finish()
proc encode(msg: Message): ProtoBuffer =
result = initProtoBuffer()
result.write(1, msg.msgType.uint)
msg.register.withValue(register):
result.write(2, register.encode())
msg.registerResponse.withValue(registerResponse):
result.write(3, registerResponse.encode())
msg.unregister.withValue(unregister):
result.write(4, unregister.encode())
msg.discover.withValue(discover):
result.write(5, discover.encode())
msg.discoverResponse.withValue(discoverResponse):
result.write(6, discoverResponse.encode())
2022-09-30 08:41:04 +00:00
result.finish()
proc decode(_: typedesc[Cookie], buf: seq[byte]): Opt[Cookie] =
2022-09-30 08:41:04 +00:00
var c: Cookie
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, c.offset)
r2 = pb.getRequiredField(2, c.ns)
if r1.isErr() or r2.isErr():
return Opt.none(Cookie)
Opt.some(c)
2022-09-30 08:41:04 +00:00
proc decode(_: typedesc[Register], buf: seq[byte]): Opt[Register] =
2022-09-30 08:41:04 +00:00
var
r: Register
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, r.ns)
r2 = pb.getRequiredField(2, r.signedPeerRecord)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr():
return Opt.none(Register)
if r3.get(false):
r.ttl = Opt.some(ttl)
Opt.some(r)
2022-09-30 08:41:04 +00:00
proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] =
2022-09-30 08:41:04 +00:00
var
rr: RegisterResponse
statusOrd: uint
text: string
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, statusOrd)
r2 = pb.getField(2, text)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr() or
not checkedEnumAssign(rr.status, statusOrd):
return Opt.none(RegisterResponse)
if r2.get(false):
rr.text = Opt.some(text)
if r3.get(false):
rr.ttl = Opt.some(ttl)
Opt.some(rr)
2022-09-30 08:41:04 +00:00
proc decode(_: typedesc[Unregister], buf: seq[byte]): Opt[Unregister] =
2022-09-30 08:41:04 +00:00
var u: Unregister
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, u.ns)
if r1.isErr():
return Opt.none(Unregister)
Opt.some(u)
2022-09-30 08:41:04 +00:00
proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
2022-09-30 08:41:04 +00:00
var
d: Discover
limit: uint64
cookie: seq[byte]
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, d.ns)
r2 = pb.getField(2, limit)
r3 = pb.getField(3, cookie)
if r1.isErr() or r2.isErr() or r3.isErr:
return Opt.none(Discover)
if r2.get(false):
d.limit = Opt.some(limit)
if r3.get(false):
d.cookie = Opt.some(cookie)
Opt.some(d)
2022-09-30 08:41:04 +00:00
proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] =
2022-09-30 08:41:04 +00:00
var
dr: DiscoverResponse
registrations: seq[seq[byte]]
cookie: seq[byte]
statusOrd: uint
text: string
let
pb = initProtoBuffer(buf)
r1 = pb.getRepeatedField(1, registrations)
r2 = pb.getField(2, cookie)
r3 = pb.getRequiredField(3, statusOrd)
r4 = pb.getField(4, text)
if r1.isErr() or r2.isErr() or r3.isErr or r4.isErr() or
not checkedEnumAssign(dr.status, statusOrd):
return Opt.none(DiscoverResponse)
2022-09-30 08:41:04 +00:00
for reg in registrations:
var r: Register
let regOpt = Register.decode(reg).valueOr:
return
dr.registrations.add(regOpt)
if r2.get(false):
dr.cookie = Opt.some(cookie)
if r4.get(false):
dr.text = Opt.some(text)
Opt.some(dr)
proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
2022-09-30 08:41:04 +00:00
var
msg: Message
statusOrd: uint
pbr, pbrr, pbu, pbd, pbdr: ProtoBuffer
let pb = initProtoBuffer(buf)
?pb.getRequiredField(1, statusOrd).toOpt
if not checkedEnumAssign(msg.msgType, statusOrd):
return Opt.none(Message)
if ?pb.getField(2, pbr).optValue:
2022-09-30 08:41:04 +00:00
msg.register = Register.decode(pbr.buffer)
if msg.register.isNone():
return Opt.none(Message)
if ?pb.getField(3, pbrr).optValue:
2022-09-30 08:41:04 +00:00
msg.registerResponse = RegisterResponse.decode(pbrr.buffer)
if msg.registerResponse.isNone():
return Opt.none(Message)
if ?pb.getField(4, pbu).optValue:
2022-09-30 08:41:04 +00:00
msg.unregister = Unregister.decode(pbu.buffer)
if msg.unregister.isNone():
return Opt.none(Message)
if ?pb.getField(5, pbd).optValue:
2022-09-30 08:41:04 +00:00
msg.discover = Discover.decode(pbd.buffer)
if msg.discover.isNone():
return Opt.none(Message)
if ?pb.getField(6, pbdr).optValue:
2022-09-30 08:41:04 +00:00
msg.discoverResponse = DiscoverResponse.decode(pbdr.buffer)
if msg.discoverResponse.isNone():
return Opt.none(Message)
Opt.some(msg)
2022-09-30 08:41:04 +00:00
type
RendezVousError* = object of LPError
RegisteredData = object
expiration: Moment
peerId: PeerId
data: Register
RendezVous* = ref object of LPProtocol
# Registered needs to be an offsetted sequence
# because we need stable index for the cookies.
registered: OffsettedSeq[RegisteredData]
# Namespaces is a table whose key is a salted namespace and
# the value is the index sequence corresponding to this
# namespace in the offsettedqueue.
namespaces: Table[string, seq[int]]
rng: ref HmacDrbgContext
salt: string
defaultDT: Moment
registerDeletionLoop: Future[void]
#registerEvent: AsyncEvent # TODO: to raise during the heartbeat
# + make the heartbeat sleep duration "smarter"
sema: AsyncSemaphore
peers: seq[PeerId]
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
switch: Switch
minDuration: Duration
maxDuration: Duration
minTTL: uint64
maxTTL: uint64
2022-09-30 08:41:04 +00:00
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
if spr.len == 0:
return err("Empty peer record")
let signedEnv = ?SignedPeerRecord.decode(spr).mapErr(x => $x)
if signedEnv.data.peerId != peerId:
return err("Bad Peer ID")
return ok()
proc sendRegisterResponse(conn: Connection, ttl: uint64) {.async.} =
let msg = encode(
Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: Ok, ttl: Opt.some(ttl))),
)
)
2022-09-30 08:41:04 +00:00
await conn.writeLp(msg.buffer)
proc sendRegisterResponseError(
conn: Connection, status: ResponseStatus, text: string = ""
) {.async.} =
let msg = encode(
Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: status, text: Opt.some(text))),
)
)
2022-09-30 08:41:04 +00:00
await conn.writeLp(msg.buffer)
proc sendDiscoverResponse(
conn: Connection, s: seq[Register], cookie: Cookie
) {.async.} =
let msg = encode(
Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(
DiscoverResponse(
status: Ok, registrations: s, cookie: Opt.some(cookie.encode().buffer)
)
),
2022-09-30 08:41:04 +00:00
)
)
await conn.writeLp(msg.buffer)
proc sendDiscoverResponseError(
conn: Connection, status: ResponseStatus, text: string = ""
) {.async.} =
let msg = encode(
Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(status: status, text: Opt.some(text))),
)
)
2022-09-30 08:41:04 +00:00
await conn.writeLp(msg.buffer)
proc countRegister(rdv: RendezVous, peerId: PeerId): int =
let n = Moment.now()
for data in rdv.registered:
if data.peerId == peerId and data.expiration > n:
result.inc()
proc save(
rdv: RendezVous, ns: string, peerId: PeerId, r: Register, update: bool = true
) =
let nsSalted = ns & rdv.salt
discard rdv.namespaces.hasKeyOrPut(nsSalted, newSeq[int]())
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == peerId:
if update == false:
return
rdv.registered[index].expiration = rdv.defaultDT
rdv.registered.add(
RegisteredData(
peerId: peerId,
expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds,
2022-09-30 08:41:04 +00:00
data: r,
)
)
rdv.namespaces[nsSalted].add(rdv.registered.high)
# rdv.registerEvent.fire()
except KeyError:
doAssert false, "Should have key"
proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
trace "Received Register", peerId = conn.peerId, ns = r.ns
libp2p_rendezvous_register.inc()
2022-09-30 08:41:04 +00:00
if r.ns.len notin 1 .. 255:
return conn.sendRegisterResponseError(InvalidNamespace)
let ttl = r.ttl.get(rdv.minTTL)
if ttl notin rdv.minTTL .. rdv.maxTTL:
2022-09-30 08:41:04 +00:00
return conn.sendRegisterResponseError(InvalidTTL)
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
if pr.isErr():
return conn.sendRegisterResponseError(InvalidSignedPeerRecord, pr.error())
if rdv.countRegister(conn.peerId) >= RegistrationLimitPerPeer:
return conn.sendRegisterResponseError(NotAuthorized, "Registration limit reached")
rdv.save(r.ns, conn.peerId, r)
libp2p_rendezvous_registered.inc()
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))
2022-09-30 08:41:04 +00:00
conn.sendRegisterResponse(ttl)
proc unregister(rdv: RendezVous, conn: Connection, u: Unregister) =
trace "Received Unregister", peerId = conn.peerId, ns = u.ns
let nsSalted = u.ns & rdv.salt
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == conn.peerId:
rdv.registered[index].expiration = rdv.defaultDT
libp2p_rendezvous_registered.dec()
2022-09-30 08:41:04 +00:00
except KeyError:
return
proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
trace "Received Discover", peerId = conn.peerId, ns = d.ns
libp2p_rendezvous_discover.inc()
2022-09-30 08:41:04 +00:00
if d.ns.len notin 0 .. 255:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
var cookie =
if d.cookie.isSome():
try:
Cookie.decode(d.cookie.tryGet()).tryGet()
2022-09-30 08:41:04 +00:00
except CatchableError:
await conn.sendDiscoverResponseError(InvalidCookie)
return
else:
Cookie(offset: rdv.registered.low().uint64 - 1)
if cookie.ns != d.ns or
cookie.offset notin rdv.registered.low().uint64 .. rdv.registered.high().uint64:
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
let
nsSalted = d.ns & rdv.salt
namespaces =
if d.ns != "":
try:
rdv.namespaces[nsSalted]
except KeyError:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
else:
toSeq(cookie.offset.int .. rdv.registered.high())
if namespaces.len() == 0:
await conn.sendDiscoverResponse(@[], Cookie())
return
var offset = namespaces[^1]
let n = Moment.now()
var s = collect(newSeq()):
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset:
continue
limit.dec()
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
2022-09-30 08:41:04 +00:00
reg.data
rdv.rng.shuffle(s)
await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
proc advertiseWrap() {.async.} =
try:
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer:
await conn.close()
await conn.writeLp(msg)
let
buf = await conn.readLp(4096)
msgRecv = Message.decode(buf).tryGet()
2022-09-30 08:41:04 +00:00
if msgRecv.msgType != MessageType.RegisterResponse:
trace "Unexpected register response", peer, msgType = msgRecv.msgType
elif msgRecv.registerResponse.tryGet().status != ResponseStatus.Ok:
2022-09-30 08:41:04 +00:00
trace "Refuse to register", peer, response = msgRecv.registerResponse
else:
trace "Successfully registered", peer, response = msgRecv.registerResponse
2022-09-30 08:41:04 +00:00
except CatchableError as exc:
trace "exception in the advertise", description = exc.msg
2022-09-30 08:41:04 +00:00
finally:
rdv.sema.release()
2022-09-30 08:41:04 +00:00
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
proc advertise*(
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
) {.async.} =
2022-09-30 08:41:04 +00:00
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
if ttl notin rdv.minDuration .. rdv.maxDuration:
raise newException(RendezVousError, "Invalid time to live: " & $ttl)
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
raise newException(RendezVousError, "Wrong Signed Peer Record")
2022-09-30 08:41:04 +00:00
let
r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64))
msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r)))
2022-09-30 08:41:04 +00:00
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
let futs = collect(newSeq()):
for peer in peers:
2022-09-30 08:41:04 +00:00
trace "Send Advertise", peerId = peer, ns
rdv.advertisePeer(peer, msg.buffer)
await allFutures(futs)
method advertise*(
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
) {.async, base.} =
await rdv.advertise(ns, ttl, rdv.peers)
2022-09-30 08:41:04 +00:00
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
let
nsSalted = ns & rdv.salt
n = Moment.now()
try:
collect(newSeq()):
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].expiration > n:
let res = SignedPeerRecord.decode(rdv.registered[index].data.signedPeerRecord).valueOr:
continue
res.data
2022-09-30 08:41:04 +00:00
except KeyError as exc:
@[]
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
2022-09-30 08:41:04 +00:00
): Future[seq[PeerRecord]] {.async.} =
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
d = Discover(ns: ns)
if l <= 0 or l > DiscoverLimit.int:
raise newException(RendezVousError, "Invalid limit")
if ns.len notin 0 .. 255:
raise newException(RendezVousError, "Invalid namespace")
limit = l.uint64
proc requestPeer(peer: PeerId) {.async.} =
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer:
await conn.close()
d.limit = Opt.some(limit)
2022-09-30 08:41:04 +00:00
d.cookie =
try:
Opt.some(rdv.cookiesSaved[peer][ns])
2022-09-30 08:41:04 +00:00
except KeyError as exc:
Opt.none(seq[byte])
2022-09-30 08:41:04 +00:00
await conn.writeLp(
encode(Message(msgType: MessageType.Discover, discover: Opt.some(d))).buffer
)
2022-09-30 08:41:04 +00:00
let
buf = await conn.readLp(65536)
msgRcv = Message.decode(buf).valueOr:
debug "Message undecodable"
return
if msgRcv.msgType != MessageType.DiscoverResponse:
2022-09-30 08:41:04 +00:00
debug "Unexpected discover response", msgType = msgRcv.msgType
return
let resp = msgRcv.discoverResponse.valueOr:
debug "Discover response is empty"
return
2022-09-30 08:41:04 +00:00
if resp.status != ResponseStatus.Ok:
trace "Cannot discover", ns, status = resp.status, text = resp.text
return
resp.cookie.withValue(cookie):
if cookie.len() < 1000 and
rdv.cookiesSaved.hasKeyOrPut(peer, {ns: cookie}.toTable()):
rdv.cookiesSaved[peer][ns] = cookie
2022-09-30 08:41:04 +00:00
for r in resp.registrations:
if limit == 0:
return
let ttl = r.ttl.get(rdv.maxTTL + 1)
if ttl > rdv.maxTTL:
continue
let
spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr:
continue
pr = spr.data
2022-09-30 08:41:04 +00:00
if s.hasKey(pr.peerId):
let (prSaved, rSaved) = s[pr.peerId]
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or
2022-09-30 08:41:04 +00:00
prSaved.seqNo < pr.seqNo:
s[pr.peerId] = (pr, r)
else:
s[pr.peerId] = (pr, r)
limit.dec()
for (_, r) in s.values():
rdv.save(ns, peer, r, false)
for peer in peers:
2022-09-30 08:41:04 +00:00
if limit == 0:
break
if RendezVousCodec notin rdv.switch.peerStore[ProtoBook][peer]:
continue
try:
trace "Send Request", peerId = peer, ns
await peer.requestPeer()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception catch in request", description = exc.msg
2022-09-30 08:41:04 +00:00
return toSeq(s.values()).mapIt(it[0])
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
): Future[seq[PeerRecord]] {.async.} =
await rdv.request(ns, l, rdv.peers)
2022-09-30 08:41:04 +00:00
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
let nsSalted = ns & rdv.salt
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == rdv.switch.peerInfo.peerId:
rdv.registered[index].expiration = rdv.defaultDT
except KeyError:
return
proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
2022-09-30 08:41:04 +00:00
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
2022-09-30 08:41:04 +00:00
let msg = encode(
Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns)))
)
2022-09-30 08:41:04 +00:00
proc unsubscribePeer(peerId: PeerId) {.async.} =
2022-09-30 08:41:04 +00:00
try:
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
defer:
await conn.close()
await conn.writeLp(msg.buffer)
except CatchableError as exc:
trace "exception while unsubscribing", description = exc.msg
2022-09-30 08:41:04 +00:00
let futs = collect(newSeq()):
for peer in peerIds:
unsubscribePeer(peer)
discard await allFutures(futs).withTimeout(5.seconds)
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
rdv.unsubscribeLocally(ns)
await rdv.unsubscribe(ns, rdv.peers)
2022-09-30 08:41:04 +00:00
proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch = switch
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
rdv.peers.add(peerId)
elif event.kind == PeerEventKind.Left:
rdv.peers.keepItIf(it != peerId)
2022-09-30 08:41:04 +00:00
rdv.switch.addPeerEventHandler(handlePeer, Joined)
rdv.switch.addPeerEventHandler(handlePeer, Left)
proc new*(
T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T {.raises: [RendezVousError].} =
if minDuration < 1.minutes:
raise newException(RendezVousError, "TTL too short: 1 minute minimum")
if maxDuration > 72.hours:
raise newException(RendezVousError, "TTL too long: 72 hours maximum")
if minDuration >= maxDuration:
raise newException(RendezVousError, "Minimum TTL longer than maximum")
let
minTTL = minDuration.seconds.uint64
maxTTL = maxDuration.seconds.uint64
2022-09-30 08:41:04 +00:00
let rdv = T(
rng: rng,
salt: string.fromBytes(generateBytes(rng[], 8)),
registered: initOffsettedSeq[RegisteredData](1),
defaultDT: Moment.now() - 1.days,
#registerEvent: newAsyncEvent(),
sema: newAsyncSemaphore(SemaphoreDefaultSize),
minDuration: minDuration,
maxDuration: maxDuration,
minTTL: minTTL,
maxTTL: maxTTL,
2022-09-30 08:41:04 +00:00
)
logScope:
topics = "libp2p discovery rendezvous"
proc handleStream(conn: Connection, proto: string) {.async.} =
2022-09-30 08:41:04 +00:00
try:
let
buf = await conn.readLp(4096)
msg = Message.decode(buf).tryGet()
2022-09-30 08:41:04 +00:00
case msg.msgType
of MessageType.Register:
await rdv.register(conn, msg.register.tryGet())
2022-09-30 08:41:04 +00:00
of MessageType.RegisterResponse:
trace "Got an unexpected Register Response", response = msg.registerResponse
of MessageType.Unregister:
rdv.unregister(conn, msg.unregister.tryGet())
of MessageType.Discover:
await rdv.discover(conn, msg.discover.tryGet())
2022-09-30 08:41:04 +00:00
of MessageType.DiscoverResponse:
trace "Got an unexpected Discover Response", response = msg.discoverResponse
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in rendezvous handler", description = exc.msg
2022-09-30 08:41:04 +00:00
finally:
await conn.close()
rdv.handler = handleStream
rdv.codec = RendezVousCodec
return rdv
proc new*(
T: typedesc[RendezVous],
switch: Switch,
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
2022-09-30 08:41:04 +00:00
): T =
let rdv = T.new(rng, minDuration, maxDuration)
2022-09-30 08:41:04 +00:00
rdv.setup(switch)
return rdv
proc deletesRegister(rdv: RendezVous) {.async.} =
heartbeat "Register timeout", 1.minutes:
let n = Moment.now()
var total = 0
2022-09-30 08:41:04 +00:00
rdv.registered.flushIfIt(it.expiration < n)
for data in rdv.namespaces.mvalues():
data.keepItIf(it >= rdv.registered.offset)
total += data.len
libp2p_rendezvous_registered.set(int64(total))
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))
2022-09-30 08:41:04 +00:00
method start*(
rdv: RendezVous
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
let fut = newFuture[void]()
fut.complete()
2022-09-30 08:41:04 +00:00
if not rdv.registerDeletionLoop.isNil:
warn "Starting rendezvous twice"
return fut
2022-09-30 08:41:04 +00:00
rdv.registerDeletionLoop = rdv.deletesRegister()
rdv.started = true
fut
2022-09-30 08:41:04 +00:00
method stop*(rdv: RendezVous): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
2022-09-30 08:41:04 +00:00
if rdv.registerDeletionLoop.isNil:
warn "Stopping rendezvous without starting it"
return fut
2022-09-30 08:41:04 +00:00
rdv.started = false
rdv.registerDeletionLoop.cancel()
rdv.registerDeletionLoop = nil
fut