mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-01 16:40:32 +00:00
simplify code upon review findings.
This commit is contained in:
parent
313d0fc53b
commit
8368c3f1dc
@ -13,16 +13,3 @@ type PublishOutcome* {.pure, public.} = enum
|
|||||||
DuplicateMessage
|
DuplicateMessage
|
||||||
NoPeersToPublish
|
NoPeersToPublish
|
||||||
CannotGenerateMessageId
|
CannotGenerateMessageId
|
||||||
|
|
||||||
proc `$`*(publishOutcome: PublishOutcome): string =
|
|
||||||
case publishOutcome
|
|
||||||
of NoTopicSpecified:
|
|
||||||
return "NoTopicSpecified"
|
|
||||||
of DuplicateMessage:
|
|
||||||
return "DuplicateMessage"
|
|
||||||
of NoPeersToPublish:
|
|
||||||
return "NoPeersToPublish"
|
|
||||||
of CannotGenerateMessageId:
|
|
||||||
return "CannotGenerateMessageId"
|
|
||||||
else:
|
|
||||||
return "unknown"
|
|
||||||
|
@ -195,7 +195,7 @@ method init*(f: FloodSub) =
|
|||||||
|
|
||||||
method doPublish*(
|
method doPublish*(
|
||||||
f: FloodSub, topic: string, data: seq[byte]
|
f: FloodSub, topic: string, data: seq[byte]
|
||||||
): Future[Result[int, PublishOutcome]] {.async: (raises: []).} =
|
): Future[PublishResult] {.async: (raises: []).} =
|
||||||
# base returns always 0
|
# base returns always 0
|
||||||
discard await procCall PubSub(f).doPublish(topic, data)
|
discard await procCall PubSub(f).doPublish(topic, data)
|
||||||
|
|
||||||
@ -212,7 +212,7 @@ method doPublish*(
|
|||||||
return err(NoPeersToPublish)
|
return err(NoPeersToPublish)
|
||||||
|
|
||||||
let (msg, msgId) = f.createMessage(topic, data).valueOr:
|
let (msg, msgId) = f.createMessage(topic, data).valueOr:
|
||||||
trace "Error generating message id, skipping publish", error = error
|
trace "Error creating message, skipping publish", error = error
|
||||||
return err(CannotGenerateMessageId)
|
return err(CannotGenerateMessageId)
|
||||||
|
|
||||||
trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId
|
trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId
|
||||||
|
@ -765,7 +765,7 @@ proc collectPeersForPublish(
|
|||||||
|
|
||||||
method doPublish*(
|
method doPublish*(
|
||||||
g: GossipSub, topic: string, data: seq[byte]
|
g: GossipSub, topic: string, data: seq[byte]
|
||||||
): Future[Result[int, PublishOutcome]] {.async: (raises: []).} =
|
): Future[PublishResult] {.async: (raises: []).} =
|
||||||
logScope:
|
logScope:
|
||||||
topic
|
topic
|
||||||
|
|
||||||
@ -788,7 +788,7 @@ method doPublish*(
|
|||||||
return err(NoPeersToPublish)
|
return err(NoPeersToPublish)
|
||||||
|
|
||||||
let (msg, msgId) = g.createMessage(topic, data).valueOr:
|
let (msg, msgId) = g.createMessage(topic, data).valueOr:
|
||||||
trace "Error generating message id, skipping publish", error = error
|
error "Error creating message, skipping publish", error = error
|
||||||
return err(CannotGenerateMessageId)
|
return err(CannotGenerateMessageId)
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
|
@ -145,6 +145,8 @@ type
|
|||||||
## we have to store it, which may be an attack vector.
|
## we have to store it, which may be an attack vector.
|
||||||
## This callback can be used to reject topic we're not interested in
|
## This callback can be used to reject topic we're not interested in
|
||||||
|
|
||||||
|
PublishResult* {.public.} = Result[int, PublishOutcome]
|
||||||
|
|
||||||
PubSub* {.public.} = ref object of LPProtocol
|
PubSub* {.public.} = ref object of LPProtocol
|
||||||
switch*: Switch # the switch used to dial/connect to peers
|
switch*: Switch # the switch used to dial/connect to peers
|
||||||
peerInfo*: PeerInfo # this peer's info
|
peerInfo*: PeerInfo # this peer's info
|
||||||
@ -555,7 +557,7 @@ proc subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} =
|
|||||||
|
|
||||||
method createMessage*(
|
method createMessage*(
|
||||||
p: PubSub, topic: string, data: seq[byte]
|
p: PubSub, topic: string, data: seq[byte]
|
||||||
): Result[(Message, MessageId), ValidationResult] {.base, gcsafe, raises: [].} =
|
): Result[(Message, MessageId), string] {.base, gcsafe, raises: [].} =
|
||||||
let
|
let
|
||||||
msg =
|
msg =
|
||||||
if p.anonymize:
|
if p.anonymize:
|
||||||
@ -563,7 +565,8 @@ method createMessage*(
|
|||||||
else:
|
else:
|
||||||
inc p.msgSeqno
|
inc p.msgSeqno
|
||||||
Message.init(some(p.peerInfo), data, topic, some(p.msgSeqno), p.sign)
|
Message.init(some(p.peerInfo), data, topic, some(p.msgSeqno), p.sign)
|
||||||
msgId = ?p.msgIdProvider(msg)
|
msgId = p.msgIdProvider(msg).valueOr:
|
||||||
|
return err("Failed to generate message id")
|
||||||
|
|
||||||
return ok((msg, msgId))
|
return ok((msg, msgId))
|
||||||
|
|
||||||
@ -572,7 +575,7 @@ method createMessage*(
|
|||||||
# but call `publish`.
|
# but call `publish`.
|
||||||
method doPublish*(
|
method doPublish*(
|
||||||
p: PubSub, topic: string, data: seq[byte]
|
p: PubSub, topic: string, data: seq[byte]
|
||||||
): Future[Result[int, PublishOutcome]] {.base, async: (raises: []).} =
|
): Future[PublishResult] {.base, async: (raises: []).} =
|
||||||
## publish to a ``topic``
|
## publish to a ``topic``
|
||||||
##
|
##
|
||||||
## The return value is the number of neighbours that we attempted to send the
|
## The return value is the number of neighbours that we attempted to send the
|
||||||
|
@ -179,33 +179,30 @@ proc hasBeforeSendObservers*(p: PubSubPeer): bool =
|
|||||||
|
|
||||||
proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
|
proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
if not (isNil(p.observers)):
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
if not (isNil(obs)): # TODO: should never be nil, but...
|
if not (isNil(obs)) and not (isNil(obs.onRecv)):
|
||||||
if not (isNil(obs.onRecv)):
|
|
||||||
obs.onRecv(p, msg)
|
obs.onRecv(p, msg)
|
||||||
|
|
||||||
proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) =
|
proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) =
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
if not (isNil(p.observers)):
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
if not (isNil(obs.onValidated)):
|
if not (isNil(obs)) and not (isNil(obs.onValidated)):
|
||||||
obs.onValidated(p, msg, msgId)
|
obs.onValidated(p, msg, msgId)
|
||||||
|
|
||||||
proc beforeSendObservers(p: PubSubPeer, msg: var RPCMsg) =
|
proc beforeSendObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
if not (isNil(p.observers)):
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
if not (isNil(obs)): # TODO: should never be nil, but...
|
if not (isNil(obs)) and not (isNil(obs.onBeforeSend)):
|
||||||
if not (isNil(obs.onBeforeSend)):
|
|
||||||
obs.onBeforeSend(p, msg)
|
obs.onBeforeSend(p, msg)
|
||||||
|
|
||||||
proc afterSentObservers(p: PubSubPeer, msg: RPCMsg) =
|
proc afterSentObservers(p: PubSubPeer, msg: RPCMsg) =
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
if not (isNil(p.observers)):
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
if not (isNil(obs)): # TODO: should never be nil, but...
|
if not (isNil(obs)) and not (isNil(obs.onAfterSent)):
|
||||||
if not (isNil(obs.onAfterSent)):
|
|
||||||
obs.onAfterSent(p, msg)
|
obs.onAfterSent(p, msg)
|
||||||
|
|
||||||
proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} =
|
proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user