mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
Merge pull request #359 from protolambda/optional-sig-origin-seq
Signing policy + optional Signature, From and Seqno
This commit is contained in:
parent
aabbdb1143
commit
99507107b6
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@ -717,13 +718,48 @@ func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNonsensicalSigningOptions(t *testing.T) {
|
func TestWithNoSigning(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
hosts := getNetHosts(t, ctx, 1)
|
|
||||||
_, err := NewFloodSub(ctx, hosts[0], WithMessageSigning(false), WithStrictSignatureVerification(true))
|
hosts := getNetHosts(t, ctx, 2)
|
||||||
if err == nil {
|
psubs := getPubsubs(ctx, hosts, WithNoAuthor(), WithMessageIdFn(func(pmsg *pb.Message) string {
|
||||||
t.Error("expected constructor to fail on nonsensical options")
|
// silly content-based test message-ID: just use the data as whole
|
||||||
|
return base64.URLEncoding.EncodeToString(pmsg.Data)
|
||||||
|
}))
|
||||||
|
|
||||||
|
connect(t, hosts[0], hosts[1])
|
||||||
|
|
||||||
|
topic := "foobar"
|
||||||
|
data := []byte("this is a message")
|
||||||
|
|
||||||
|
sub, err := psubs[1].Subscribe(topic)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
|
||||||
|
err = psubs[0].Publish(topic, data)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := sub.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if msg.Signature != nil {
|
||||||
|
t.Fatal("signature in message")
|
||||||
|
}
|
||||||
|
if msg.From != nil {
|
||||||
|
t.Fatal("from in message")
|
||||||
|
}
|
||||||
|
if msg.Seqno != nil {
|
||||||
|
t.Fatal("seqno in message")
|
||||||
|
}
|
||||||
|
if string(msg.Data) != string(data) {
|
||||||
|
t.Fatalf("unexpected data: %s", string(msg.Data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -758,6 +794,12 @@ func TestWithSigning(t *testing.T) {
|
|||||||
if msg.Signature == nil {
|
if msg.Signature == nil {
|
||||||
t.Fatal("no signature in message")
|
t.Fatal("no signature in message")
|
||||||
}
|
}
|
||||||
|
if msg.From == nil {
|
||||||
|
t.Fatal("from not in message")
|
||||||
|
}
|
||||||
|
if msg.Seqno == nil {
|
||||||
|
t.Fatal("seqno not in message")
|
||||||
|
}
|
||||||
if string(msg.Data) != string(data) {
|
if string(msg.Data) != string(data) {
|
||||||
t.Fatalf("unexpected data: %s", string(msg.Data))
|
t.Fatalf("unexpected data: %s", string(msg.Data))
|
||||||
}
|
}
|
||||||
|
|||||||
96
pubsub.go
96
pubsub.go
@ -140,12 +140,13 @@ type PubSub struct {
|
|||||||
// function used to compute the ID for a message
|
// function used to compute the ID for a message
|
||||||
msgID MsgIdFunction
|
msgID MsgIdFunction
|
||||||
|
|
||||||
// key for signing messages; nil when signing is disabled (default for now)
|
// key for signing messages; nil when signing is disabled
|
||||||
signKey crypto.PrivKey
|
signKey crypto.PrivKey
|
||||||
// source ID for signed messages; corresponds to signKey
|
// source ID for signed messages; corresponds to signKey, empty when signing is disabled.
|
||||||
|
// If empty, the author and seq-nr are completely omitted from the messages.
|
||||||
signID peer.ID
|
signID peer.ID
|
||||||
// strict mode rejects all unsigned messages prior to validation
|
// strict mode rejects all unsigned messages prior to validation
|
||||||
signStrict bool
|
signPolicy MessageSignaturePolicy
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
@ -212,8 +213,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||||||
maxMessageSize: DefaultMaxMessageSize,
|
maxMessageSize: DefaultMaxMessageSize,
|
||||||
peerOutboundQueueSize: 32,
|
peerOutboundQueueSize: 32,
|
||||||
signID: h.ID(),
|
signID: h.ID(),
|
||||||
signKey: h.Peerstore().PrivKey(h.ID()),
|
signKey: nil,
|
||||||
signStrict: true,
|
signPolicy: StrictSign,
|
||||||
incoming: make(chan *RPC, 32),
|
incoming: make(chan *RPC, 32),
|
||||||
publish: make(chan *Message),
|
publish: make(chan *Message),
|
||||||
newPeers: make(chan peer.ID),
|
newPeers: make(chan peer.ID),
|
||||||
@ -251,8 +252,14 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ps.signStrict && ps.signKey == nil {
|
if ps.signPolicy.mustSign() {
|
||||||
return nil, fmt.Errorf("strict signature verification enabled but message signing is disabled")
|
if ps.signID == "" {
|
||||||
|
return nil, fmt.Errorf("strict signature usage enabled but message author was disabled")
|
||||||
|
}
|
||||||
|
ps.signKey = ps.host.Peerstore().PrivKey(ps.signID)
|
||||||
|
if ps.signKey == nil {
|
||||||
|
return nil, fmt.Errorf("can't sign for peer %s: no private key", ps.signID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ps.disc.Start(ps); err != nil {
|
if err := ps.disc.Start(ps); err != nil {
|
||||||
@ -303,17 +310,23 @@ func WithPeerOutboundQueueSize(size int) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMessageSignaturePolicy sets the mode of operation for producing and verifying message signatures.
|
||||||
|
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option {
|
||||||
|
return func(p *PubSub) error {
|
||||||
|
p.signPolicy = policy
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMessageSigning enables or disables message signing (enabled by default).
|
// WithMessageSigning enables or disables message signing (enabled by default).
|
||||||
|
// Deprecated: signature verification without message signing,
|
||||||
|
// or message signing without verification, are not recommended.
|
||||||
func WithMessageSigning(enabled bool) Option {
|
func WithMessageSigning(enabled bool) Option {
|
||||||
return func(p *PubSub) error {
|
return func(p *PubSub) error {
|
||||||
if enabled {
|
if enabled {
|
||||||
p.signKey = p.host.Peerstore().PrivKey(p.signID)
|
p.signPolicy |= msgSigning
|
||||||
if p.signKey == nil {
|
|
||||||
return fmt.Errorf("can't sign for peer %s: no private key", p.signID)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
p.signKey = nil
|
p.signPolicy &^= msgSigning
|
||||||
p.signStrict = false
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -328,23 +341,32 @@ func WithMessageAuthor(author peer.ID) Option {
|
|||||||
if author == "" {
|
if author == "" {
|
||||||
author = p.host.ID()
|
author = p.host.ID()
|
||||||
}
|
}
|
||||||
if p.signKey != nil {
|
|
||||||
newSignKey := p.host.Peerstore().PrivKey(author)
|
|
||||||
if newSignKey == nil {
|
|
||||||
return fmt.Errorf("can't sign for peer %s: no private key", author)
|
|
||||||
}
|
|
||||||
p.signKey = newSignKey
|
|
||||||
}
|
|
||||||
p.signID = author
|
p.signID = author
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithNoAuthor omits the author and seq-number data of messages, and disables the use of signatures.
|
||||||
|
// Not recommended to use with the default message ID function, see WithMessageIdFn.
|
||||||
|
func WithNoAuthor() Option {
|
||||||
|
return func(p *PubSub) error {
|
||||||
|
p.signID = ""
|
||||||
|
p.signPolicy &^= msgSigning
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithStrictSignatureVerification is an option to enable or disable strict message signing.
|
// WithStrictSignatureVerification is an option to enable or disable strict message signing.
|
||||||
// When enabled (which is the default), unsigned messages will be discarded.
|
// When enabled (which is the default), unsigned messages will be discarded.
|
||||||
|
// Deprecated: signature verification without message signing,
|
||||||
|
// or message signing without verification, are not recommended.
|
||||||
func WithStrictSignatureVerification(required bool) Option {
|
func WithStrictSignatureVerification(required bool) Option {
|
||||||
return func(p *PubSub) error {
|
return func(p *PubSub) error {
|
||||||
p.signStrict = required
|
if required {
|
||||||
|
p.signPolicy |= msgVerification
|
||||||
|
} else {
|
||||||
|
p.signPolicy &^= msgVerification
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -942,10 +964,34 @@ func (p *PubSub) pushMsg(msg *Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// reject unsigned messages when strict before we even process the id
|
// reject unsigned messages when strict before we even process the id
|
||||||
if p.signStrict && msg.Signature == nil {
|
if p.signPolicy.mustVerify() {
|
||||||
log.Debugf("dropping unsigned message from %s", src)
|
if p.signPolicy.mustSign() {
|
||||||
p.tracer.RejectMessage(msg, rejectMissingSignature)
|
if msg.Signature == nil {
|
||||||
return
|
log.Debugf("dropping unsigned message from %s", src)
|
||||||
|
p.tracer.RejectMessage(msg, rejectMissingSignature)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Actual signature verification happens in the validation pipeline,
|
||||||
|
// after checking if the message was already seen or not,
|
||||||
|
// to avoid unnecessary signature verification processing-cost.
|
||||||
|
} else {
|
||||||
|
if msg.Signature != nil {
|
||||||
|
log.Debugf("dropping message with unexpected signature from %s", src)
|
||||||
|
p.tracer.RejectMessage(msg, rejectUnexpectedSignature)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If we are expecting signed messages, and not authoring messages,
|
||||||
|
// then do no accept seq numbers, from data, or key data.
|
||||||
|
// The default msgID function still relies on Seqno and From,
|
||||||
|
// but is not used if we are not authoring messages ourselves.
|
||||||
|
if p.signID == "" {
|
||||||
|
if msg.Seqno != nil || msg.From != nil || msg.Key != nil {
|
||||||
|
log.Debugf("dropping message with unexpected auth info from %s", src)
|
||||||
|
p.tracer.RejectMessage(msg, rejectUnexpectedAuthInfo)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reject messages claiming to be from ourselves but not locally published
|
// reject messages claiming to be from ourselves but not locally published
|
||||||
|
|||||||
4
score.go
4
score.go
@ -570,6 +570,10 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
|
|||||||
fallthrough
|
fallthrough
|
||||||
case rejectInvalidSignature:
|
case rejectInvalidSignature:
|
||||||
fallthrough
|
fallthrough
|
||||||
|
case rejectUnexpectedSignature:
|
||||||
|
fallthrough
|
||||||
|
case rejectUnexpectedAuthInfo:
|
||||||
|
fallthrough
|
||||||
case rejectSelfOrigin:
|
case rejectSelfOrigin:
|
||||||
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
||||||
return
|
return
|
||||||
|
|||||||
35
sign.go
35
sign.go
@ -9,6 +9,41 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MessageSignaturePolicy describes if signatures are produced, expected, and/or verified.
|
||||||
|
type MessageSignaturePolicy uint8
|
||||||
|
|
||||||
|
// LaxSign and LaxNoSign are deprecated. In the future msgSigning and msgVerification can be unified.
|
||||||
|
const (
|
||||||
|
// msgSigning is set when the locally produced messages must be signed
|
||||||
|
msgSigning MessageSignaturePolicy = 1 << iota
|
||||||
|
// msgVerification is set when external messages must be verfied
|
||||||
|
msgVerification
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// StrictSign produces signatures and expects and verifies incoming signatures
|
||||||
|
StrictSign = msgSigning | msgVerification
|
||||||
|
// StrictNoSign does not produce signatures and drops and penalises incoming messages that carry one
|
||||||
|
StrictNoSign = msgVerification
|
||||||
|
// LaxSign produces signatures and validates incoming signatures iff one is present
|
||||||
|
// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
|
||||||
|
LaxSign = msgSigning
|
||||||
|
// LaxNoSign does not produce signatures and validates incoming signatures iff one is present
|
||||||
|
// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
|
||||||
|
LaxNoSign = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
// mustVerify is true when a message signature must be verified.
|
||||||
|
// If signatures are not expected, verification checks if the signature is absent.
|
||||||
|
func (policy MessageSignaturePolicy) mustVerify() bool {
|
||||||
|
return policy&msgVerification != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// mustSign is true when messages should be signed, and incoming messages are expected to have a signature.
|
||||||
|
func (policy MessageSignaturePolicy) mustSign() bool {
|
||||||
|
return policy&msgSigning != 0
|
||||||
|
}
|
||||||
|
|
||||||
const SignPrefix = "libp2p-pubsub:"
|
const SignPrefix = "libp2p-pubsub:"
|
||||||
|
|
||||||
func verifyMessageSignature(m *pb.Message) error {
|
func verifyMessageSignature(m *pb.Message) error {
|
||||||
|
|||||||
12
topic.go
12
topic.go
@ -164,13 +164,15 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
|
|||||||
return ErrTopicClosed
|
return ErrTopicClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
seqno := t.p.nextSeqno()
|
|
||||||
id := t.p.host.ID()
|
|
||||||
m := &pb.Message{
|
m := &pb.Message{
|
||||||
Data: data,
|
Data: data,
|
||||||
TopicIDs: []string{t.topic},
|
TopicIDs: []string{t.topic},
|
||||||
From: []byte(id),
|
From: nil,
|
||||||
Seqno: seqno,
|
Seqno: nil,
|
||||||
|
}
|
||||||
|
if t.p.signID != "" {
|
||||||
|
m.From = []byte(t.p.signID)
|
||||||
|
m.Seqno = t.p.nextSeqno()
|
||||||
}
|
}
|
||||||
if t.p.signKey != nil {
|
if t.p.signKey != nil {
|
||||||
m.From = []byte(t.p.signID)
|
m.From = []byte(t.p.signID)
|
||||||
@ -193,7 +195,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case t.p.publish <- &Message{m, id, nil}:
|
case t.p.publish <- &Message{m, t.p.host.ID(), nil}:
|
||||||
case <-t.p.ctx.Done():
|
case <-t.p.ctx.Done():
|
||||||
return t.p.ctx.Err()
|
return t.p.ctx.Err()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -29,6 +29,8 @@ const (
|
|||||||
rejectBlacklstedPeer = "blacklisted peer"
|
rejectBlacklstedPeer = "blacklisted peer"
|
||||||
rejectBlacklistedSource = "blacklisted source"
|
rejectBlacklistedSource = "blacklisted source"
|
||||||
rejectMissingSignature = "missing signature"
|
rejectMissingSignature = "missing signature"
|
||||||
|
rejectUnexpectedSignature = "unexpected signature"
|
||||||
|
rejectUnexpectedAuthInfo = "unexpected auth info"
|
||||||
rejectInvalidSignature = "invalid signature"
|
rejectInvalidSignature = "invalid signature"
|
||||||
rejectValidationQueueFull = "validation queue full"
|
rejectValidationQueueFull = "validation queue full"
|
||||||
rejectValidationThrottled = "validation throttled"
|
rejectValidationThrottled = "validation throttled"
|
||||||
|
|||||||
@ -241,6 +241,8 @@ func (v *validation) validateWorker() {
|
|||||||
// signature validation is performed synchronously, while user validators are invoked
|
// signature validation is performed synchronously, while user validators are invoked
|
||||||
// asynchronously, throttled by the global validation throttle.
|
// asynchronously, throttled by the global validation throttle.
|
||||||
func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
|
func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
|
||||||
|
// If signature verification is enabled, but signing is disabled,
|
||||||
|
// the Signature is required to be nil upon receiving the message in PubSub.pushMsg.
|
||||||
if msg.Signature != nil {
|
if msg.Signature != nil {
|
||||||
if !v.validateSignature(msg) {
|
if !v.validateSignature(msg) {
|
||||||
log.Warnf("message signature validation failed; dropping message from %s", src)
|
log.Warnf("message signature validation failed; dropping message from %s", src)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user