mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-12 21:34:33 +00:00
Improve logs and loglevels, and general cleanup (#1059)
This commit is contained in:
parent
4d126f2461
commit
d337806301
@ -53,13 +53,7 @@ make update
|
|||||||
make fluffy
|
make fluffy
|
||||||
```
|
```
|
||||||
|
|
||||||
### Run fluffy test suite
|
### Run fluffy on public testnet
|
||||||
```bash
|
|
||||||
# From the nimbus-eth1 repository
|
|
||||||
make fluffy-test
|
|
||||||
```
|
|
||||||
|
|
||||||
### Run fluffy on (Nimbus) public testnet0
|
|
||||||
|
|
||||||
There is a fleet of fluffy nodes deployed, and to easily join these, the
|
There is a fleet of fluffy nodes deployed, and to easily join these, the
|
||||||
`--network:testnet0` option can be used.
|
`--network:testnet0` option can be used.
|
||||||
@ -70,7 +64,8 @@ There is a fleet of fluffy nodes deployed, and to easily join these, the
|
|||||||
|
|
||||||
> **_Note:_** This `--network` option will merely select a static set of
|
> **_Note:_** This `--network` option will merely select a static set of
|
||||||
specific bootstrap nodes belonging to a "testnet". Currently `testnet0` is the
|
specific bootstrap nodes belonging to a "testnet". Currently `testnet0` is the
|
||||||
only option, which results in connecting to designated fluffy bootstrap nodes.
|
only option, which results in connecting to the
|
||||||
|
[testnet bootstrap nodes](https://github.com/ethereum/portal-network-specs/blob/master/testnet.md#bootnodes).
|
||||||
It should be noted that there is no real way to distinguish a "specific" Portal
|
It should be noted that there is no real way to distinguish a "specific" Portal
|
||||||
network, and as long as the same Portal protocols are supported, nodes can
|
network, and as long as the same Portal protocols are supported, nodes can
|
||||||
simply connect to it and no real separation can be made.
|
simply connect to it and no real separation can be made.
|
||||||
@ -80,7 +75,7 @@ nodes with the same IPs in the routing tables. This is needed because the fleet
|
|||||||
of fluffy nodes runs on only 2 machines / network interfaces.
|
of fluffy nodes runs on only 2 machines / network interfaces.
|
||||||
|
|
||||||
|
|
||||||
The network is currently storing only the first 2500 mainnet blocks. This can be
|
The network is currently storing only the first 25000 mainnet blocks. This can be
|
||||||
tested by using the JSON-RPC call `eth_getBlockByHash`:
|
tested by using the JSON-RPC call `eth_getBlockByHash`:
|
||||||
```
|
```
|
||||||
# Get the hash of a block from your favorite block explorer, e.g.:
|
# Get the hash of a block from your favorite block explorer, e.g.:
|
||||||
@ -89,6 +84,18 @@ tested by using the JSON-RPC call `eth_getBlockByHash`:
|
|||||||
curl -s -X POST -H 'Content-Type: application/json' -d '{"jsonrpc":"2.0","id":"1","method":"eth_getBlockByHash","params":["0x8dda3a641653c0454569c3b5be529f58b14d2a5b5d87956664c746ce1e367c21", false]}' http://localhost:8545 | jq
|
curl -s -X POST -H 'Content-Type: application/json' -d '{"jsonrpc":"2.0","id":"1","method":"eth_getBlockByHash","params":["0x8dda3a641653c0454569c3b5be529f58b14d2a5b5d87956664c746ce1e367c21", false]}' http://localhost:8545 | jq
|
||||||
```
|
```
|
||||||
|
|
||||||
|
One can also use the `blockwalk` tool to walk down the blocks one by one, e.g:
|
||||||
|
```bash
|
||||||
|
make fluffy-tools
|
||||||
|
|
||||||
|
./build/blockwalk --block-hash:0xf6bfad56d1a45d1661506343dd1e511b5d7e17565b3ec293125ff0890b9709e5
|
||||||
|
```
|
||||||
|
|
||||||
|
### Run fluffy test suite
|
||||||
|
```bash
|
||||||
|
# From the nimbus-eth1 repository
|
||||||
|
make fluffy-test
|
||||||
|
```
|
||||||
|
|
||||||
### Run fluffy local testnet script
|
### Run fluffy local testnet script
|
||||||
```bash
|
```bash
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Nimbus
|
# Nimbus
|
||||||
# Copyright (c) 2021 Status Research & Development GmbH
|
# Copyright (c) 2021-2022 Status Research & Development GmbH
|
||||||
# Licensed and distributed under either of
|
# Licensed and distributed under either of
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
@ -45,7 +45,7 @@ type
|
|||||||
vacStmt: SqliteStmt[NoParams, void]
|
vacStmt: SqliteStmt[NoParams, void]
|
||||||
getAll: SqliteStmt[NoParams, RowInfo]
|
getAll: SqliteStmt[NoParams, RowInfo]
|
||||||
|
|
||||||
# we want objects to be sorted from largest distance to closests
|
# Objects must be sorted from largest to closest distance
|
||||||
proc `<`(a, b: ObjInfo): bool =
|
proc `<`(a, b: ObjInfo): bool =
|
||||||
return a.distFrom < b.distFrom
|
return a.distFrom < b.distFrom
|
||||||
|
|
||||||
@ -72,24 +72,26 @@ proc new*(T: type ContentDB, path: string, inMemory = false): ContentDB =
|
|||||||
|
|
||||||
let kvStore = kvStore db.openKvStore().expectDb()
|
let kvStore = kvStore db.openKvStore().expectDb()
|
||||||
|
|
||||||
# this need to go after `openKvStore`, as it checks that the table name kvstore
|
# This needs to go after `openKvStore`, as it checks whether the table name
|
||||||
# already exists.
|
# kvstore already exists.
|
||||||
let getKeysStmt = db.prepareStmt(
|
let getKeysStmt = db.prepareStmt(
|
||||||
"SELECT key, length(value) FROM kvstore",
|
"SELECT key, length(value) FROM kvstore",
|
||||||
NoParams, RowInfo
|
NoParams, RowInfo
|
||||||
).get()
|
).get()
|
||||||
|
|
||||||
ContentDB(kv: kvStore, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt)
|
ContentDB(
|
||||||
|
kv: kvStore, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt)
|
||||||
|
|
||||||
proc getNFurthestElements*(db: ContentDB, target: UInt256, n: uint64): seq[ObjInfo] =
|
proc getNFurthestElements*(
|
||||||
## Get at most n furthest elements from database in order from furthest to closest.
|
db: ContentDB, target: UInt256, n: uint64): seq[ObjInfo] =
|
||||||
## We are also returning payload lengths so caller can decide how many of those elements
|
## Get at most n furthest elements from db in order from furthest to closest.
|
||||||
## need to be deleted.
|
## Payload lengths are also returned so the caller can decide how many of
|
||||||
|
## those elements need to be deleted.
|
||||||
##
|
##
|
||||||
## Currently it uses xor metric
|
## Currently it uses xor metric
|
||||||
##
|
##
|
||||||
## Currently works by querying for all elements in database and doing all necessary
|
## Currently works by querying for all elements in database and doing all
|
||||||
## work on program level. This is mainly due to two facts:
|
## necessary work on program level. This is mainly due to two facts:
|
||||||
## - sqlite does not have build xor function, also it does not handle bitwise
|
## - sqlite does not have build xor function, also it does not handle bitwise
|
||||||
## operations on blobs as expected
|
## operations on blobs as expected
|
||||||
## - our nim wrapper for sqlite does not support create_function api of sqlite
|
## - our nim wrapper for sqlite does not support create_function api of sqlite
|
||||||
@ -104,10 +106,12 @@ proc getNFurthestElements*(db: ContentDB, target: UInt256, n: uint64): seq[ObjIn
|
|||||||
var ri: RowInfo
|
var ri: RowInfo
|
||||||
for e in db.getAll.exec(ri):
|
for e in db.getAll.exec(ri):
|
||||||
let contentId = UInt256.fromBytesBE(ri.contentId)
|
let contentId = UInt256.fromBytesBE(ri.contentId)
|
||||||
# TODO: Currently it assumes xor distance, but when we start testing networks with
|
# TODO: Currently it assumes xor distance, but when we start testing
|
||||||
# other distance functions this needs to be adjusted to the custom distance function
|
# networks with other distance functions this needs to be adjusted to the
|
||||||
|
# custom distance function
|
||||||
let dist = contentId xor target
|
let dist = contentId xor target
|
||||||
let obj = ObjInfo(contentId: ri.contentId, payloadLength: ri.payloadLength, distFrom: dist)
|
let obj = ObjInfo(
|
||||||
|
contentId: ri.contentId, payloadLength: ri.payloadLength, distFrom: dist)
|
||||||
|
|
||||||
if (uint64(len(heap)) < n):
|
if (uint64(len(heap)) < n):
|
||||||
heap.push(obj)
|
heap.push(obj)
|
||||||
@ -125,10 +129,11 @@ proc getNFurthestElements*(db: ContentDB, target: UInt256, n: uint64): seq[ObjIn
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
proc reclaimSpace*(db: ContentDB): void =
|
proc reclaimSpace*(db: ContentDB): void =
|
||||||
## Runs sqlie VACUMM commands which rebuilds db, repacking it into a minimal amount of disk space
|
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
|
||||||
|
## minimal amount of disk space.
|
||||||
## Ideal mode of operation, is to run it after several deletes.
|
## Ideal mode of operation, is to run it after several deletes.
|
||||||
## Another options would be to run 'PRAGMA auto_vacuum = FULL;' statement at the start of
|
## Another options would be to run 'PRAGMA auto_vacuum = FULL;' statement at
|
||||||
## db to leave it in sqlite power to clean up
|
## the start of db to leave it up to sqlite to clean up
|
||||||
db.vacStmt.exec().expectDb()
|
db.vacStmt.exec().expectDb()
|
||||||
|
|
||||||
proc size*(db: ContentDB): int64 =
|
proc size*(db: ContentDB): int64 =
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Nimbus
|
# Nimbus
|
||||||
# Copyright (c) 2021 Status Research & Development GmbH
|
# Copyright (c) 2021-2022 Status Research & Development GmbH
|
||||||
# Licensed and distributed under either of
|
# Licensed and distributed under either of
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sugar],
|
std/[options, sugar],
|
||||||
stew/results, chronos,
|
stew/results, chronos, chronicles,
|
||||||
eth/[common/eth_types, rlp],
|
eth/[common/eth_types, rlp],
|
||||||
eth/p2p/discoveryv5/[protocol, enr],
|
eth/p2p/discoveryv5/[protocol, enr],
|
||||||
../../content_db,
|
../../content_db,
|
||||||
@ -17,6 +17,9 @@ import
|
|||||||
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
||||||
./history_content
|
./history_content
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "portal_hist"
|
||||||
|
|
||||||
const
|
const
|
||||||
historyProtocolId* = [byte 0x50, 0x0B]
|
historyProtocolId* = [byte 0x50, 0x0B]
|
||||||
|
|
||||||
@ -121,11 +124,13 @@ proc getBlockHeader*(
|
|||||||
let maybeHeaderFromDb = h.getContentFromDb(BlockHeader, contentId)
|
let maybeHeaderFromDb = h.getContentFromDb(BlockHeader, contentId)
|
||||||
|
|
||||||
if maybeHeaderFromDb.isSome():
|
if maybeHeaderFromDb.isSome():
|
||||||
|
info "Fetched block header from database", hash
|
||||||
return maybeHeaderFromDb
|
return maybeHeaderFromDb
|
||||||
|
|
||||||
let maybeHeaderContent = await h.portalProtocol.contentLookup(keyEncoded, contentId)
|
let maybeHeaderContent = await h.portalProtocol.contentLookup(keyEncoded, contentId)
|
||||||
|
|
||||||
if maybeHeaderContent.isNone():
|
if maybeHeaderContent.isNone():
|
||||||
|
warn "Failed fetching block header from the network", hash
|
||||||
return none(BlockHeader)
|
return none(BlockHeader)
|
||||||
|
|
||||||
let headerContent = maybeHeaderContent.unsafeGet()
|
let headerContent = maybeHeaderContent.unsafeGet()
|
||||||
@ -133,6 +138,7 @@ proc getBlockHeader*(
|
|||||||
let maybeHeader = validateHeaderBytes(headerContent.content, hash)
|
let maybeHeader = validateHeaderBytes(headerContent.content, hash)
|
||||||
|
|
||||||
if maybeHeader.isSome():
|
if maybeHeader.isSome():
|
||||||
|
info "Fetched block header from the network", hash
|
||||||
# Content is valid we can propagate it to interested peers
|
# Content is valid we can propagate it to interested peers
|
||||||
h.portalProtocol.triggerPoke(
|
h.portalProtocol.triggerPoke(
|
||||||
headerContent.nodesInterestedInContent,
|
headerContent.nodesInterestedInContent,
|
||||||
@ -163,11 +169,13 @@ proc getBlock*(
|
|||||||
let maybeBodyFromDb = h.getContentFromDb(BlockBody, contentId)
|
let maybeBodyFromDb = h.getContentFromDb(BlockBody, contentId)
|
||||||
|
|
||||||
if maybeBodyFromDb.isSome():
|
if maybeBodyFromDb.isSome():
|
||||||
|
info "Fetched block body from database", hash
|
||||||
return some[Block]((header, maybeBodyFromDb.unsafeGet()))
|
return some[Block]((header, maybeBodyFromDb.unsafeGet()))
|
||||||
|
|
||||||
let maybeBodyContent = await h.portalProtocol.contentLookup(keyEncoded, contentId)
|
let maybeBodyContent = await h.portalProtocol.contentLookup(keyEncoded, contentId)
|
||||||
|
|
||||||
if maybeBodyContent.isNone():
|
if maybeBodyContent.isNone():
|
||||||
|
warn "Failed fetching block body from the network", hash
|
||||||
return none(Block)
|
return none(Block)
|
||||||
|
|
||||||
let bodyContent = maybeBodyContent.unsafeGet()
|
let bodyContent = maybeBodyContent.unsafeGet()
|
||||||
@ -175,6 +183,7 @@ proc getBlock*(
|
|||||||
let maybeBody = validateBodyBytes(bodyContent.content, header.txRoot, header.ommersHash)
|
let maybeBody = validateBodyBytes(bodyContent.content, header.txRoot, header.ommersHash)
|
||||||
|
|
||||||
if maybeBody.isNone():
|
if maybeBody.isNone():
|
||||||
|
info "Fetched block body from the network", hash
|
||||||
return none(Block)
|
return none(Block)
|
||||||
|
|
||||||
let blockBody = maybeBody.unsafeGet()
|
let blockBody = maybeBody.unsafeGet()
|
||||||
@ -226,7 +235,7 @@ proc new*(
|
|||||||
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||||
|
|
||||||
proc start*(p: HistoryNetwork) =
|
proc start*(p: HistoryNetwork) =
|
||||||
info "Starting Portal history sub-network",
|
info "Starting Portal execution history network",
|
||||||
protocolId = p.portalProtocol.protocolId
|
protocolId = p.portalProtocol.protocolId
|
||||||
p.portalProtocol.start()
|
p.portalProtocol.start()
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Nimbus
|
# Nimbus
|
||||||
# Copyright (c) 2021 Status Research & Development GmbH
|
# Copyright (c) 2021-2022 Status Research & Development GmbH
|
||||||
# Licensed and distributed under either of
|
# Licensed and distributed under either of
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
@ -7,13 +7,16 @@
|
|||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sugar],
|
std/[options, sugar],
|
||||||
stew/results, chronos,
|
stew/results, chronos, chronicles,
|
||||||
eth/p2p/discoveryv5/[protocol, enr],
|
eth/p2p/discoveryv5/[protocol, enr],
|
||||||
../../content_db,
|
../../content_db,
|
||||||
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
||||||
./state_content,
|
./state_content,
|
||||||
./state_distance
|
./state_distance
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "portal_state"
|
||||||
|
|
||||||
const
|
const
|
||||||
stateProtocolId* = [byte 0x50, 0x0A]
|
stateProtocolId* = [byte 0x50, 0x0A]
|
||||||
|
|
||||||
@ -75,7 +78,7 @@ proc new*(
|
|||||||
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||||
|
|
||||||
proc start*(n: StateNetwork) =
|
proc start*(n: StateNetwork) =
|
||||||
info "Starting Portal state sub-network",
|
info "Starting Portal execution state network",
|
||||||
protocolId = n.portalProtocol.protocolId
|
protocolId = n.portalProtocol.protocolId
|
||||||
n.portalProtocol.start()
|
n.portalProtocol.start()
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Nimbus - Portal Network
|
# Nimbus - Portal Network
|
||||||
# Copyright (c) 2021 Status Research & Development GmbH
|
# Copyright (c) 2021-2022 Status Research & Development GmbH
|
||||||
# Licensed and distributed under either of
|
# Licensed and distributed under either of
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
@ -589,7 +589,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if connectionResult.isErr():
|
if connectionResult.isErr():
|
||||||
error "Utp connection error while trying to find content",
|
debug "Utp connection error while trying to find content",
|
||||||
msg = connectionResult.error
|
msg = connectionResult.error
|
||||||
return err("Error connecting uTP socket")
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
@ -676,7 +676,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if connectionResult.isErr():
|
if connectionResult.isErr():
|
||||||
error "Utp connection error while trying to offer content",
|
debug "Utp connection error while trying to offer content",
|
||||||
msg = connectionResult.error
|
msg = connectionResult.error
|
||||||
return err("Error connecting uTP socket")
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
@ -688,7 +688,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
if b:
|
if b:
|
||||||
let dataWritten = await clientSocket.write(o.contentList[i].content)
|
let dataWritten = await clientSocket.write(o.contentList[i].content)
|
||||||
if dataWritten.isErr:
|
if dataWritten.isErr:
|
||||||
error "Error writing requested data", error = dataWritten.error
|
debug "Error writing requested data", error = dataWritten.error
|
||||||
# No point in trying to continue writing data
|
# No point in trying to continue writing data
|
||||||
clientSocket.close()
|
clientSocket.close()
|
||||||
return err("Error writing requested data")
|
return err("Error writing requested data")
|
||||||
@ -704,7 +704,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
let content = maybeContent.get()
|
let content = maybeContent.get()
|
||||||
let dataWritten = await clientSocket.write(content)
|
let dataWritten = await clientSocket.write(content)
|
||||||
if dataWritten.isErr:
|
if dataWritten.isErr:
|
||||||
error "Error writing requested data", error = dataWritten.error
|
debug "Error writing requested data", error = dataWritten.error
|
||||||
# No point in trying to continue writing data
|
# No point in trying to continue writing data
|
||||||
clientSocket.close()
|
clientSocket.close()
|
||||||
return err("Error writing requested data")
|
return err("Error writing requested data")
|
||||||
@ -823,19 +823,20 @@ proc triggerPoke*(
|
|||||||
contentKey: ByteList,
|
contentKey: ByteList,
|
||||||
content: seq[byte]) =
|
content: seq[byte]) =
|
||||||
## Triggers asynchronous offer-accept interaction to provided nodes.
|
## Triggers asynchronous offer-accept interaction to provided nodes.
|
||||||
## Provided content should be in range of provided nodes
|
## Provided content should be in range of provided nodes.
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
if not p.offerQueue.full():
|
if not p.offerQueue.full():
|
||||||
try:
|
try:
|
||||||
let ci = ContentInfo(contentKey: contentKey, content: content)
|
let
|
||||||
let list = List[ContentInfo, contentKeysLimit].init(@[ci])
|
ci = ContentInfo(contentKey: contentKey, content: content)
|
||||||
let req = OfferRequest(dst: node, kind: Direct, contentList: list)
|
list = List[ContentInfo, contentKeysLimit].init(@[ci])
|
||||||
|
req = OfferRequest(dst: node, kind: Direct, contentList: list)
|
||||||
p.offerQueue.putNoWait(req)
|
p.offerQueue.putNoWait(req)
|
||||||
except AsyncQueueFullError as e:
|
except AsyncQueueFullError as e:
|
||||||
# should not happen as we always check is full before putting element to the queue
|
# Should not occur as full() check is done.
|
||||||
raiseAssert(e.msg)
|
raiseAssert(e.msg)
|
||||||
else:
|
else:
|
||||||
# offer queue full, do not start more offer offer-accept interactions
|
# Offer queue is full, do not start more offer-accept interactions
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
|
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
|
||||||
@ -894,7 +895,8 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
|||||||
case content.kind
|
case content.kind
|
||||||
of Nodes:
|
of Nodes:
|
||||||
let maybeRadius = p.radiusCache.get(content.src.id)
|
let maybeRadius = p.radiusCache.get(content.src.id)
|
||||||
if maybeRadius.isSome() and p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
|
if maybeRadius.isSome() and
|
||||||
|
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
|
||||||
# Only return nodes which may be interested in content.
|
# Only return nodes which may be interested in content.
|
||||||
# No need to check for duplicates in nodesWithoutContent
|
# No need to check for duplicates in nodesWithoutContent
|
||||||
# as requests are never made two times to the same node.
|
# as requests are never made two times to the same node.
|
||||||
@ -1028,9 +1030,11 @@ proc processContent(
|
|||||||
# Store content, should we recheck radius?
|
# Store content, should we recheck radius?
|
||||||
p.contentDB.put(contentId, content)
|
p.contentDB.put(contentId, content)
|
||||||
|
|
||||||
|
info "Received valid offered content", contentKey
|
||||||
|
|
||||||
asyncSpawn neighborhoodGossip(p, contentKeys, content)
|
asyncSpawn neighborhoodGossip(p, contentKeys, content)
|
||||||
else:
|
else:
|
||||||
error "Received invalid content", contentKey
|
error "Received invalid offered content", contentKey
|
||||||
|
|
||||||
proc seedTable*(p: PortalProtocol) =
|
proc seedTable*(p: PortalProtocol) =
|
||||||
## Seed the table with specifically provided Portal bootstrap nodes. These are
|
## Seed the table with specifically provided Portal bootstrap nodes. These are
|
||||||
|
@ -18,6 +18,9 @@ import
|
|||||||
|
|
||||||
export utp_discv5_protocol
|
export utp_discv5_protocol
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "portal_stream"
|
||||||
|
|
||||||
const
|
const
|
||||||
utpProtocolId* = "utp".toBytes()
|
utpProtocolId* = "utp".toBytes()
|
||||||
defaultConnectionTimeout = 5.seconds
|
defaultConnectionTimeout = 5.seconds
|
||||||
@ -131,25 +134,28 @@ proc addContentRequest*(
|
|||||||
proc connectTo*(
|
proc connectTo*(
|
||||||
stream: PortalStream,
|
stream: PortalStream,
|
||||||
nodeAddress: NodeAddress,
|
nodeAddress: NodeAddress,
|
||||||
connectionId: uint16): Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
|
connectionId: uint16):
|
||||||
|
Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
|
||||||
let socketRes = await stream.transport.connectTo(nodeAddress, connectionId)
|
let socketRes = await stream.transport.connectTo(nodeAddress, connectionId)
|
||||||
|
|
||||||
if socketRes.isErr():
|
if socketRes.isErr():
|
||||||
case socketRes.error.kind
|
case socketRes.error.kind
|
||||||
of SocketAlreadyExists:
|
of SocketAlreadyExists:
|
||||||
# This error means that there is already socket to this nodeAddress with given
|
# This means that there is already a socket to this nodeAddress with given
|
||||||
# connection id, in our use case it most probably means that other side sent us
|
# connection id. It probably means that a peersent us a connection id
|
||||||
# connection id which is already used.
|
# which is already in use..
|
||||||
# For now we just fail connection and return an error. Another strategy to consider
|
# For now just fail the connection and return an error. Another strategy
|
||||||
# would be to check what is the connection status, and then re-use it, or
|
# to consider would be to check what is the connection status, and then
|
||||||
# close it and retry connection.
|
# re-use it, or close it and retry connection.
|
||||||
let msg = "Socket to " & $nodeAddress & "with connection id: " & $connectionId & " already exists"
|
let msg = "Socket to " & $nodeAddress & "with connection id: " &
|
||||||
|
$connectionId & " already exists"
|
||||||
return err(msg)
|
return err(msg)
|
||||||
of ConnectionTimedOut:
|
of ConnectionTimedOut:
|
||||||
# Another strategy for handling this error would be to retry connecting a few times
|
# Another strategy for handling this error would be to retry connecting a
|
||||||
# before giving up. But we know (as we control the uTP impl) that this error will only
|
# few times before giving up. But we know (as we control the uTP impl)
|
||||||
# be returned when a SYN packet was re-sent 3 times and failed to be acked. This
|
# that this error will only occur when a SYN packet was re-sent 3 times
|
||||||
# should be enough for us to known that the remote host is not reachable.
|
# and failed to be acked. This should be enough of indication that the
|
||||||
|
# remote host is not reachable.
|
||||||
let msg = "uTP timeout while trying to connect to " & $nodeAddress
|
let msg = "uTP timeout while trying to connect to " & $nodeAddress
|
||||||
return err(msg)
|
return err(msg)
|
||||||
|
|
||||||
@ -180,8 +186,8 @@ proc readAndClose(
|
|||||||
if not stream.contentHandler.isNil():
|
if not stream.contentHandler.isNil():
|
||||||
stream.contentHandler(stream, offer.contentKeys, content)
|
stream.contentHandler(stream, offer.contentKeys, content)
|
||||||
|
|
||||||
# Destroy socket and not closing as we already received. Closing would send
|
# Destroy socket and not closing as we already received FIN. Closing would
|
||||||
# also a FIN from our side, see also:
|
# send also a FIN from our side, see also:
|
||||||
# https://github.com/status-im/nim-eth/blob/b2dab4be0839c95ca2564df9eacf81995bf57802/eth/utp/utp_socket.nim#L1223
|
# https://github.com/status-im/nim-eth/blob/b2dab4be0839c95ca2564df9eacf81995bf57802/eth/utp/utp_socket.nim#L1223
|
||||||
await socket.destroyWait()
|
await socket.destroyWait()
|
||||||
else:
|
else:
|
||||||
|
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 6d4b1f4fe162e76ef2b1e33fd2a19ef76f0276ea
|
Subproject commit 01684a2130051357bf0e5bf0c9357fcf195f15d9
|
Loading…
x
Reference in New Issue
Block a user