Rework discovery (#288)

* use multiaddrs instead of ip/port

* rework to support updating spr

* fix tests

* fix compilation

* use base for base methods
This commit is contained in:
Dmitriy Ryajov 2022-10-27 07:44:56 -06:00 committed by GitHub
parent e50ea88411
commit 6e4a8b86ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 61 deletions

View File

@ -100,14 +100,10 @@ proc new*(T: type CodexServer, config: CodexConf): T =
PrivateKey.init(bytes).expect("valid key bytes") PrivateKey.init(bytes).expect("valid key bytes")
let
addresses =
config.listenPorts.mapIt(MultiAddress.init("/ip4/" & $config.listenIp & "/tcp/" & $(it.int)).tryGet()) &
@[MultiAddress.init("/ip4/" & $config.listenIp & "/udp/" & $(config.discoveryPort.int)).tryGet()]
switch = SwitchBuilder switch = SwitchBuilder
.new() .new()
.withPrivateKey(privateKey) .withPrivateKey(privateKey)
.withAddresses(addresses) .withAddresses(config.listenAddrs)
.withRng(Rng.instance()) .withRng(Rng.instance())
.withNoise() .withNoise()
.withMplex(5.minutes, 5.minutes) .withMplex(5.minutes, 5.minutes)
@ -124,15 +120,22 @@ proc new*(T: type CodexServer, config: CodexConf): T =
cache = CacheStore.new(cacheSize = config.cacheSize * MiB) cache = CacheStore.new(cacheSize = config.cacheSize * MiB)
let let
discoveryBootstrapNodes = config.bootstrapNodes
discoveryStore = Datastore(SQLiteDatastore.new( discoveryStore = Datastore(SQLiteDatastore.new(
config.dataDir / "dht") config.dataDir / "dht")
.expect("Should not fail!")) .expect("Should not fail!"))
announceAddrs =
if config.announceAddrs.len <= 0:
config.announceAddrs
else:
config.listenAddrs
blockDiscovery = Discovery.new( blockDiscovery = Discovery.new(
switch.peerInfo, switch.peerInfo.privateKey,
discoveryPort = config.discoveryPort, announceAddrs = config.announceAddrs,
bootstrapNodes = discoveryBootstrapNodes, discoveryPort = config.discoveryPort,
store = discoveryStore) bootstrapNodes = config.bootstrapNodes,
store = discoveryStore)
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)

View File

@ -85,22 +85,21 @@ type
defaultValue: noCommand }: StartUpCommand defaultValue: noCommand }: StartUpCommand
of noCommand: of noCommand:
listenPorts* {. listenAddrs* {.
desc: "Specifies one or more listening ports for the node to listen on." desc: "Multi Addresses to listen on"
defaultValue: @[Port(0)] defaultValue: @[
defaultValueDesc: "0" MultiAddress.init("/ip4/0.0.0.0/tcp/0")
abbr: "l" .expect("Should init multiaddress")]
name: "listen-port" }: seq[Port] defaultValueDesc: "/ip4/0.0.0.0/tcp/0"
# TODO We should have two options: the listen IP and the public IP
# Currently, they are tied together, so we can't be discoverable
# behind a NAT
listenIp* {.
desc: "The public IP"
defaultValue: ValidIpAddress.init("0.0.0.0")
defaultValueDesc: "0.0.0.0"
abbr: "i" abbr: "i"
name: "listen-ip" }: ValidIpAddress name: "listen-addrs" }: seq[MultiAddress]
announceAddrs* {.
desc: "Multi Addresses to announce behind a NAT"
defaultValue: @[]
defaultValueDesc: ""
abbr: "a"
name: "announce-addrs" }: seq[MultiAddress]
discoveryPort* {. discoveryPort* {.
desc: "Specify the discovery (UDP) port" desc: "Specify the discovery (UDP) port"

View File

@ -33,25 +33,9 @@ export discv5
type type
Discovery* = ref object of RootObj Discovery* = ref object of RootObj
protocol: discv5.Protocol protocol: discv5.Protocol
localInfo: PeerInfo key: PrivateKey
announceAddrs: seq[MultiAddress]
proc new*( record: SignedPeerRecord
T: type Discovery,
localInfo: PeerInfo,
discoveryPort = 0.Port,
bootstrapNodes: seq[SignedPeerRecord] = @[],
store: Datastore = SQLiteDatastore.new(Memory)
.expect("Should not fail!")): T =
T(
protocol: newProtocol(
localInfo.privateKey,
bindPort = discoveryPort,
record = localInfo.signedPeerRecord,
bootstrapRecords = bootstrapNodes,
rng = Rng.instance(),
providers = ProvidersManager.new(store)),
localInfo: localInfo)
proc toNodeId*(cid: Cid): NodeId = proc toNodeId*(cid: Cid): NodeId =
## Cid to discovery id ## Cid to discovery id
@ -97,8 +81,7 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} =
trace "Providing block", cid trace "Providing block", cid
let let
nodes = await d.protocol.addProvider( nodes = await d.protocol.addProvider(
cid.toNodeId(), cid.toNodeId(), d.record)
d.localInfo.signedPeerRecord)
if nodes.len <= 0: if nodes.len <= 0:
trace "Couldn't provide to any nodes!" trace "Couldn't provide to any nodes!"
@ -133,25 +116,72 @@ method provide*(d: Discovery, host: ca.Address) {.async, base.} =
trace "Providing host", host = $host trace "Providing host", host = $host
let let
nodes = await d.protocol.addProvider( nodes = await d.protocol.addProvider(
host.toNodeId(), host.toNodeId(), d.record)
d.localInfo.signedPeerRecord)
if nodes.len > 0: if nodes.len > 0:
trace "Provided to nodes", nodes = nodes.len trace "Provided to nodes", nodes = nodes.len
method removeProvider*(d: Discovery, peerId: PeerId): Future[void] = method removeProvider*(d: Discovery, peerId: PeerId): Future[void] {.base.} =
## Remove provider from providers table ## Remove provider from providers table
## ##
trace "Removing provider", peerId trace "Removing provider", peerId
d.protocol.removeProvidersLocal(peerId) d.protocol.removeProvidersLocal(peerId)
proc start*(d: Discovery) {.async.} = proc updateRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
d.protocol.updateRecord( ## Update providers record
d.localInfo.signedPeerRecord.some) ##
.expect("updating SPR")
d.announceAddrs = @addrs
d.record = SignedPeerRecord.init(
d.key,
PeerRecord.init(
PeerId.init(d.key).expect("Should construct PeerId"),
d.announceAddrs)).expect("Should construct signed record")
if not d.protocol.isNil:
d.protocol.updateRecord(d.record.some)
.expect("should update SPR")
proc start*(d: Discovery) {.async.} =
d.protocol.open() d.protocol.open()
await d.protocol.start() await d.protocol.start()
proc stop*(d: Discovery) {.async.} = proc stop*(d: Discovery) {.async.} =
await d.protocol.closeWait() await d.protocol.closeWait()
proc new*(
T: type Discovery,
key: PrivateKey,
discoveryIp = IPv4_any(),
discoveryPort = 0.Port,
announceAddrs: openArray[MultiAddress] = [],
bootstrapNodes: openArray[SignedPeerRecord] = [],
store: Datastore = SQLiteDatastore.new(Memory)
.expect("Should not fail!")): T =
let
announceAddrs =
if announceAddrs.len <= 0:
@[
MultiAddress.init(
ValidIpAddress.init(discoveryIp),
IpTransportProtocol.tcpProtocol,
discoveryPort)]
else:
@announceAddrs
var
self = T(key: key)
self.updateRecord(announceAddrs)
self.protocol = newProtocol(
key,
bindIp = discoveryIp,
bindPort = discoveryPort,
record = self.record,
bootstrapRecords = bootstrapNodes,
rng = Rng.instance(),
providers = ProvidersManager.new(store))
self

View File

@ -29,26 +29,26 @@ proc generateNodes*(
for i in 0..<num: for i in 0..<num:
let let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = Discovery.new(switch.peerInfo, Port(0)) discovery = Discovery.new(switch.peerInfo.privateKey)
wallet = WalletRef.example wallet = WalletRef.example
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt( it ))
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
networkStore = NetworkStore.new(engine, localStore) networkStore = NetworkStore.new(engine, localStore)
switch.mount(network) switch.mount(network)
result.add(( result.add((
switch, switch,
blockDiscovery, discovery,
wallet, wallet,
network, network,
localStore, localStore,
peerStore, peerStore,
pendingBlocks, pendingBlocks,
discovery, blockDiscovery,
engine, engine,
networkStore)) networkStore))

View File

@ -81,8 +81,8 @@ suite "Storage Proofs Network":
switch1 = newStandardSwitch() switch1 = newStandardSwitch()
switch2 = newStandardSwitch() switch2 = newStandardSwitch()
discovery1 = MockDiscovery.new(switch1.peerInfo) discovery1 = MockDiscovery.new(switch1.peerInfo.privateKey)
discovery2 = MockDiscovery.new(switch2.peerInfo) discovery2 = MockDiscovery.new(switch2.peerInfo.privateKey)
stpNetwork1 = StpNetwork.new(switch1, discovery1) stpNetwork1 = StpNetwork.new(switch1, discovery1)
stpNetwork2 = StpNetwork.new(switch2, discovery2) stpNetwork2 = StpNetwork.new(switch2, discovery2)

View File

@ -80,7 +80,7 @@ suite "Test Node":
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
localStore = CacheStore.new() localStore = CacheStore.new()
blockDiscovery = Discovery.new(switch.peerInfo, Port(0)) blockDiscovery = Discovery.new(switch.peerInfo.privateKey)
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)