Merge pull request #97 from libp2p/feat/signing

Implement message signing
This commit is contained in:
vyzo 2018-10-14 02:42:14 +03:00 committed by GitHub
commit 4eb6b7cada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 431 additions and 69 deletions

View File

@ -899,3 +899,39 @@ func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) {
}
}
}
func TestWithSigning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts, WithMessageSigning(true))
connect(t, hosts[0], hosts[1])
topic := "foobar"
data := []byte("this is a message")
sub, err := psubs[1].Subscribe(topic)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 10)
err = psubs[0].Publish(topic, data)
if err != nil {
t.Fatal(err)
}
msg, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if msg.Signature == nil {
t.Fatal("no signature in message")
}
if string(msg.Data) != string(data) {
t.Fatalf("unexpected data: %s", string(msg.Data))
}
}

View File

@ -65,6 +65,12 @@
"hash": "QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8",
"name": "gogo-protobuf",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n",
"name": "go-libp2p-crypto",
"version": "2.0.1"
}
],
"gxVersion": "0.9.0",

View File

@ -56,7 +56,7 @@ func (x *TopicDescriptor_AuthOpts_AuthMode) UnmarshalJSON(data []byte) error {
return nil
}
func (TopicDescriptor_AuthOpts_AuthMode) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{7, 0, 0}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{7, 0, 0}
}
type TopicDescriptor_EncOpts_EncMode int32
@ -95,7 +95,7 @@ func (x *TopicDescriptor_EncOpts_EncMode) UnmarshalJSON(data []byte) error {
return nil
}
func (TopicDescriptor_EncOpts_EncMode) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{7, 1, 0}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{7, 1, 0}
}
type RPC struct {
@ -111,7 +111,7 @@ func (m *RPC) Reset() { *m = RPC{} }
func (m *RPC) String() string { return proto.CompactTextString(m) }
func (*RPC) ProtoMessage() {}
func (*RPC) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{0}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{0}
}
func (m *RPC) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -173,7 +173,7 @@ func (m *RPC_SubOpts) Reset() { *m = RPC_SubOpts{} }
func (m *RPC_SubOpts) String() string { return proto.CompactTextString(m) }
func (*RPC_SubOpts) ProtoMessage() {}
func (*RPC_SubOpts) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{0, 0}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{0, 0}
}
func (m *RPC_SubOpts) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -221,6 +221,8 @@ type Message struct {
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -230,7 +232,7 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{1}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{1}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -287,6 +289,20 @@ func (m *Message) GetTopicIDs() []string {
return nil
}
func (m *Message) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func (m *Message) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
type ControlMessage struct {
Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`
Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`
@ -301,7 +317,7 @@ func (m *ControlMessage) Reset() { *m = ControlMessage{} }
func (m *ControlMessage) String() string { return proto.CompactTextString(m) }
func (*ControlMessage) ProtoMessage() {}
func (*ControlMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{2}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{2}
}
func (m *ControlMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -370,7 +386,7 @@ func (m *ControlIHave) Reset() { *m = ControlIHave{} }
func (m *ControlIHave) String() string { return proto.CompactTextString(m) }
func (*ControlIHave) ProtoMessage() {}
func (*ControlIHave) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{3}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{3}
}
func (m *ControlIHave) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -424,7 +440,7 @@ func (m *ControlIWant) Reset() { *m = ControlIWant{} }
func (m *ControlIWant) String() string { return proto.CompactTextString(m) }
func (*ControlIWant) ProtoMessage() {}
func (*ControlIWant) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{4}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{4}
}
func (m *ControlIWant) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -471,7 +487,7 @@ func (m *ControlGraft) Reset() { *m = ControlGraft{} }
func (m *ControlGraft) String() string { return proto.CompactTextString(m) }
func (*ControlGraft) ProtoMessage() {}
func (*ControlGraft) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{5}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{5}
}
func (m *ControlGraft) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -518,7 +534,7 @@ func (m *ControlPrune) Reset() { *m = ControlPrune{} }
func (m *ControlPrune) String() string { return proto.CompactTextString(m) }
func (*ControlPrune) ProtoMessage() {}
func (*ControlPrune) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{6}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{6}
}
func (m *ControlPrune) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -568,7 +584,7 @@ func (m *TopicDescriptor) Reset() { *m = TopicDescriptor{} }
func (m *TopicDescriptor) String() string { return proto.CompactTextString(m) }
func (*TopicDescriptor) ProtoMessage() {}
func (*TopicDescriptor) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{7}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{7}
}
func (m *TopicDescriptor) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -630,7 +646,7 @@ func (m *TopicDescriptor_AuthOpts) Reset() { *m = TopicDescriptor_AuthOp
func (m *TopicDescriptor_AuthOpts) String() string { return proto.CompactTextString(m) }
func (*TopicDescriptor_AuthOpts) ProtoMessage() {}
func (*TopicDescriptor_AuthOpts) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{7, 0}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{7, 0}
}
func (m *TopicDescriptor_AuthOpts) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -685,7 +701,7 @@ func (m *TopicDescriptor_EncOpts) Reset() { *m = TopicDescriptor_EncOpts
func (m *TopicDescriptor_EncOpts) String() string { return proto.CompactTextString(m) }
func (*TopicDescriptor_EncOpts) ProtoMessage() {}
func (*TopicDescriptor_EncOpts) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_9f02422616139c62, []int{7, 1}
return fileDescriptor_rpc_0c4744ec88cf773a, []int{7, 1}
}
func (m *TopicDescriptor_EncOpts) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -883,6 +899,18 @@ func (m *Message) MarshalTo(dAtA []byte) (int, error) {
i += copy(dAtA[i:], s)
}
}
if m.Signature != nil {
dAtA[i] = 0x2a
i++
i = encodeVarintRpc(dAtA, i, uint64(len(m.Signature)))
i += copy(dAtA[i:], m.Signature)
}
if m.Key != nil {
dAtA[i] = 0x32
i++
i = encodeVarintRpc(dAtA, i, uint64(len(m.Key)))
i += copy(dAtA[i:], m.Key)
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
@ -1276,6 +1304,14 @@ func (m *Message) Size() (n int) {
n += 1 + l + sovRpc(uint64(l))
}
}
if m.Signature != nil {
l = len(m.Signature)
n += 1 + l + sovRpc(uint64(l))
}
if m.Key != nil {
l = len(m.Key)
n += 1 + l + sovRpc(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -1844,6 +1880,68 @@ func (m *Message) Unmarshal(dAtA []byte) error {
}
m.TopicIDs = append(m.TopicIDs, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRpc
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRpc
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRpc(dAtA[iNdEx:])
@ -2845,45 +2943,46 @@ var (
ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_9f02422616139c62) }
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_0c4744ec88cf773a) }
var fileDescriptor_rpc_9f02422616139c62 = []byte{
// 577 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x4d, 0x6e, 0xd3, 0x40,
0x14, 0xc7, 0x99, 0x38, 0x95, 0xe3, 0x17, 0xb7, 0x54, 0xa3, 0x2e, 0x4c, 0xa8, 0xa2, 0xc8, 0x02,
0x64, 0x09, 0x64, 0xa4, 0x48, 0x20, 0xb1, 0x41, 0xb4, 0x4d, 0x44, 0x22, 0xd4, 0x26, 0x9a, 0x56,
0xaa, 0x58, 0xda, 0xce, 0xa4, 0xb1, 0x9a, 0x78, 0x8c, 0x3f, 0x8a, 0x7a, 0x05, 0x0e, 0xc0, 0x71,
0x58, 0xb3, 0x64, 0xc1, 0x01, 0x50, 0x2e, 0xc0, 0x15, 0xd0, 0xbc, 0x71, 0x3e, 0x1c, 0x9a, 0xaa,
0xab, 0xbe, 0x99, 0xfe, 0x7e, 0x33, 0xff, 0xf7, 0x3c, 0x01, 0x23, 0x89, 0x03, 0x37, 0x4e, 0x44,
0x26, 0x68, 0x7d, 0x3c, 0x15, 0x62, 0x94, 0xe6, 0xbe, 0x1b, 0xfb, 0xf6, 0x5f, 0x02, 0x1a, 0x1b,
0x9e, 0xd0, 0xf7, 0xb0, 0x9b, 0xe6, 0x7e, 0x1a, 0x24, 0x61, 0x9c, 0x85, 0x22, 0x4a, 0x2d, 0xd2,
0xd2, 0x9c, 0x7a, 0xdb, 0x72, 0xd7, 0x60, 0x97, 0x0d, 0x4f, 0xdc, 0xf3, 0xdc, 0x1f, 0xc4, 0x59,
0xca, 0xca, 0x38, 0x75, 0x41, 0x8f, 0x73, 0x7f, 0x1a, 0xa6, 0x13, 0xab, 0x82, 0xe6, 0x41, 0xc9,
0x3c, 0xe5, 0x69, 0xea, 0x5d, 0x71, 0xb6, 0x80, 0xe8, 0x1b, 0xd0, 0x03, 0x11, 0x65, 0x89, 0x98,
0x5a, 0x5a, 0x8b, 0x38, 0xf5, 0xf6, 0xd3, 0x12, 0x7f, 0xa2, 0xfe, 0xb7, 0xd4, 0x0a, 0xb6, 0x71,
0x04, 0x7a, 0x11, 0x80, 0x1e, 0x82, 0x51, 0x44, 0xf0, 0xb9, 0x45, 0x5a, 0xc4, 0xa9, 0xb1, 0xd5,
0x06, 0xb5, 0x40, 0xcf, 0x44, 0x1c, 0x06, 0xe1, 0xc8, 0xaa, 0xb4, 0x88, 0x63, 0xb0, 0xc5, 0xd2,
0x0e, 0x40, 0x2f, 0x8e, 0xa5, 0x14, 0xaa, 0xe3, 0x44, 0xcc, 0xd0, 0x36, 0x19, 0xd6, 0x72, 0x6f,
0xe4, 0x65, 0x1e, 0x5a, 0x26, 0xc3, 0x9a, 0x1e, 0xc0, 0x4e, 0xca, 0xbf, 0x44, 0x02, 0xa3, 0x9a,
0x4c, 0x2d, 0x68, 0x03, 0x6a, 0x78, 0x66, 0xbf, 0x93, 0x5a, 0xd5, 0x96, 0xe6, 0x18, 0x6c, 0xb9,
0xb6, 0x7f, 0x13, 0xd8, 0x2b, 0xf7, 0x40, 0x5f, 0xc3, 0x4e, 0x38, 0xf1, 0x6e, 0x78, 0x31, 0xd9,
0x27, 0x77, 0xf5, 0xdb, 0xef, 0x79, 0x37, 0x9c, 0x29, 0x0e, 0x85, 0xaf, 0x5e, 0x94, 0x15, 0x03,
0xbd, 0x5b, 0xb8, 0xf4, 0xa2, 0x8c, 0x29, 0x4e, 0x0a, 0x57, 0x89, 0x37, 0xce, 0x2c, 0x6d, 0xbb,
0xf0, 0x51, 0x02, 0x4c, 0x71, 0x52, 0x88, 0x93, 0x3c, 0xe2, 0x18, 0x7f, 0x8b, 0x30, 0x94, 0x00,
0x53, 0x9c, 0xdd, 0x03, 0x73, 0x3d, 0xe9, 0x72, 0xca, 0xfd, 0x0e, 0xce, 0x70, 0x31, 0xe5, 0x7e,
0x87, 0x36, 0x01, 0x66, 0xaa, 0x71, 0x39, 0x9e, 0x0a, 0x8e, 0x67, 0x6d, 0xc7, 0x76, 0x57, 0x27,
0xc9, 0x16, 0x36, 0x78, 0xf2, 0x1f, 0xef, 0x2c, 0x79, 0xec, 0x60, 0xfb, 0xcd, 0x6b, 0x24, 0x46,
0xbf, 0x87, 0xfc, 0xa1, 0xc1, 0xe3, 0x0b, 0x59, 0x77, 0xb8, 0x7a, 0xc9, 0x22, 0x91, 0x9f, 0x3f,
0xf2, 0x66, 0xbc, 0x40, 0xb1, 0xa6, 0xef, 0xa0, 0xea, 0xe5, 0xd9, 0x04, 0x9f, 0x44, 0xbd, 0xfd,
0xbc, 0x34, 0xa5, 0x0d, 0xdf, 0x3d, 0xca, 0xb3, 0x09, 0xfe, 0x3e, 0x50, 0xa1, 0x6f, 0x41, 0xe3,
0x51, 0x50, 0x3c, 0xf1, 0x67, 0xf7, 0x9a, 0xdd, 0x28, 0x40, 0x51, 0x0a, 0x8d, 0x6f, 0x04, 0x6a,
0x8b, 0xa3, 0xe8, 0x31, 0x54, 0x67, 0x62, 0xa4, 0x32, 0xed, 0xb5, 0xdd, 0x07, 0xdd, 0x8f, 0xc5,
0xa9, 0x18, 0x71, 0x86, 0xae, 0xec, 0xeb, 0x9a, 0xdf, 0xaa, 0x2f, 0x61, 0x32, 0xac, 0xed, 0x17,
0xea, 0x0e, 0x49, 0xd1, 0x1a, 0x54, 0xcf, 0x06, 0x67, 0xdd, 0xfd, 0x47, 0x54, 0x07, 0xed, 0x53,
0xf7, 0xf3, 0x3e, 0x91, 0xc5, 0xe5, 0xe0, 0x62, 0xbf, 0xd2, 0xf8, 0x4e, 0x40, 0x2f, 0xd2, 0xd1,
0x0f, 0xa5, 0x2c, 0xaf, 0x1e, 0xd2, 0x91, 0xfc, 0xbb, 0x96, 0xe4, 0x10, 0x8c, 0x6b, 0x7e, 0xdb,
0xf3, 0xd2, 0x09, 0x5f, 0xc4, 0x59, 0x6d, 0xd8, 0x2f, 0xf1, 0xaa, 0x8d, 0x48, 0xbb, 0x60, 0x9c,
0xf7, 0x8e, 0x58, 0xb7, 0x53, 0x0e, 0x76, 0x6c, 0xfe, 0x9c, 0x37, 0xc9, 0xaf, 0x79, 0x93, 0xfc,
0x99, 0x37, 0xc9, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x80, 0x05, 0xeb, 0x9d, 0xe3, 0x04, 0x00,
0x00,
var fileDescriptor_rpc_0c4744ec88cf773a = []byte{
// 602 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xc7, 0xd9, 0x38, 0xc5, 0xf1, 0xc4, 0x2d, 0xd1, 0xaa, 0x07, 0x13, 0xaa, 0x28, 0xb2, 0x00,
0x59, 0x02, 0x19, 0x29, 0x12, 0x48, 0x5c, 0x10, 0x6d, 0x13, 0x91, 0x08, 0xf5, 0x43, 0xdb, 0x4a,
0x15, 0xc7, 0xb5, 0xb3, 0x6d, 0xac, 0x36, 0x5e, 0x63, 0xaf, 0x8b, 0xfa, 0x0a, 0x5c, 0x91, 0x78,
0x1c, 0xce, 0x1c, 0x39, 0xf0, 0x00, 0xa8, 0x2f, 0xc0, 0x2b, 0xa0, 0x1d, 0x3b, 0x1f, 0x2e, 0x4d,
0xd5, 0x53, 0x66, 0x27, 0xff, 0xdf, 0xce, 0x7f, 0x66, 0xd6, 0x60, 0xa5, 0x49, 0xe8, 0x27, 0xa9,
0x54, 0x92, 0x36, 0x4f, 0x2f, 0xa4, 0x1c, 0x67, 0x79, 0xe0, 0x27, 0x81, 0xfb, 0x97, 0x80, 0xc1,
0x0e, 0x77, 0xe9, 0x3b, 0x58, 0xcf, 0xf2, 0x20, 0x0b, 0xd3, 0x28, 0x51, 0x91, 0x8c, 0x33, 0x87,
0x74, 0x0d, 0xaf, 0xd9, 0x73, 0xfc, 0x25, 0xb1, 0xcf, 0x0e, 0x77, 0xfd, 0xa3, 0x3c, 0x38, 0x48,
0x54, 0xc6, 0xaa, 0x72, 0xea, 0x83, 0x99, 0xe4, 0xc1, 0x45, 0x94, 0x4d, 0x9c, 0x1a, 0x92, 0x9b,
0x15, 0x72, 0x4f, 0x64, 0x19, 0x3f, 0x13, 0x6c, 0x26, 0xa2, 0xaf, 0xc1, 0x0c, 0x65, 0xac, 0x52,
0x79, 0xe1, 0x18, 0x5d, 0xe2, 0x35, 0x7b, 0x4f, 0x2a, 0xfa, 0xdd, 0xe2, 0xbf, 0x39, 0x56, 0x6a,
0xdb, 0xdb, 0x60, 0x96, 0x06, 0xe8, 0x16, 0x58, 0xa5, 0x85, 0x40, 0x38, 0xa4, 0x4b, 0xbc, 0x06,
0x5b, 0x24, 0xa8, 0x03, 0xa6, 0x92, 0x49, 0x14, 0x46, 0x63, 0xa7, 0xd6, 0x25, 0x9e, 0xc5, 0x66,
0x47, 0xf7, 0x1b, 0x01, 0xb3, 0xbc, 0x97, 0x52, 0xa8, 0x9f, 0xa6, 0x72, 0x8a, 0xb8, 0xcd, 0x30,
0xd6, 0xb9, 0x31, 0x57, 0x1c, 0x31, 0x9b, 0x61, 0x4c, 0x37, 0x61, 0x2d, 0x13, 0x9f, 0x63, 0x89,
0x5e, 0x6d, 0x56, 0x1c, 0x68, 0x1b, 0x1a, 0x78, 0xe9, 0xa8, 0x9f, 0x39, 0xf5, 0xae, 0xe1, 0x59,
0x6c, 0x7e, 0x46, 0x77, 0xd1, 0x59, 0xcc, 0x55, 0x9e, 0x0a, 0x67, 0x0d, 0xa9, 0x45, 0x82, 0xb6,
0xc0, 0x38, 0x17, 0x57, 0xce, 0x43, 0xcc, 0xeb, 0xd0, 0xfd, 0x4d, 0x60, 0xa3, 0xda, 0x34, 0x7d,
0x05, 0x6b, 0xd1, 0x84, 0x5f, 0x8a, 0x72, 0x15, 0x8f, 0x6f, 0x1b, 0xd0, 0x68, 0xc8, 0x2f, 0x05,
0x2b, 0x74, 0x08, 0x7c, 0xe1, 0xb1, 0x2a, 0x37, 0x70, 0x3b, 0x70, 0xc2, 0x63, 0xc5, 0x0a, 0x9d,
0x06, 0xce, 0x52, 0x7e, 0xaa, 0x1c, 0x63, 0x35, 0xf0, 0x41, 0x0b, 0x58, 0xa1, 0xd3, 0x40, 0x92,
0xe6, 0xb1, 0xc0, 0x76, 0x57, 0x00, 0x87, 0x5a, 0xc0, 0x0a, 0x9d, 0x3b, 0x04, 0x7b, 0xd9, 0xe9,
0x7c, 0x2d, 0xa3, 0x3e, 0xce, 0x7c, 0xb6, 0x96, 0x51, 0x9f, 0x76, 0x00, 0xa6, 0x45, 0xe3, 0x7a,
0x9c, 0x35, 0x1c, 0xe7, 0x52, 0xc6, 0xf5, 0x17, 0x37, 0xe9, 0x16, 0x6e, 0xe8, 0xc9, 0x7f, 0x7a,
0x6f, 0xae, 0xc7, 0x0e, 0x56, 0x57, 0x5e, 0x52, 0xa2, 0xf5, 0x3b, 0x94, 0x3f, 0x0c, 0x78, 0x74,
0xac, 0xe3, 0xbe, 0x28, 0x9e, 0xbe, 0x4c, 0xf5, 0x73, 0x89, 0xf9, 0x54, 0x94, 0x52, 0x8c, 0xe9,
0x5b, 0xa8, 0xf3, 0x5c, 0x4d, 0xf0, 0x09, 0x35, 0x7b, 0xcf, 0x2a, 0x53, 0xba, 0xc1, 0xfb, 0xdb,
0xb9, 0x9a, 0xe0, 0x07, 0x85, 0x08, 0x7d, 0x03, 0x86, 0x88, 0xc3, 0xf2, 0x9b, 0x78, 0x7a, 0x27,
0x39, 0x88, 0x43, 0x04, 0x35, 0xd0, 0xfe, 0x4a, 0xa0, 0x31, 0xbb, 0x8a, 0xee, 0x40, 0x7d, 0x2a,
0xc7, 0x85, 0xa7, 0x8d, 0x9e, 0x7f, 0xaf, 0xfa, 0x18, 0xec, 0xc9, 0xb1, 0x60, 0xc8, 0xea, 0xbe,
0xce, 0xc5, 0x55, 0xb1, 0x09, 0x9b, 0x61, 0xec, 0x3e, 0x2f, 0x6a, 0x68, 0x15, 0x6d, 0x40, 0x7d,
0xff, 0x60, 0x7f, 0xd0, 0x7a, 0x40, 0x4d, 0x30, 0x3e, 0x0e, 0x3e, 0xb5, 0x88, 0x0e, 0x4e, 0x0e,
0x8e, 0x5b, 0xb5, 0xf6, 0x77, 0x02, 0x66, 0xe9, 0x8e, 0xbe, 0xaf, 0x78, 0x79, 0x79, 0x9f, 0x8e,
0xf4, 0xef, 0x92, 0x93, 0x2d, 0xb0, 0xce, 0xc5, 0xd5, 0x90, 0x67, 0x13, 0x31, 0xb3, 0xb3, 0x48,
0xb8, 0x2f, 0xb0, 0xd4, 0x0d, 0x4b, 0xeb, 0x60, 0x1d, 0x0d, 0xb7, 0xd9, 0xa0, 0x5f, 0x35, 0xb6,
0x63, 0xff, 0xbc, 0xee, 0x90, 0x5f, 0xd7, 0x1d, 0xf2, 0xe7, 0xba, 0x43, 0xfe, 0x05, 0x00, 0x00,
0xff, 0xff, 0x6e, 0x39, 0x18, 0xab, 0x14, 0x05, 0x00, 0x00,
}

View File

@ -19,6 +19,8 @@ message Message {
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
message ControlMessage {

109
pubsub.go
View File

@ -11,6 +11,7 @@ import (
pb "github.com/libp2p/go-floodsub/pb"
logging "github.com/ipfs/go-log"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
@ -89,6 +90,13 @@ type PubSub struct {
peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache
// key for signing messages; nil when signing is disabled (default for now)
signKey crypto.PrivKey
// source ID for signed messages; corresponds to signKey
signID peer.ID
// strict mode rejects all unsigned messages prior to validation
signStrict bool
ctx context.Context
}
@ -186,6 +194,15 @@ func WithValidateThrottle(n int) Option {
}
}
func WithMessageSigning(strict bool) Option {
return func(p *PubSub) error {
p.signID = p.host.ID()
p.signKey = p.host.Peerstore().PrivKey(p.signID)
p.signStrict = strict
return nil
}
}
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
@ -446,13 +463,19 @@ func msgID(pmsg *pb.Message) string {
// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
// reject unsigned messages when strict before we even process the id
if p.signStrict && msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src)
return
}
id := msgID(msg.Message)
if p.seenMessage(id) {
return
}
p.markSeen(id)
if len(vals) > 0 {
if len(vals) > 0 || msg.Signature != nil {
// validation is asynchronous and globally throttled with the throttleValidate semaphore.
// the purpose of the global throttle is to bound the goncurrency possible from incoming
// network traffic; each validator also has an individual throttle to preclude
@ -474,6 +497,42 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
// validate performs validation and only sends the message if all validators succeed
func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
if msg.Signature != nil {
if !p.validateSignature(msg) {
log.Warningf("message signature validation failed; dropping message from %s", src)
return
}
}
if len(vals) > 0 {
if !p.validateTopic(vals, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
return
}
}
// all validators were successful, send the message
p.sendMsg <- &sendReq{
from: src,
msg: msg,
}
}
func (p *PubSub) validateSignature(msg *Message) bool {
err := verifyMessageSignature(msg.Message)
if err != nil {
log.Debugf("signature verification error: %s", err.Error())
return false
}
return true
}
func (p *PubSub) validateTopic(vals []*topicVal, msg *Message) bool {
if len(vals) == 1 {
return p.validateSingleTopic(vals[0], msg)
}
ctx, cancel := context.WithCancel(p.ctx)
defer cancel()
@ -500,22 +559,34 @@ loop:
}
if throttle {
log.Warningf("message validation throttled; dropping message from %s", src)
return
return false
}
for i := 0; i < rcount; i++ {
valid := <-rch
if !valid {
log.Warningf("message validation failed; dropping message from %s", src)
return
return false
}
}
// all validators were successful, send the message
p.sendMsg <- &sendReq{
from: src,
msg: msg,
return true
}
// fast path for single topic validation that avoids the extra goroutine
func (p *PubSub) validateSingleTopic(val *topicVal, msg *Message) bool {
select {
case val.validateThrottle <- struct{}{}:
ctx, cancel := context.WithCancel(p.ctx)
defer cancel()
res := val.validateMsg(ctx, msg)
<-val.validateThrottle
return res
default:
log.Debugf("validation throttled for topic %s", val.topic)
return false
}
}
@ -598,14 +669,20 @@ func (p *PubSub) GetTopics() []string {
// Publish publishes data under the given topic
func (p *PubSub) Publish(topic string, data []byte) error {
seqno := p.nextSeqno()
p.publish <- &Message{
&pb.Message{
Data: data,
TopicIDs: []string{topic},
From: []byte(p.host.ID()),
Seqno: seqno,
},
m := &pb.Message{
Data: data,
TopicIDs: []string{topic},
From: []byte(p.host.ID()),
Seqno: seqno,
}
if p.signKey != nil {
m.From = []byte(p.signID)
err := signMessage(p.signID, p.signKey, m)
if err != nil {
return err
}
}
p.publish <- &Message{m}
return nil
}

100
sign.go Normal file
View File

@ -0,0 +1,100 @@
package floodsub
import (
"fmt"
pb "github.com/libp2p/go-floodsub/pb"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
)
const SignPrefix = "libp2p-pubsub:"
func verifyMessageSignature(m *pb.Message) error {
pubk, err := messagePubKey(m)
if err != nil {
return err
}
xm := *m
xm.Signature = nil
xm.Key = nil
bytes, err := xm.Marshal()
if err != nil {
return err
}
bytes = withSignPrefix(bytes)
valid, err := pubk.Verify(bytes, m.Signature)
if err != nil {
return err
}
if !valid {
return fmt.Errorf("invalid signature")
}
return nil
}
func messagePubKey(m *pb.Message) (crypto.PubKey, error) {
var pubk crypto.PubKey
pid, err := peer.IDFromBytes(m.From)
if err != nil {
return nil, err
}
if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return nil, fmt.Errorf("cannot extract signing key: %s", err.Error())
}
} else {
pubk, err = crypto.UnmarshalPublicKey(m.Key)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}
// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}
return pubk, nil
}
func signMessage(pid peer.ID, key crypto.PrivKey, m *pb.Message) error {
bytes, err := m.Marshal()
if err != nil {
return err
}
bytes = withSignPrefix(bytes)
sig, err := key.Sign(bytes)
if err != nil {
return err
}
m.Signature = sig
pk, _ := pid.ExtractPublicKey()
if pk == nil {
pubk, err := key.GetPublic().Bytes()
if err != nil {
return err
}
m.Key = pubk
}
return nil
}
func withSignPrefix(bytes []byte) []byte {
return append([]byte(SignPrefix), bytes...)
}

42
sign_test.go Normal file
View File

@ -0,0 +1,42 @@
package floodsub
import (
"testing"
pb "github.com/libp2p/go-floodsub/pb"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
)
func TestSigning(t *testing.T) {
privk, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
if err != nil {
t.Fatal(err)
}
testSignVerify(t, privk)
privk, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 0)
if err != nil {
t.Fatal(err)
}
testSignVerify(t, privk)
}
func testSignVerify(t *testing.T, privk crypto.PrivKey) {
id, err := peer.IDFromPublicKey(privk.GetPublic())
if err != nil {
t.Fatal(err)
}
m := pb.Message{
Data: []byte("abc"),
TopicIDs: []string{"foo"},
From: []byte(id),
Seqno: []byte("123"),
}
signMessage(id, privk, &m)
err = verifyMessageSignature(&m)
if err != nil {
t.Fatal(err)
}
}