mirror of https://github.com/vacp2p/nim-libp2p.git
logging: switch debug for trace in most cases
This commit is contained in:
parent
011df568b7
commit
054085620c
|
@ -99,7 +99,7 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
||||||
if size > 0.uint:
|
if size > 0.uint:
|
||||||
await s.readExactly(addr buffer[0], int(size))
|
await s.readExactly(addr buffer[0], int(size))
|
||||||
except LPStreamIncompleteError, LPStreamReadError:
|
except LPStreamIncompleteError, LPStreamReadError:
|
||||||
debug "readLp: could not read from remote", exception = getCurrentExceptionMsg()
|
error "readLp: could not read from remote", exception = getCurrentExceptionMsg()
|
||||||
buffer.setLen(0)
|
buffer.setLen(0)
|
||||||
|
|
||||||
result = buffer
|
result = buffer
|
||||||
|
|
|
@ -48,7 +48,7 @@ proc select*(m: MultisteamSelect,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
proto: seq[string]):
|
proto: seq[string]):
|
||||||
Future[string] {.async.} =
|
Future[string] {.async.} =
|
||||||
debug "initiating handshake", codec = m.codec
|
trace "initiating handshake", codec = m.codec
|
||||||
## select a remote protocol
|
## select a remote protocol
|
||||||
await conn.write(m.codec) # write handshake
|
await conn.write(m.codec) # write handshake
|
||||||
if proto.len() > 0:
|
if proto.len() > 0:
|
||||||
|
@ -58,7 +58,7 @@ proc select*(m: MultisteamSelect,
|
||||||
result = cast[string](await conn.readLp()) # read ms header
|
result = cast[string](await conn.readLp()) # read ms header
|
||||||
result.removeSuffix("\n")
|
result.removeSuffix("\n")
|
||||||
if result != Codec:
|
if result != Codec:
|
||||||
debug "handshake failed", codec = result
|
trace "handshake failed", codec = result
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
if proto.len() == 0: # no protocols, must be a handshake call
|
if proto.len() == 0: # no protocols, must be a handshake call
|
||||||
|
@ -68,7 +68,7 @@ proc select*(m: MultisteamSelect,
|
||||||
trace "reading first requested proto"
|
trace "reading first requested proto"
|
||||||
result.removeSuffix("\n")
|
result.removeSuffix("\n")
|
||||||
if result == proto[0]:
|
if result == proto[0]:
|
||||||
debug "succesfully selected ", proto = proto
|
trace "succesfully selected ", proto = proto
|
||||||
return
|
return
|
||||||
|
|
||||||
if not result.len > 0:
|
if not result.len > 0:
|
||||||
|
@ -78,7 +78,7 @@ proc select*(m: MultisteamSelect,
|
||||||
result = cast[string](await conn.readLp()) # read the first proto
|
result = cast[string](await conn.readLp()) # read the first proto
|
||||||
result.removeSuffix("\n")
|
result.removeSuffix("\n")
|
||||||
if result == p:
|
if result == p:
|
||||||
debug "selected protocol", protocol = result
|
trace "selected protocol", protocol = result
|
||||||
break
|
break
|
||||||
|
|
||||||
proc select*(m: MultisteamSelect,
|
proc select*(m: MultisteamSelect,
|
||||||
|
|
|
@ -47,13 +47,13 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
|
||||||
let headerVarint = await conn.readMplexVarint()
|
let headerVarint = await conn.readMplexVarint()
|
||||||
if headerVarint.isNone:
|
if headerVarint.isNone:
|
||||||
return
|
return
|
||||||
|
|
||||||
debug "readMsg: read header varint ", varint = headerVarint
|
trace "readMsg: read header varint ", varint = headerVarint
|
||||||
|
|
||||||
let dataLenVarint = await conn.readMplexVarint()
|
let dataLenVarint = await conn.readMplexVarint()
|
||||||
var data: seq[byte]
|
var data: seq[byte]
|
||||||
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint:
|
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint:
|
||||||
debug "readMsg: read size varint ", varint = dataLenVarint
|
trace "readMsg: read size varint ", varint = dataLenVarint
|
||||||
data = await conn.read(dataLenVarint.get().int)
|
data = await conn.read(dataLenVarint.get().int)
|
||||||
|
|
||||||
let header = headerVarint.get()
|
let header = headerVarint.get()
|
||||||
|
|
|
@ -57,7 +57,7 @@ proc newStreamInternal*(m: Mplex,
|
||||||
m.getChannelList(initiator)[id] = result
|
m.getChannelList(initiator)[id] = result
|
||||||
|
|
||||||
method handle*(m: Mplex) {.async, gcsafe.} =
|
method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
debug "starting mplex main loop"
|
trace "starting mplex main loop"
|
||||||
try:
|
try:
|
||||||
while not m.connection.closed:
|
while not m.connection.closed:
|
||||||
let msgRes = await m.connection.readMsg()
|
let msgRes = await m.connection.readMsg()
|
||||||
|
@ -70,7 +70,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
if MessageType(msgType) != MessageType.New:
|
if MessageType(msgType) != MessageType.New:
|
||||||
let channels = m.getChannelList(initiator)
|
let channels = m.getChannelList(initiator)
|
||||||
if not channels.contains(id):
|
if not channels.contains(id):
|
||||||
debug "handle: Channel with id and msg type ", id = id, msg = msgType
|
trace "handle: Channel with id and msg type ", id = id, msg = msgType
|
||||||
continue
|
continue
|
||||||
channel = channels[id]
|
channel = channels[id]
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
of MessageType.New:
|
of MessageType.New:
|
||||||
let name = cast[string](data)
|
let name = cast[string](data)
|
||||||
channel = await m.newStreamInternal(false, id, name)
|
channel = await m.newStreamInternal(false, id, name)
|
||||||
debug "handle: created channel ", id = id, name = name
|
trace "handle: created channel ", id = id, name = name
|
||||||
if not isNil(m.streamHandler):
|
if not isNil(m.streamHandler):
|
||||||
let stream = newConnection(channel)
|
let stream = newConnection(channel)
|
||||||
stream.peerInfo = m.connection.peerInfo
|
stream.peerInfo = m.connection.peerInfo
|
||||||
|
@ -91,23 +91,23 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
proc(udata: pointer) =
|
proc(udata: pointer) =
|
||||||
channel.cleanUp()
|
channel.cleanUp()
|
||||||
.addCallback(proc(udata: pointer) =
|
.addCallback(proc(udata: pointer) =
|
||||||
debug "handle: cleaned up channel ", id = id))
|
trace "handle: cleaned up channel ", id = id))
|
||||||
handlerFut.addCallback(cleanUpChan)
|
handlerFut.addCallback(cleanUpChan)
|
||||||
continue
|
continue
|
||||||
of MessageType.MsgIn, MessageType.MsgOut:
|
of MessageType.MsgIn, MessageType.MsgOut:
|
||||||
debug "handle: pushing data to channel ", id = id, msgType = msgType
|
trace "handle: pushing data to channel ", id = id, msgType = msgType
|
||||||
await channel.pushTo(data)
|
await channel.pushTo(data)
|
||||||
of MessageType.CloseIn, MessageType.CloseOut:
|
of MessageType.CloseIn, MessageType.CloseOut:
|
||||||
debug "handle: closing channel ", id = id, msgType = msgType
|
trace "handle: closing channel ", id = id, msgType = msgType
|
||||||
await channel.closedByRemote()
|
await channel.closedByRemote()
|
||||||
m.getChannelList(initiator).del(id)
|
m.getChannelList(initiator).del(id)
|
||||||
of MessageType.ResetIn, MessageType.ResetOut:
|
of MessageType.ResetIn, MessageType.ResetOut:
|
||||||
debug "handle: resetting channel ", id = id
|
trace "handle: resetting channel ", id = id
|
||||||
await channel.resetByRemote()
|
await channel.resetByRemote()
|
||||||
break
|
break
|
||||||
else: raise newMplexUnknownMsgError()
|
else: raise newMplexUnknownMsgError()
|
||||||
except:
|
except:
|
||||||
debug "exception occurred", exception = getCurrentExceptionMsg()
|
error "exception occurred", exception = getCurrentExceptionMsg()
|
||||||
finally:
|
finally:
|
||||||
await m.connection.close()
|
await m.connection.close()
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ proc decodeMsg*(buf: seq[byte]): IdentifyInfo =
|
||||||
var pubKey: PublicKey
|
var pubKey: PublicKey
|
||||||
if pb.getValue(1, pubKey) > 0:
|
if pb.getValue(1, pubKey) > 0:
|
||||||
result.pubKey = some(pubKey)
|
result.pubKey = some(pubKey)
|
||||||
debug "PUBKEY", pubKey = pubKey
|
|
||||||
|
|
||||||
result.addrs = newSeq[MultiAddress]()
|
result.addrs = newSeq[MultiAddress]()
|
||||||
var address = newSeq[byte]()
|
var address = newSeq[byte]()
|
||||||
|
@ -104,7 +103,7 @@ proc newIdentify*(peerInfo: PeerInfo): Identify =
|
||||||
|
|
||||||
method init*(p: Identify) =
|
method init*(p: Identify) =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
debug "handling identify request"
|
trace "handling identify request"
|
||||||
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
|
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
|
||||||
await conn.writeLp(pb.buffer)
|
await conn.writeLp(pb.buffer)
|
||||||
|
|
||||||
|
@ -117,19 +116,19 @@ proc identify*(p: Identify,
|
||||||
Future[IdentifyInfo] {.async.} =
|
Future[IdentifyInfo] {.async.} =
|
||||||
var message = await conn.readLp()
|
var message = await conn.readLp()
|
||||||
if len(message) == 0:
|
if len(message) == 0:
|
||||||
debug "identify: Invalid or empty message received!"
|
trace "identify: Invalid or empty message received!"
|
||||||
raise newException(IdentityInvalidMsgError,
|
raise newException(IdentityInvalidMsgError,
|
||||||
"Invalid or empty message received!")
|
"Invalid or empty message received!")
|
||||||
|
|
||||||
result = decodeMsg(message)
|
result = decodeMsg(message)
|
||||||
debug "Identify for remote peer succeded"
|
trace "Identify for remote peer succeded"
|
||||||
|
|
||||||
# TODO: To enable the blow code, the private and public
|
# TODO: To enable the blow code, the private and public
|
||||||
# keys in PeerID need to be wrapped with Option[T]
|
# keys in PeerID need to be wrapped with Option[T]
|
||||||
# if remotePeerInfo.peerId.isSome and
|
# if remotePeerInfo.peerId.isSome and
|
||||||
# result.pubKey.isSome and
|
# result.pubKey.isSome and
|
||||||
# result.pubKey.get() != remotePeerInfo.peerId.get().publicKey:
|
# result.pubKey.get() != remotePeerInfo.peerId.get().publicKey:
|
||||||
# debug "identify: Peer's remote public key doesn't match"
|
# trace "identify: Peer's remote public key doesn't match"
|
||||||
# raise newException(IdentityNoMatchError,
|
# raise newException(IdentityNoMatchError,
|
||||||
# "Peer's remote public key doesn't match")
|
# "Peer's remote public key doesn't match")
|
||||||
|
|
||||||
|
|
|
@ -28,21 +28,19 @@ type
|
||||||
proc sendSubs(f: FloodSub,
|
proc sendSubs(f: FloodSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
topics: seq[string],
|
topics: seq[string],
|
||||||
subscribe: bool)
|
subscribe: bool) {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
|
||||||
## send subscriptions to remote peer
|
## send subscriptions to remote peer
|
||||||
debug "sending subscriptions", peer = peer.id, subscribe = subscribe
|
trace "sending subscriptions", peer = peer.id, subscribe = subscribe
|
||||||
var msg: RPCMsg
|
var msg: RPCMsg
|
||||||
for t in topics:
|
for t in topics:
|
||||||
debug "sending topic", peer = peer.id, subscribe = subscribe, topicName = t
|
trace "sending topic", peer = peer.id, subscribe = subscribe, topicName = t
|
||||||
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
|
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
|
||||||
|
|
||||||
await peer.send(@[msg])
|
await peer.send(@[msg])
|
||||||
|
|
||||||
proc rpcHandler(f: FloodSub,
|
proc rpcHandler(f: FloodSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
rpcMsgs: seq[RPCMsg])
|
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
|
||||||
## method called by a PubSubPeer every
|
## method called by a PubSubPeer every
|
||||||
## time it receives an RPC message
|
## time it receives an RPC message
|
||||||
##
|
##
|
||||||
|
@ -50,9 +48,9 @@ proc rpcHandler(f: FloodSub,
|
||||||
## or messages forwarded to this peer
|
## or messages forwarded to this peer
|
||||||
##
|
##
|
||||||
|
|
||||||
debug "processing RPC message", peer = peer.id, msg = rpcMsgs
|
trace "processing RPC message", peer = peer.id, msg = rpcMsgs
|
||||||
for m in rpcMsgs: # for all RPC messages
|
for m in rpcMsgs: # for all RPC messages
|
||||||
debug "processing message", msg = rpcMsgs
|
trace "processing message", msg = rpcMsgs
|
||||||
if m.subscriptions.len > 0: # if there are any subscriptions
|
if m.subscriptions.len > 0: # if there are any subscriptions
|
||||||
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
||||||
let id = peer.id
|
let id = peer.id
|
||||||
|
@ -61,11 +59,11 @@ proc rpcHandler(f: FloodSub,
|
||||||
f.peerTopics[s.topic] = initSet[string]()
|
f.peerTopics[s.topic] = initSet[string]()
|
||||||
|
|
||||||
if s.subscribe:
|
if s.subscribe:
|
||||||
debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
trace "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
||||||
# subscribe the peer to the topic
|
# subscribe the peer to the topic
|
||||||
f.peerTopics[s.topic].incl(id)
|
f.peerTopics[s.topic].incl(id)
|
||||||
else:
|
else:
|
||||||
debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
trace "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
|
||||||
# unsubscribe the peer from the topic
|
# unsubscribe the peer from the topic
|
||||||
f.peerTopics[s.topic].excl(id)
|
f.peerTopics[s.topic].excl(id)
|
||||||
|
|
||||||
|
@ -86,8 +84,7 @@ proc rpcHandler(f: FloodSub,
|
||||||
await f.peers[p].send(@[RPCMsg(messages: m.messages)])
|
await f.peers[p].send(@[RPCMsg(messages: m.messages)])
|
||||||
|
|
||||||
proc handleConn(f: FloodSub,
|
proc handleConn(f: FloodSub,
|
||||||
conn: Connection)
|
conn: Connection) {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
|
||||||
## handle incoming/outgoing connections
|
## handle incoming/outgoing connections
|
||||||
##
|
##
|
||||||
## this proc will:
|
## this proc will:
|
||||||
|
@ -129,19 +126,17 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
|
||||||
|
|
||||||
method publish*(f: FloodSub,
|
method publish*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
data: seq[byte])
|
data: seq[byte]) {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
trace "about to publish message on topic", topic = topic, data = data
|
||||||
debug "about to publish message on topic", topic = topic, data = data
|
|
||||||
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
|
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
|
||||||
if topic in f.peerTopics:
|
if topic in f.peerTopics:
|
||||||
for p in f.peerTopics[topic]:
|
for p in f.peerTopics[topic]:
|
||||||
debug "pubslishing message", topic = topic, peer = p, data = data
|
trace "pubslishing message", topic = topic, peer = p, data = data
|
||||||
await f.peers[p].send(@[RPCMsg(messages: @[msg])])
|
await f.peers[p].send(@[RPCMsg(messages: @[msg])])
|
||||||
|
|
||||||
method subscribe*(f: FloodSub,
|
method subscribe*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
handler: TopicHandler)
|
handler: TopicHandler) {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
|
||||||
await procCall PubSub(f).subscribe(topic, handler)
|
await procCall PubSub(f).subscribe(topic, handler)
|
||||||
for p in f.peers.values:
|
for p in f.peers.values:
|
||||||
await f.sendSubs(p, @[topic], true)
|
await f.sendSubs(p, @[topic], true)
|
||||||
|
|
|
@ -30,24 +30,24 @@ type
|
||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
proc handle*(p: PubSubPeer) {.async, gcsafe.} =
|
proc handle*(p: PubSubPeer) {.async, gcsafe.} =
|
||||||
debug "handling pubsub rpc", peer = p.id
|
trace "handling pubsub rpc", peer = p.id
|
||||||
try:
|
try:
|
||||||
while not p.conn.closed:
|
while not p.conn.closed:
|
||||||
let data = await p.conn.readLp()
|
let data = await p.conn.readLp()
|
||||||
debug "Read data from peer", peer = p.peerInfo, data = data.toHex()
|
trace "Read data from peer", peer = p.peerInfo, data = data.toHex()
|
||||||
let msg = decodeRpcMsg(data)
|
let msg = decodeRpcMsg(data)
|
||||||
debug "Decoded msg from peer", peer = p.peerInfo, msg = msg
|
trace "Decoded msg from peer", peer = p.peerInfo, msg = msg
|
||||||
await p.handler(p, @[msg])
|
await p.handler(p, @[msg])
|
||||||
except:
|
except:
|
||||||
debug "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg()
|
error "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg()
|
||||||
return
|
return
|
||||||
finally:
|
finally:
|
||||||
debug "closing connection to pubsub peer", peer = p.id
|
trace "closing connection to pubsub peer", peer = p.id
|
||||||
await p.conn.close()
|
await p.conn.close()
|
||||||
|
|
||||||
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
|
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
|
||||||
for m in msgs:
|
for m in msgs:
|
||||||
debug "sending msgs to peer", peer = p.id, msgs = msgs
|
trace "sending msgs to peer", peer = p.id, msgs = msgs
|
||||||
let encoded = encodeRpcMsg(m)
|
let encoded = encodeRpcMsg(m)
|
||||||
if encoded.buffer.len > 0:
|
if encoded.buffer.len > 0:
|
||||||
await p.conn.writeLp(encoded.buffer)
|
await p.conn.writeLp(encoded.buffer)
|
||||||
|
|
|
@ -58,7 +58,7 @@ proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} =
|
||||||
|
|
||||||
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
|
||||||
result = initProtoBuffer({WithVarintLength})
|
result = initProtoBuffer({WithVarintLength})
|
||||||
debug "encoding msg: ", msg = msg
|
trace "encoding msg: ", msg = msg
|
||||||
|
|
||||||
if msg.subscriptions.len > 0:
|
if msg.subscriptions.len > 0:
|
||||||
var subs = initProtoBuffer()
|
var subs = initProtoBuffer()
|
||||||
|
@ -85,7 +85,7 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
||||||
while true:
|
while true:
|
||||||
# decode SubOpts array
|
# decode SubOpts array
|
||||||
var field = pb.enterSubMessage()
|
var field = pb.enterSubMessage()
|
||||||
debug "processing submessage", field = field
|
trace "processing submessage", field = field
|
||||||
case field:
|
case field:
|
||||||
of 0:
|
of 0:
|
||||||
break
|
break
|
||||||
|
@ -95,14 +95,14 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
||||||
var subscr: int
|
var subscr: int
|
||||||
discard pb.getVarintValue(1, subscr)
|
discard pb.getVarintValue(1, subscr)
|
||||||
subOpt.subscribe = cast[bool](subscr)
|
subOpt.subscribe = cast[bool](subscr)
|
||||||
debug "read subscribe field", subscribe = subOpt.subscribe
|
trace "read subscribe field", subscribe = subOpt.subscribe
|
||||||
|
|
||||||
if pb.getString(2, subOpt.topic) < 0:
|
if pb.getString(2, subOpt.topic) < 0:
|
||||||
break
|
break
|
||||||
debug "read subscribe field", topicName = subOpt.topic
|
trace "read subscribe field", topicName = subOpt.topic
|
||||||
|
|
||||||
result.subscriptions.add(subOpt)
|
result.subscriptions.add(subOpt)
|
||||||
debug "got subscriptions", subscriptions = result.subscriptions
|
trace "got subscriptions", subscriptions = result.subscriptions
|
||||||
|
|
||||||
of 2:
|
of 2:
|
||||||
result.messages = newSeq[Message]()
|
result.messages = newSeq[Message]()
|
||||||
|
@ -111,29 +111,29 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
|
||||||
var msg: Message
|
var msg: Message
|
||||||
if pb.getBytes(1, msg.fromPeer) < 0:
|
if pb.getBytes(1, msg.fromPeer) < 0:
|
||||||
break
|
break
|
||||||
debug "read message field", fromPeer = msg.fromPeer
|
trace "read message field", fromPeer = msg.fromPeer
|
||||||
|
|
||||||
if pb.getBytes(2, msg.data) < 0:
|
if pb.getBytes(2, msg.data) < 0:
|
||||||
break
|
break
|
||||||
debug "read message field", data = msg.data
|
trace "read message field", data = msg.data
|
||||||
|
|
||||||
if pb.getBytes(3, msg.seqno) < 0:
|
if pb.getBytes(3, msg.seqno) < 0:
|
||||||
break
|
break
|
||||||
debug "read message field", seqno = msg.seqno
|
trace "read message field", seqno = msg.seqno
|
||||||
|
|
||||||
var topic: string
|
var topic: string
|
||||||
while true:
|
while true:
|
||||||
if pb.getString(4, topic) < 0:
|
if pb.getString(4, topic) < 0:
|
||||||
break
|
break
|
||||||
msg.topicIDs.add(topic)
|
msg.topicIDs.add(topic)
|
||||||
debug "read message field", topicName = topic
|
trace "read message field", topicName = topic
|
||||||
topic = ""
|
topic = ""
|
||||||
|
|
||||||
discard pb.getBytes(5, msg.signature)
|
discard pb.getBytes(5, msg.signature)
|
||||||
debug "read message field", signature = msg.signature
|
trace "read message field", signature = msg.signature
|
||||||
|
|
||||||
discard pb.getBytes(6, msg.key)
|
discard pb.getBytes(6, msg.key)
|
||||||
debug "read message field", key = msg.key
|
trace "read message field", key = msg.key
|
||||||
|
|
||||||
result.messages.add(msg)
|
result.messages.add(msg)
|
||||||
else:
|
else:
|
||||||
|
@ -157,8 +157,7 @@ proc sign*(peerId: PeerID, msg: Message): Message =
|
||||||
|
|
||||||
proc makeMessage*(peerId: PeerID,
|
proc makeMessage*(peerId: PeerID,
|
||||||
data: seq[byte],
|
data: seq[byte],
|
||||||
name: string):
|
name: string): Message {.gcsafe.} =
|
||||||
Message {.gcsafe.} =
|
|
||||||
var seqno: seq[byte] = newSeq[byte](20)
|
var seqno: seq[byte] = newSeq[byte](20)
|
||||||
if randomBytes(addr seqno[0], 20) > 0:
|
if randomBytes(addr seqno[0], 20) > 0:
|
||||||
result = Message(fromPeer: peerId.getBytes(),
|
result = Message(fromPeer: peerId.getBytes(),
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
## Nim-LibP2P
|
|
||||||
## Copyright (c) 2018 Status Research & Development GmbH
|
|
||||||
## Licensed under either of
|
|
||||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
||||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
||||||
## at your option.
|
|
||||||
## This file may not be copied, modified, or distributed except according to
|
|
||||||
## those terms.
|
|
||||||
|
|
||||||
type
|
|
||||||
Exchanges* {.pure.} = enum
|
|
||||||
P256 = "P-256",
|
|
||||||
P384 = "P-384",
|
|
||||||
P521 = "P-521"
|
|
||||||
|
|
||||||
Ciphers* {.pure.} = enum
|
|
||||||
AES256 = "AES-256",
|
|
||||||
AES128 = "AES-128"
|
|
||||||
|
|
||||||
Hashes* {.pure.} = enum
|
|
||||||
SHA256 = "SHA256"
|
|
||||||
SHA512 = "SHA512"
|
|
||||||
|
|
||||||
Propose* = tuple
|
|
||||||
rand: seq[byte]
|
|
||||||
pubkey: seq[byte]
|
|
||||||
exchanges: string
|
|
||||||
ciphers: string
|
|
||||||
hashes: string
|
|
||||||
|
|
||||||
Exchange = tuple
|
|
||||||
epubkey: seq[byte]
|
|
||||||
signature: seq[byte]
|
|
||||||
|
|
||||||
proc proposal*() = discard
|
|
||||||
proc exchange*() = discard
|
|
||||||
proc selectBest*() = discard
|
|
||||||
proc verify*() = discard
|
|
||||||
proc generateKeys*() = discard
|
|
||||||
proc verifyNonce*() = discard
|
|
|
@ -16,4 +16,5 @@ type
|
||||||
|
|
||||||
method secure*(p: Secure, conn: Connection): Future[Connection]
|
method secure*(p: Secure, conn: Connection): Future[Connection]
|
||||||
{.base, async, gcsafe.} =
|
{.base, async, gcsafe.} =
|
||||||
|
## default implementation matches plaintext
|
||||||
result = conn
|
result = conn
|
||||||
|
|
|
@ -95,7 +95,7 @@ method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
|
||||||
|
|
||||||
method close*(s: ChronosStream) {.async, gcsafe.} =
|
method close*(s: ChronosStream) {.async, gcsafe.} =
|
||||||
if not s.closed:
|
if not s.closed:
|
||||||
debug "closing connection for", address = $s.client.remoteAddress()
|
trace "closing connection for", address = $s.client.remoteAddress()
|
||||||
if not s.reader.closed:
|
if not s.reader.closed:
|
||||||
await s.reader.closeWait()
|
await s.reader.closeWait()
|
||||||
|
|
||||||
|
|
|
@ -84,18 +84,18 @@ proc identify*(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
if info.protos.len > 0:
|
if info.protos.len > 0:
|
||||||
peerInfo.protocols = info.protos
|
peerInfo.protocols = info.protos
|
||||||
|
|
||||||
debug "identify: identified remote peer ", peer = peerInfo.peerId.get().pretty
|
trace "identify: identified remote peer ", peer = peerInfo.peerId.get().pretty
|
||||||
except IdentityInvalidMsgError as exc:
|
except IdentityInvalidMsgError as exc:
|
||||||
debug "identify: invalid message", msg = exc.msg
|
error "identify: invalid message", msg = exc.msg
|
||||||
except IdentityNoMatchError as exc:
|
except IdentityNoMatchError as exc:
|
||||||
debug "identify: peer's public keys don't match ", msg = exc.msg
|
error "identify: peer's public keys don't match ", msg = exc.msg
|
||||||
|
|
||||||
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
|
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
debug "muxing connection"
|
trace "muxing connection"
|
||||||
## mux incoming connection
|
## mux incoming connection
|
||||||
let muxers = toSeq(s.muxers.keys)
|
let muxers = toSeq(s.muxers.keys)
|
||||||
if muxers.len == 0:
|
if muxers.len == 0:
|
||||||
trace "no muxers registered, skipping upgrade flow"
|
warn "no muxers registered, skipping upgrade flow"
|
||||||
return
|
return
|
||||||
|
|
||||||
let muxerName = await s.ms.select(conn, muxers)
|
let muxerName = await s.ms.select(conn, muxers)
|
||||||
|
@ -114,7 +114,7 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
# add muxer handler cleanup proc
|
# add muxer handler cleanup proc
|
||||||
handlerFut.addCallback(
|
handlerFut.addCallback(
|
||||||
proc(udata: pointer = nil) {.gcsafe.} =
|
proc(udata: pointer = nil) {.gcsafe.} =
|
||||||
debug "mux: Muxer handler completed for peer ",
|
trace "mux: Muxer handler completed for peer ",
|
||||||
peer = conn.peerInfo.peerId.get().pretty
|
peer = conn.peerInfo.peerId.get().pretty
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
s.muxed[conn.peerInfo.peerId.get().pretty] = muxer
|
s.muxed[conn.peerInfo.peerId.get().pretty] = muxer
|
||||||
|
|
||||||
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||||
debug "handling connection", conn = conn
|
trace "handling connection", conn = conn
|
||||||
result = conn
|
result = conn
|
||||||
## perform upgrade flow
|
## perform upgrade flow
|
||||||
if result.peerInfo.peerId.isSome:
|
if result.peerInfo.peerId.isSome:
|
||||||
|
@ -191,9 +191,9 @@ proc dial*(s: Switch,
|
||||||
trace "connection is muxed, return muxed stream"
|
trace "connection is muxed, return muxed stream"
|
||||||
result = stream.get()
|
result = stream.get()
|
||||||
|
|
||||||
debug "dial: attempting to select remote ", proto = proto
|
trace "dial: attempting to select remote ", proto = proto
|
||||||
if not (await s.ms.select(result, proto)):
|
if not (await s.ms.select(result, proto)):
|
||||||
debug "dial: Unable to select protocol: ", proto = proto
|
error "dial: Unable to select protocol: ", proto = proto
|
||||||
raise newException(CatchableError,
|
raise newException(CatchableError,
|
||||||
&"Unable to select protocol: {proto}")
|
&"Unable to select protocol: {proto}")
|
||||||
|
|
||||||
|
@ -209,14 +209,14 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
||||||
s.ms.addHandler(proto.codec, proto)
|
s.ms.addHandler(proto.codec, proto)
|
||||||
|
|
||||||
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
|
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
debug "upgrading incoming connection"
|
trace "upgrading incoming connection"
|
||||||
let ms = newMultistream()
|
let ms = newMultistream()
|
||||||
|
|
||||||
# secure incoming connections
|
# secure incoming connections
|
||||||
proc securedHandler (conn: Connection,
|
proc securedHandler (conn: Connection,
|
||||||
proto: string)
|
proto: string)
|
||||||
{.async, gcsafe, closure.} =
|
{.async, gcsafe, closure.} =
|
||||||
debug "Securing connection"
|
trace "Securing connection"
|
||||||
let secure = s.secureManagers[proto]
|
let secure = s.secureManagers[proto]
|
||||||
let sconn = await secure.secure(conn)
|
let sconn = await secure.secure(conn)
|
||||||
if not isNil(sconn):
|
if not isNil(sconn):
|
||||||
|
@ -305,17 +305,17 @@ proc newSwitch*(peerInfo: PeerInfo,
|
||||||
for key, val in muxers:
|
for key, val in muxers:
|
||||||
val.streamHandler = result.streamHandler
|
val.streamHandler = result.streamHandler
|
||||||
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
|
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
|
||||||
debug "got new muxer"
|
trace "got new muxer"
|
||||||
let stream = await muxer.newStream()
|
let stream = await muxer.newStream()
|
||||||
await s.identify(stream)
|
await s.identify(stream)
|
||||||
|
|
||||||
for k in secureManagers.keys:
|
for k in secureManagers.keys:
|
||||||
debug "adding secure manager ", codec = secureManagers[k].codec
|
trace "adding secure manager ", codec = secureManagers[k].codec
|
||||||
result.secureManagers[k] = secureManagers[k]
|
result.secureManagers[k] = secureManagers[k]
|
||||||
|
|
||||||
if result.secureManagers.len == 0:
|
if result.secureManagers.len == 0:
|
||||||
# use plain text if no secure managers are provided
|
# use plain text if no secure managers are provided
|
||||||
debug "no secure managers, falling back to palin text", codec = PlainTextCodec
|
warn "no secure managers, falling back to palin text", codec = PlainTextCodec
|
||||||
result.secureManagers[PlainTextCodec] = Secure(newPlainText())
|
result.secureManagers[PlainTextCodec] = Secure(newPlainText())
|
||||||
|
|
||||||
if pubSub.isSome:
|
if pubSub.isSome:
|
||||||
|
|
|
@ -23,7 +23,7 @@ proc connHandler*(t: Transport,
|
||||||
client: StreamTransport,
|
client: StreamTransport,
|
||||||
initiator: bool = false):
|
initiator: bool = false):
|
||||||
Future[Connection] {.async, gcsafe.} =
|
Future[Connection] {.async, gcsafe.} =
|
||||||
debug "handling connection for", address = $client.remoteAddress
|
trace "handling connection for", address = $client.remoteAddress
|
||||||
let conn: Connection = newConnection(newChronosStream(server, client))
|
let conn: Connection = newConnection(newChronosStream(server, client))
|
||||||
if not initiator:
|
if not initiator:
|
||||||
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
|
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
|
||||||
|
@ -34,7 +34,7 @@ proc connHandler*(t: Transport,
|
||||||
|
|
||||||
proc connCb(server: StreamServer,
|
proc connCb(server: StreamServer,
|
||||||
client: StreamTransport) {.async, gcsafe.} =
|
client: StreamTransport) {.async, gcsafe.} =
|
||||||
debug "incomming connection for", address = $client.remoteAddress
|
trace "incomming connection for", address = $client.remoteAddress
|
||||||
let t: Transport = cast[Transport](server.udata)
|
let t: Transport = cast[Transport](server.udata)
|
||||||
discard t.connHandler(server, client)
|
discard t.connHandler(server, client)
|
||||||
|
|
||||||
|
@ -43,12 +43,12 @@ method init*(t: TcpTransport) =
|
||||||
|
|
||||||
method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
|
method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
|
||||||
## start the transport
|
## start the transport
|
||||||
debug "stopping transport"
|
trace "stopping transport"
|
||||||
await procCall Transport(t).close() # call base
|
await procCall Transport(t).close() # call base
|
||||||
|
|
||||||
t.server.stop()
|
t.server.stop()
|
||||||
t.server.close()
|
t.server.close()
|
||||||
debug "transport stopped"
|
trace "transport stopped"
|
||||||
|
|
||||||
method listen*(t: TcpTransport,
|
method listen*(t: TcpTransport,
|
||||||
ma: MultiAddress,
|
ma: MultiAddress,
|
||||||
|
@ -66,7 +66,7 @@ method listen*(t: TcpTransport,
|
||||||
method dial*(t: TcpTransport,
|
method dial*(t: TcpTransport,
|
||||||
address: MultiAddress):
|
address: MultiAddress):
|
||||||
Future[Connection] {.async, gcsafe.} =
|
Future[Connection] {.async, gcsafe.} =
|
||||||
debug "dialing remote peer", address = $address
|
trace "dialing remote peer", address = $address
|
||||||
## dial a peer
|
## dial a peer
|
||||||
let client: StreamTransport = await connect(address)
|
let client: StreamTransport = await connect(address)
|
||||||
result = await t.connHandler(t.server, client, true)
|
result = await t.connHandler(t.server, client, true)
|
||||||
|
|
Loading…
Reference in New Issue