mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
Adds Application Specific RPC Inspector (#509)
* Update go.mod * Refactor GossipSub Construction (#1) * Enables non-atomic validation for peer scoring parameters (#499) * decouples topic scoring parameters * adds skiping atomic validation for topic parameters * cleans up * adds skip atomic validation to peer score threshold * adds skip atomic validation for peer parameters * adds test for non-atomic validation * adds tests for peer score * adds tests for peer score thresholds * refactors tests * chore: Update .github/workflows/stale.yml [skip ci] * adds with gossipsub tracker Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> * decouples options * fixes conflict * reverts back module * fixes peer score helper * Adds send control message to gossipsub router (#2) * adjusts libp2p version (#3) * Update go.mod (#4) * adds app specific rpc handler * Create ci.yml (#5) * Create Makefile (#7) * Revert "Merge branch 'yahya/gossipsub-router-interface'" (#6) This reverts commit 1c91995b7fbce0e4b9c5990c5bfda0d555267182. * Update ci.yml (#9) * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 352d7471c58580480b7f6592001bc3e9b910fa77. * Revert "Merge remote-tracking branch 'origin/yahya/adds-rpc-inspector' into yahya/adds-rpc-inspector" This reverts commit 586c5cb6eb2a971a1590ea32050de139316984d2. * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 2e13ee8b95dded5a3401dd86f952fae3419bd86b. * moves app specific inspector to pubsub * removes option from gossipsub * moves app specific rpc inspector up * refactors app specific to return an error Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com>
This commit is contained in:
parent
972f19967f
commit
d3f151c224
22
pubsub.go
22
pubsub.go
@ -170,6 +170,12 @@ type PubSub struct {
|
||||
protoMatchFunc ProtocolMatchFn
|
||||
|
||||
ctx context.Context
|
||||
|
||||
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to
|
||||
// processing them. The inspector is invoked on an accepted RPC right prior to handling it.
|
||||
// The return value of the inspector function is an error indicating whether the RPC should be processed or not.
|
||||
// If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped.
|
||||
appSpecificRpcInspector func(peer.ID, *RPC) error
|
||||
}
|
||||
|
||||
// PubSubRouter is the message router component of PubSub.
|
||||
@ -527,6 +533,13 @@ func WithSeenMessagesTTL(ttl time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
|
||||
return func(ps *PubSub) error {
|
||||
ps.appSpecificRpcInspector = inspector
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// processLoop handles all inputs arriving on the channels
|
||||
func (p *PubSub) processLoop(ctx context.Context) {
|
||||
defer func() {
|
||||
@ -1005,6 +1018,15 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
|
||||
}
|
||||
|
||||
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||
// pass the rpc through app specific validation (if any available).
|
||||
if p.appSpecificRpcInspector != nil {
|
||||
// check if the RPC is allowed by the external inspector
|
||||
if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil {
|
||||
log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err)
|
||||
return // reject the RPC
|
||||
}
|
||||
}
|
||||
|
||||
p.tracer.RecvRPC(rpc)
|
||||
|
||||
subs := rpc.GetSubscriptions()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user