From ac982a6767914a6826b181f3a8a5adb622550c58 Mon Sep 17 00:00:00 2001 From: nicksavers Date: Thu, 2 Oct 2014 00:23:12 +0200 Subject: [PATCH] Move console logging to MessageQueue --- .../java/org/ethereum/net/MessageQueue.java | 10 +- .../ethereum/net/client/ProtocolHandler.java | 149 +++++++++--------- 2 files changed, 83 insertions(+), 76 deletions(-) diff --git a/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java b/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java index e3628634..cbd817c4 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java @@ -32,11 +32,13 @@ public class MessageQueue { private Logger logger = LoggerFactory.getLogger("wire"); private Queue messageQueue = new ConcurrentLinkedQueue<>(); + private PeerListener listener; private ChannelHandlerContext ctx = null; private final Timer timer = new Timer(); - public MessageQueue(ChannelHandlerContext ctx) { + public MessageQueue(ChannelHandlerContext ctx, PeerListener listener) { this.ctx = ctx; + this.listener = listener; timer.scheduleAtFixedRate(new TimerTask() { public void run() { @@ -51,11 +53,13 @@ public class MessageQueue { public void receivedMessage(Message msg) throws InterruptedException { + if (listener != null) + listener.console("[Recv: " + msg + "]"); if (logger.isInfoEnabled()) logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg); if (logger.isDebugEnabled()) logger.debug("Encoded: [{}]", Hex.toHexString(msg.getEncoded())); - + if (messageQueue.peek() != null) { MessageRoundtrip messageRoundtrip = messageQueue.peek(); Message waitingMessage = messageRoundtrip.getMsg(); @@ -104,6 +108,8 @@ public class MessageQueue { private void sendToWire(Message msg) { + if (listener != null) + listener.console("[Send: " + msg + "]"); if (logger.isInfoEnabled()) logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg); if (logger.isDebugEnabled()) diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java index d0e47e41..3f1e45d2 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java @@ -95,7 +95,7 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { - msgQueue = new MessageQueue(ctx); + msgQueue = new MessageQueue(ctx, peerListener); sendHello(); } @@ -103,125 +103,104 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { public void channelRead(final ChannelHandlerContext ctx, Object msg) throws InterruptedException { byte[] payload = (byte[]) msg; - Command receivedCommand = Command.fromInt(RLP.getCommandCode(payload)); - if (peerListener != null) peerListener.console("[Recv: " + receivedCommand.name() + "]"); + Command receivedCommand = Command.fromInt(RLP.getCommandCode(payload)); switch(receivedCommand) { case HELLO: HelloMessage helloMessage = new HelloMessage(payload); - if (peerListener != null) peerListener.console(helloMessage.toString()); msgQueue.receivedMessage(helloMessage); + setHandshake(helloMessage, ctx); - if (listener != null) - listener.onRecvMessage(helloMessage); + + if (listener != null) listener.onRecvMessage(helloMessage); break; case STATUS: StatusMessage statusMessage = new StatusMessage(payload); - if (peerListener != null) peerListener.console(statusMessage.toString()); + msgQueue.receivedMessage(statusMessage); + processStatus(statusMessage); - if (listener != null) - listener.onRecvMessage(statusMessage); - msgQueue.receivedMessage(statusMessage); + + if (listener != null) listener.onRecvMessage(statusMessage); break; case DISCONNECT: DisconnectMessage disconnectMessage = new DisconnectMessage(payload); msgQueue.receivedMessage(disconnectMessage); + ctx.close().sync(); ctx.disconnect().sync(); - if (peerListener != null) peerListener.console(disconnectMessage.toString()); - if (listener != null) - listener.onRecvMessage(disconnectMessage); + + if (listener != null) listener.onRecvMessage(disconnectMessage); break; case PING: msgQueue.receivedMessage(StaticMessages.PING_MESSAGE); + sendPong(); - if (listener != null) - listener.onRecvMessage(PING_MESSAGE); + + if (listener != null) listener.onRecvMessage(PING_MESSAGE); break; case PONG: - msgQueue.receivedMessage(StaticMessages.PONG_MESSAGE); + msgQueue.receivedMessage(PONG_MESSAGE); + lastPongTime = System.currentTimeMillis(); - if (listener != null) - listener.onRecvMessage(PONG_MESSAGE); + + if (listener != null) listener.onRecvMessage(PONG_MESSAGE); break; case GET_PEERS: - msgQueue.receivedMessage(StaticMessages.GET_PEERS_MESSAGE); + msgQueue.receivedMessage(GET_PEERS_MESSAGE); + sendPeers(); - if (listener != null) - listener.onRecvMessage(GET_PEERS_MESSAGE); + + if (listener != null) listener.onRecvMessage(GET_PEERS_MESSAGE); break; case PEERS: PeersMessage peersMessage = new PeersMessage(payload); msgQueue.receivedMessage(peersMessage); - if (peerListener != null) peerListener.console(peersMessage.toString()); - WorldManager.getInstance().addPeers(peersMessage.getPeers()); - if (listener != null) - listener.onRecvMessage(peersMessage); + processPeers(peersMessage); + + if (listener != null) listener.onRecvMessage(peersMessage); break; case TRANSACTIONS: TransactionsMessage transactionsMessage = new TransactionsMessage(payload); msgQueue.receivedMessage(transactionsMessage); - if (peerListener != null) peerListener.console(transactionsMessage.toString()); - + // List txList = transactionsMessage.getTransactions(); // for(Transaction tx : txList) // WorldManager.getInstance().getBlockchain().applyTransaction(null, tx); // WorldManager.getInstance().getWallet().addTransaction(tx); - if (listener != null) - listener.onRecvMessage(transactionsMessage); + if (listener != null) listener.onRecvMessage(transactionsMessage); break; case BLOCKS: BlocksMessage blocksMessage = new BlocksMessage(payload); - List blockList = blocksMessage.getBlocks(); msgQueue.receivedMessage(blocksMessage); - if (peerListener != null) peerListener.console(blocksMessage.toString()); - - // If we get one block from a peer we ask less greedy - if (blockList.size() <= 1 && secToAskForBlocks != 10) { - logger.info("Now we ask for blocks every 10 seconds"); - updateBlockAskTimer(10); - } - - // If we get more blocks from a peer we ask more greedy - if (blockList.size() > 2 && secToAskForBlocks != 1) { - logger.info("Now we ask for a chain every 1 seconds"); - updateBlockAskTimer(1); - } - - if (blockList.isEmpty()) return; - WorldManager.getInstance().getBlockchain().getBlockQueue().addBlocks(blockList); - if (listener != null) - listener.onRecvMessage(blocksMessage); + + processBlocks(blocksMessage); + + if (listener != null) listener.onRecvMessage(blocksMessage); break; case GET_TRANSACTIONS: - msgQueue.receivedMessage(StaticMessages.GET_TRANSACTIONS_MESSAGE); - + msgQueue.receivedMessage(GET_TRANSACTIONS_MESSAGE); + sendPendingTransactions(); - if (listener != null) - listener.onRecvMessage(GET_TRANSACTIONS_MESSAGE); + if (listener != null) listener.onRecvMessage(GET_TRANSACTIONS_MESSAGE); break; case GET_BLOCK_HASHES: GetBlockHashesMessage getBlockHashesMessage = new GetBlockHashesMessage(payload); msgQueue.receivedMessage(getBlockHashesMessage); - if (peerListener != null) peerListener.console(getBlockHashesMessage.toString()); - - sendBlockHashes(); - if (listener != null) - listener.onRecvMessage(getBlockHashesMessage); + sendBlockHashes(); + + if (listener != null) listener.onRecvMessage(getBlockHashesMessage); break; case BLOCK_HASHES: BlockHashesMessage blockHashesMessage = new BlockHashesMessage(payload); msgQueue.receivedMessage(blockHashesMessage); - if (peerListener != null) peerListener.console(blockHashesMessage.toString()); processBlockHashes(blockHashesMessage); - if (listener != null) - listener.onRecvMessage(blockHashesMessage); + if (listener != null) listener.onRecvMessage(blockHashesMessage); break; case GET_BLOCKS: GetBlocksMessage getBlocksMessage = new GetBlocksMessage(payload); @@ -229,26 +208,15 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { sendBlocks(); - if (listener != null) - listener.onRecvMessage(getBlocksMessage); + if (listener != null) listener.onRecvMessage(getBlocksMessage); break; default: break; } } - private void sendPendingTransactions() { - Set pendingTxs = - WorldManager.getInstance().getPendingTransactions(); - TransactionsMessage msg = new TransactionsMessage(pendingTxs); - sendMsg(msg); - } - - private void processBlockHashes(BlockHashesMessage blockHashesMessage) { - // for each block hash - // check if blockHash != known hash - // store blockhash - // if no known hash has been reached, another getBlockHashes with last stored hash. + private void processPeers(PeersMessage peersMessage) { + WorldManager.getInstance().addPeers(peersMessage.getPeers()); } /** @@ -272,7 +240,33 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { } // TODO: discard peer from pool } + + private void processBlockHashes(BlockHashesMessage blockHashesMessage) { + // for each block hash + // check if blockHash != known hash + // store blockhash + // if no known hash has been reached, another getBlockHashes with last stored hash. + } + + private void processBlocks(BlocksMessage blocksMessage) { + List blockList = blocksMessage.getBlocks(); + // If we get one block from a peer we ask less greedy + if (blockList.size() <= 1 && secToAskForBlocks != 10) { + logger.info("Now we ask for blocks every 10 seconds"); + updateBlockAskTimer(10); + } + // If we get more blocks from a peer we ask more greedy + if (blockList.size() > 2 && secToAskForBlocks != 1) { + logger.info("Now we ask for a chain every 1 seconds"); + updateBlockAskTimer(1); + } + + if (blockList.isEmpty()) return; + WorldManager.getInstance().getBlockchain().getBlockQueue().addBlocks(blockList); + + } + private void sendMsg(Message msg) { msgQueue.sendMessage(msg); @@ -379,6 +373,13 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { sendMsg(msg); } + private void sendPendingTransactions() { + Set pendingTxs = + WorldManager.getInstance().getPendingTransactions(); + TransactionsMessage msg = new TransactionsMessage(pendingTxs); + sendMsg(msg); + } + private void sendBlocks() { // TODO: Send blocks }