mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 05:13:07 +00:00
check and mark seen messages prior to validation
this allows us to avoid revalidating messages, either because they were concurrently received or were previously rejected by the validator. also allows us to filter invalid messages from gossip.
This commit is contained in:
parent
448f380722
commit
b09c9d1a48
21
pubsub.go
21
pubsub.go
@ -245,7 +245,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
p.pushMsg(vals, p.host.ID(), msg)
|
||||
|
||||
case req := <-p.sendMsg:
|
||||
p.maybePublishMessage(req.from, req.msg.Message)
|
||||
p.publishMessage(req.from, req.msg.Message)
|
||||
|
||||
case req := <-p.addVal:
|
||||
p.addValidator(req)
|
||||
@ -409,6 +409,12 @@ func msgID(pmsg *pb.Message) string {
|
||||
|
||||
// pushMsg pushes a message performing validation as necessary
|
||||
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
|
||||
id := msgID(msg.Message)
|
||||
if p.seenMessage(id) {
|
||||
return
|
||||
}
|
||||
p.markSeen(id)
|
||||
|
||||
if len(vals) > 0 {
|
||||
// validation is asynchronous and globally throttled with the throttleValidate semaphore.
|
||||
// the purpose of the global throttle is to bound the goncurrency possible from incoming
|
||||
@ -426,7 +432,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
|
||||
return
|
||||
}
|
||||
|
||||
p.maybePublishMessage(src, msg.Message)
|
||||
p.publishMessage(src, msg.Message)
|
||||
}
|
||||
|
||||
// validate performs validation and only sends the message if all validators succeed
|
||||
@ -476,13 +482,7 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
|
||||
id := msgID(pmsg)
|
||||
if p.seenMessage(id) {
|
||||
return
|
||||
}
|
||||
|
||||
p.markSeen(id)
|
||||
func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) {
|
||||
p.notifySubs(pmsg)
|
||||
p.rt.Publish(from, pmsg)
|
||||
}
|
||||
@ -580,7 +580,8 @@ type listPeerReq struct {
|
||||
topic string
|
||||
}
|
||||
|
||||
// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done.
|
||||
// sendReq is a request to call publishMessage.
|
||||
// It is issued after message validation is done.
|
||||
type sendReq struct {
|
||||
from peer.ID
|
||||
msg *Message
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user