mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-12 07:44:57 +00:00
chore: print the full peerId in the logs
This commit is contained in:
parent
72a1f8c724
commit
f664229bb4
@ -125,7 +125,7 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
|
|||||||
# Peer already managed and ENR info is already saved
|
# Peer already managed and ENR info is already saved
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
trace "Adding peer to manager", peerId = $remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
||||||
|
|
||||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
||||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||||
@ -158,7 +158,7 @@ proc connectRelay*(pm: PeerManager,
|
|||||||
pm.addPeer(peer)
|
pm.addPeer(peer)
|
||||||
|
|
||||||
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
||||||
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
|
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId= $peerId, failedAttempts=failedAttempts
|
||||||
|
|
||||||
var deadline = sleepAsync(dialTimeout)
|
var deadline = sleepAsync(dialTimeout)
|
||||||
var workfut = pm.switch.connect(peerId, peer.addrs)
|
var workfut = pm.switch.connect(peerId, peer.addrs)
|
||||||
@ -210,7 +210,7 @@ proc dialPeer(pm: PeerManager,
|
|||||||
error "dial shall not be used to connect to relays"
|
error "dial shall not be used to connect to relays"
|
||||||
return none(Connection)
|
return none(Connection)
|
||||||
|
|
||||||
debug "Dialing peer", wireAddr=addrs, peerId=peerId, proto=proto
|
debug "Dialing peer", wireAddr=addrs, peerId= $peerId, proto=proto
|
||||||
|
|
||||||
# Dial Peer
|
# Dial Peer
|
||||||
let dialFut = pm.switch.dial(peerId, addrs, proto)
|
let dialFut = pm.switch.dial(peerId, addrs, proto)
|
||||||
@ -224,7 +224,7 @@ proc dialPeer(pm: PeerManager,
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
reasonFailed = "failed"
|
reasonFailed = "failed"
|
||||||
|
|
||||||
debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
|
debug "Dialing peer failed", peerId= $peerId, reason=reasonFailed, proto=proto
|
||||||
|
|
||||||
return none(Connection)
|
return none(Connection)
|
||||||
|
|
||||||
@ -363,7 +363,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
|||||||
clusterOk = true
|
clusterOk = true
|
||||||
|
|
||||||
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
|
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
|
||||||
info "disconnecting from peer", peerId=peerId, reason=reason
|
info "disconnecting from peer", peerId= $peerId, reason=reason
|
||||||
asyncSpawn(pm.switch.disconnect(peerId))
|
asyncSpawn(pm.switch.disconnect(peerId))
|
||||||
pm.peerStore.delete(peerId)
|
pm.peerStore.delete(peerId)
|
||||||
|
|
||||||
@ -377,7 +377,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
|||||||
if peersBehindIp.len > pm.colocationLimit:
|
if peersBehindIp.len > pm.colocationLimit:
|
||||||
# in theory this should always be one, but just in case
|
# in theory this should always be one, but just in case
|
||||||
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
|
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
|
||||||
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
|
debug "Pruning connection due to ip colocation", peerId = $peerId, ip = ip
|
||||||
asyncSpawn(pm.switch.disconnect(peerId))
|
asyncSpawn(pm.switch.disconnect(peerId))
|
||||||
pm.peerStore.delete(peerId)
|
pm.peerStore.delete(peerId)
|
||||||
|
|
||||||
@ -498,7 +498,7 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
|
|||||||
warn "Can't add relay peer to service peers slots"
|
warn "Can't add relay peer to service peers slots"
|
||||||
return
|
return
|
||||||
|
|
||||||
info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
|
info "Adding peer to service slots", peerId = $remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
|
||||||
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
|
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
|
||||||
|
|
||||||
# Set peer for service slot
|
# Set peer for service slot
|
||||||
@ -518,7 +518,7 @@ proc reconnectPeers*(pm: PeerManager,
|
|||||||
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
||||||
# Check that the peer can be connected
|
# Check that the peer can be connected
|
||||||
if peerInfo.connectedness == CannotConnect:
|
if peerInfo.connectedness == CannotConnect:
|
||||||
debug "Not reconnecting to unreachable or non-existing peer", peerId=peerInfo.peerId
|
debug "Not reconnecting to unreachable or non-existing peer", peerId= $peerInfo.peerId
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Respect optional backoff period where applicable.
|
# Respect optional backoff period where applicable.
|
||||||
@ -532,7 +532,7 @@ proc reconnectPeers*(pm: PeerManager,
|
|||||||
|
|
||||||
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
||||||
if backoffTime > ZeroDuration:
|
if backoffTime > ZeroDuration:
|
||||||
debug "Backing off before reconnect...", peerId=peerInfo.peerId, backoffTime=backoffTime
|
debug "Backing off before reconnect...", peerId= $peerInfo.peerId, backoffTime=backoffTime
|
||||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||||
await sleepAsync(backoffTime)
|
await sleepAsync(backoffTime)
|
||||||
|
|
||||||
@ -705,19 +705,19 @@ proc selectPeer*(pm: PeerManager, proto: string, shard: Option[PubsubTopic] = no
|
|||||||
if proto == WakuRelayCodec:
|
if proto == WakuRelayCodec:
|
||||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||||
if peers.len > 0:
|
if peers.len > 0:
|
||||||
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
debug "Got peer from peerstore", peerId= $peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
||||||
return some(peers[0])
|
return some(peers[0])
|
||||||
debug "No peer found for protocol", protocol=proto
|
debug "No peer found for protocol", protocol=proto
|
||||||
return none(RemotePeerInfo)
|
return none(RemotePeerInfo)
|
||||||
|
|
||||||
# For other protocols, we select the peer that is slotted for the given protocol
|
# For other protocols, we select the peer that is slotted for the given protocol
|
||||||
pm.serviceSlots.withValue(proto, serviceSlot):
|
pm.serviceSlots.withValue(proto, serviceSlot):
|
||||||
debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto
|
debug "Got peer from service slots", peerId= $serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto
|
||||||
return some(serviceSlot[])
|
return some(serviceSlot[])
|
||||||
|
|
||||||
# If not slotted, we select a random peer for the given protocol
|
# If not slotted, we select a random peer for the given protocol
|
||||||
if peers.len > 0:
|
if peers.len > 0:
|
||||||
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
debug "Got peer from peerstore", peerId= $peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
||||||
return some(peers[0])
|
return some(peers[0])
|
||||||
debug "No peer found for protocol", protocol=proto
|
debug "No peer found for protocol", protocol=proto
|
||||||
return none(RemotePeerInfo)
|
return none(RemotePeerInfo)
|
||||||
|
@ -544,7 +544,7 @@ proc filterSubscribe*(node: WakuNode,
|
|||||||
let remotePeer = remotePeerRes.value
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
|
info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer= $remotePeer.peerId
|
||||||
|
|
||||||
let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics)
|
let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics)
|
||||||
if subRes.isOk():
|
if subRes.isOk():
|
||||||
@ -565,7 +565,7 @@ proc filterSubscribe*(node: WakuNode,
|
|||||||
|
|
||||||
var futures = collect(newSeq):
|
var futures = collect(newSeq):
|
||||||
for pubsub, topics in topicMap.pairs:
|
for pubsub, topics in topicMap.pairs:
|
||||||
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer= $remotePeer.peerId
|
||||||
let content = topics.mapIt($it)
|
let content = topics.mapIt($it)
|
||||||
node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)
|
node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)
|
||||||
|
|
||||||
@ -604,7 +604,7 @@ proc legacyFilterUnsubscribe*(node: WakuNode,
|
|||||||
let remotePeer = remotePeerRes.value
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
|
info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer= $remotePeer.peerId
|
||||||
|
|
||||||
let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer)
|
let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer)
|
||||||
|
|
||||||
@ -624,7 +624,7 @@ proc legacyFilterUnsubscribe*(node: WakuNode,
|
|||||||
|
|
||||||
var futures = collect(newSeq):
|
var futures = collect(newSeq):
|
||||||
for pubsub, topics in topicMap.pairs:
|
for pubsub, topics in topicMap.pairs:
|
||||||
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer= $remotePeer.peerId
|
||||||
let content = topics.mapIt($it)
|
let content = topics.mapIt($it)
|
||||||
node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer=remotePeer)
|
node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer=remotePeer)
|
||||||
|
|
||||||
@ -662,7 +662,7 @@ proc filterUnsubscribe*(node: WakuNode,
|
|||||||
let remotePeer = remotePeerRes.value
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId
|
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer= $remotePeer.peerId
|
||||||
|
|
||||||
let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics)
|
let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics)
|
||||||
if unsubRes.isOk():
|
if unsubRes.isOk():
|
||||||
@ -684,7 +684,7 @@ proc filterUnsubscribe*(node: WakuNode,
|
|||||||
|
|
||||||
var futures = collect(newSeq):
|
var futures = collect(newSeq):
|
||||||
for pubsub, topics in topicMap.pairs:
|
for pubsub, topics in topicMap.pairs:
|
||||||
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer= $remotePeer.peerId
|
||||||
let content = topics.mapIt($it)
|
let content = topics.mapIt($it)
|
||||||
node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)
|
node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)
|
||||||
|
|
||||||
@ -724,11 +724,11 @@ proc filterUnsubscribeAll*(node: WakuNode,
|
|||||||
|
|
||||||
let remotePeer = remotePeerRes.value
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
info "deregistering all filter subscription to content", peer=remotePeer.peerId
|
info "deregistering all filter subscription to content", peer= $remotePeer.peerId
|
||||||
|
|
||||||
let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
|
let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
|
||||||
if unsubRes.isOk():
|
if unsubRes.isOk():
|
||||||
info "unsubscribed from all content-topic", peerId=remotePeer.peerId
|
info "unsubscribed from all content-topic", peerId= $remotePeer.peerId
|
||||||
else:
|
else:
|
||||||
error "failed filter unsubscription from all content-topic", error=unsubRes.error
|
error "failed filter unsubscription from all content-topic", error=unsubRes.error
|
||||||
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
||||||
@ -930,7 +930,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
|
|||||||
return err("waku lightpush client is nil")
|
return err("waku lightpush client is nil")
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
debug "publishing message with lightpush", pubsubTopic=pubsubTopic.get(), contentTopic=message.contentTopic, peer=peer.peerId
|
debug "publishing message with lightpush", pubsubTopic=pubsubTopic.get(), contentTopic=message.contentTopic, peer= $peer.peerId
|
||||||
return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer)
|
return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer)
|
||||||
|
|
||||||
let topicMapRes = parseSharding(pubsubTopic, message.contentTopic)
|
let topicMapRes = parseSharding(pubsubTopic, message.contentTopic)
|
||||||
@ -941,7 +941,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
|
|||||||
else: topicMapRes.get()
|
else: topicMapRes.get()
|
||||||
|
|
||||||
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
||||||
debug "publishing message with lightpush", pubsubTopic=pubsub, contentTopic=message.contentTopic, peer=peer.peerId
|
debug "publishing message with lightpush", pubsubTopic=pubsub, contentTopic=message.contentTopic, peer= $peer.peerId
|
||||||
return await node.wakuLightpushClient.publish($pubsub, message, peer)
|
return await node.wakuLightpushClient.publish($pubsub, message, peer)
|
||||||
|
|
||||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||||
@ -1106,7 +1106,7 @@ proc printNodeNetworkInfo*(node: WakuNode): void =
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
warn "Could not retrieve localIp", msg=e.msg
|
warn "Could not retrieve localIp", msg=e.msg
|
||||||
|
|
||||||
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
|
info "PeerInfo", peerId = $peerInfo.peerId, addrs = peerInfo.addrs
|
||||||
|
|
||||||
for address in node.announcedAddresses:
|
for address in node.announcedAddresses:
|
||||||
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
|
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
|
||||||
|
@ -66,7 +66,7 @@ proc initProtocolHandler(wf: WakuFilterClientLegacy) =
|
|||||||
requestId = rpc.requestId
|
requestId = rpc.requestId
|
||||||
push = rpc.push.get()
|
push = rpc.push.get()
|
||||||
|
|
||||||
info "received filter message push", peerId=conn.peerId, requestId=requestId
|
info "received filter message push", peerId= $conn.peerId, requestId=requestId
|
||||||
wf.handleMessagePush(peerId, requestId, push)
|
wf.handleMessagePush(peerId, requestId, push)
|
||||||
|
|
||||||
wf.handler = handle
|
wf.handler = handle
|
||||||
|
@ -75,10 +75,10 @@ proc handleFilterRequest(wf: WakuFilterLegacy, peerId: PeerId, rpc: FilterRPC) =
|
|||||||
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
|
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
|
||||||
|
|
||||||
if subscribe:
|
if subscribe:
|
||||||
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
|
info "added filter subscription", peerId= $peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
|
||||||
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
|
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
|
||||||
else:
|
else:
|
||||||
info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics
|
info "removed filter subscription", peerId= $peerId, contentTopics=contentTopics
|
||||||
wf.subscriptions.removeSubscription(peerId, contentTopics)
|
wf.subscriptions.removeSubscription(peerId, contentTopics)
|
||||||
|
|
||||||
waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64)
|
waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64)
|
||||||
|
@ -37,7 +37,7 @@ proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo,
|
|||||||
filterSubscribeRequest: FilterSubscribeRequest):
|
filterSubscribeRequest: FilterSubscribeRequest):
|
||||||
Future[FilterSubscribeResult]
|
Future[FilterSubscribeResult]
|
||||||
{.async.} =
|
{.async.} =
|
||||||
trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest
|
trace "Sending filter subscribe request", peerId= $servicePeer.peerId, filterSubscribeRequest
|
||||||
|
|
||||||
let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
|
let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
|
||||||
if connOpt.isNone():
|
if connOpt.isNone():
|
||||||
@ -132,12 +132,12 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
|
|||||||
|
|
||||||
let decodeRes = MessagePush.decode(buf)
|
let decodeRes = MessagePush.decode(buf)
|
||||||
if decodeRes.isErr():
|
if decodeRes.isErr():
|
||||||
error "Failed to decode message push", peerId=conn.peerId
|
error "Failed to decode message push", peerId= $conn.peerId
|
||||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
return
|
return
|
||||||
|
|
||||||
let messagePush = decodeRes.value #TODO: toAPI() split here
|
let messagePush = decodeRes.value #TODO: toAPI() split here
|
||||||
trace "Received message push", peerId=conn.peerId, messagePush
|
trace "Received message push", peerId= $conn.peerId, messagePush
|
||||||
|
|
||||||
for handler in wfc.pushHandlers:
|
for handler in wfc.pushHandlers:
|
||||||
asyncSpawn handler(messagePush.pubsubTopic,
|
asyncSpawn handler(messagePush.pubsubTopic,
|
||||||
|
@ -34,10 +34,10 @@ type
|
|||||||
maintenanceTask: TimerCallback
|
maintenanceTask: TimerCallback
|
||||||
|
|
||||||
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
trace "pinging subscriber", peerId=peerId
|
trace "pinging subscriber", peerId= $peerId
|
||||||
|
|
||||||
if peerId notin wf.subscriptions:
|
if peerId notin wf.subscriptions:
|
||||||
debug "pinging peer has no subscriptions", peerId=peerId
|
debug "pinging peer has no subscriptions", peerId= $peerId
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
@ -51,7 +51,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
|
|||||||
|
|
||||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
trace "subscribing peer to filter criteria", peerId= $peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
if peerId in wf.subscriptions:
|
if peerId in wf.subscriptions:
|
||||||
# We already have a subscription for this peer. Try to add the new filter criteria.
|
# We already have a subscription for this peer. Try to add the new filter criteria.
|
||||||
@ -65,7 +65,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
|
|||||||
# We don't have a subscription for this peer yet. Try to add it.
|
# We don't have a subscription for this peer yet. Try to add it.
|
||||||
if wf.subscriptions.len() >= MaxTotalSubscriptions:
|
if wf.subscriptions.len() >= MaxTotalSubscriptions:
|
||||||
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
|
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
|
||||||
debug "creating new subscription", peerId=peerId
|
debug "creating new subscription", peerId= $peerId
|
||||||
wf.subscriptions[peerId] = filterCriteria
|
wf.subscriptions[peerId] = filterCriteria
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
@ -79,10 +79,10 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
|
|||||||
|
|
||||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
trace "unsubscribing peer from filter criteria", peerId= $peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
if peerId notin wf.subscriptions:
|
if peerId notin wf.subscriptions:
|
||||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
debug "unsubscribing peer has no subscriptions", peerId= $peerId
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||||
@ -94,7 +94,7 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
|
|||||||
peerSubscription.excl(filterCriteria)
|
peerSubscription.excl(filterCriteria)
|
||||||
|
|
||||||
if peerSubscription.len() == 0:
|
if peerSubscription.len() == 0:
|
||||||
debug "peer has no more subscriptions, removing subscription", peerId=peerId
|
debug "peer has no more subscriptions, removing subscription", peerId= $peerId
|
||||||
wf.subscriptions.del(peerId)
|
wf.subscriptions.del(peerId)
|
||||||
else:
|
else:
|
||||||
wf.subscriptions[peerId] = peerSubscription
|
wf.subscriptions[peerId] = peerSubscription
|
||||||
@ -103,16 +103,16 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
|
|||||||
|
|
||||||
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
if peerId notin wf.subscriptions:
|
if peerId notin wf.subscriptions:
|
||||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
debug "unsubscribing peer has no subscriptions", peerId= $peerId
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
debug "removing peer subscription", peerId=peerId
|
debug "removing peer subscription", peerId= $peerId
|
||||||
wf.subscriptions.del(peerId)
|
wf.subscriptions.del(peerId)
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse =
|
proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse =
|
||||||
info "received filter subscribe request", peerId=peerId, request=request
|
info "received filter subscribe request", peerId= $peerId, request=request
|
||||||
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
||||||
|
|
||||||
var subscribeResult: FilterSubscribeResult
|
var subscribeResult: FilterSubscribeResult
|
||||||
@ -183,7 +183,7 @@ proc maintainSubscriptions*(wf: WakuFilter) =
|
|||||||
## TODO: currently we only maintain by syncing with peer store. We could
|
## TODO: currently we only maintain by syncing with peer store. We could
|
||||||
## consider other metrics, such as subscription age, activity, etc.
|
## consider other metrics, such as subscription age, activity, etc.
|
||||||
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
|
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
|
||||||
debug "peer has been removed from peer store, removing subscription", peerId=peerId
|
debug "peer has been removed from peer store, removing subscription", peerId= $peerId
|
||||||
peersToRemove.add(peerId)
|
peersToRemove.add(peerId)
|
||||||
|
|
||||||
if peersToRemove.len() > 0:
|
if peersToRemove.len() > 0:
|
||||||
@ -224,13 +224,13 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
|
|||||||
proc initProtocolHandler(wf: WakuFilter) =
|
proc initProtocolHandler(wf: WakuFilter) =
|
||||||
|
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
trace "filter subscribe request handler triggered", peerId=conn.peerId
|
trace "filter subscribe request handler triggered", peerId= $conn.peerId
|
||||||
|
|
||||||
let buf = await conn.readLp(MaxSubscribeSize)
|
let buf = await conn.readLp(MaxSubscribeSize)
|
||||||
|
|
||||||
let decodeRes = FilterSubscribeRequest.decode(buf)
|
let decodeRes = FilterSubscribeRequest.decode(buf)
|
||||||
if decodeRes.isErr():
|
if decodeRes.isErr():
|
||||||
error "Failed to decode filter subscribe request", peerId=conn.peerId, err=decodeRes.error
|
error "Failed to decode filter subscribe request", peerId= $conn.peerId, err=decodeRes.error
|
||||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -238,7 +238,7 @@ proc initProtocolHandler(wf: WakuFilter) =
|
|||||||
|
|
||||||
let response = wf.handleSubscribeRequest(conn.peerId, request)
|
let response = wf.handleSubscribeRequest(conn.peerId, request)
|
||||||
|
|
||||||
debug "sending filter subscribe response", peerId=conn.peerId, response=response
|
debug "sending filter subscribe response", peerId= $conn.peerId, response=response
|
||||||
|
|
||||||
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
|
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
|
||||||
return
|
return
|
||||||
|
@ -57,7 +57,7 @@ proc handleRequest*(wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]): Futur
|
|||||||
pubSubTopic = request.get().pubSubTopic
|
pubSubTopic = request.get().pubSubTopic
|
||||||
message = request.get().message
|
message = request.get().message
|
||||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||||
debug "push request", peerId=peerId, requestId=requestId, pubsubTopic=pubsubTopic
|
debug "push request", peerId= $peerId, requestId=requestId, pubsubTopic=pubsubTopic
|
||||||
|
|
||||||
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
|
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
|
||||||
isSuccess = handleRes.isOk()
|
isSuccess = handleRes.isOk()
|
||||||
|
@ -69,7 +69,7 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
requestId = reqRpc.requestId
|
requestId = reqRpc.requestId
|
||||||
request = reqRpc.query.get().toAPI()
|
request = reqRpc.query.get().toAPI()
|
||||||
|
|
||||||
info "received history query", peerId=conn.peerId, requestId=requestId, query=request
|
info "received history query", peerId= $conn.peerId, requestId=requestId, query=request
|
||||||
waku_store_queries.inc()
|
waku_store_queries.inc()
|
||||||
|
|
||||||
var responseRes: HistoryResult
|
var responseRes: HistoryResult
|
||||||
@ -95,7 +95,7 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
|
|
||||||
let response = responseRes.toRPC()
|
let response = responseRes.toRPC()
|
||||||
|
|
||||||
info "sending history response", peerId=conn.peerId, requestId=requestId, messages=response.messages.len
|
info "sending history response", peerId= $conn.peerId, requestId=requestId, messages=response.messages.len
|
||||||
|
|
||||||
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
||||||
await conn.writeLp(rpc.encode().buffer)
|
await conn.writeLp(rpc.encode().buffer)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user