Move console logging to MessageQueue

This commit is contained in:
nicksavers 2014-10-02 00:23:12 +02:00
parent c167f1947e
commit ac982a6767
2 changed files with 83 additions and 76 deletions

View File

@ -32,11 +32,13 @@ public class MessageQueue {
private Logger logger = LoggerFactory.getLogger("wire"); private Logger logger = LoggerFactory.getLogger("wire");
private Queue<MessageRoundtrip> messageQueue = new ConcurrentLinkedQueue<>(); private Queue<MessageRoundtrip> messageQueue = new ConcurrentLinkedQueue<>();
private PeerListener listener;
private ChannelHandlerContext ctx = null; private ChannelHandlerContext ctx = null;
private final Timer timer = new Timer(); private final Timer timer = new Timer();
public MessageQueue(ChannelHandlerContext ctx) { public MessageQueue(ChannelHandlerContext ctx, PeerListener listener) {
this.ctx = ctx; this.ctx = ctx;
this.listener = listener;
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
public void run() { public void run() {
@ -51,6 +53,8 @@ public class MessageQueue {
public void receivedMessage(Message msg) throws InterruptedException { public void receivedMessage(Message msg) throws InterruptedException {
if (listener != null)
listener.console("[Recv: " + msg + "]");
if (logger.isInfoEnabled()) if (logger.isInfoEnabled())
logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg); logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg);
if (logger.isDebugEnabled()) if (logger.isDebugEnabled())
@ -104,6 +108,8 @@ public class MessageQueue {
private void sendToWire(Message msg) { private void sendToWire(Message msg) {
if (listener != null)
listener.console("[Send: " + msg + "]");
if (logger.isInfoEnabled()) if (logger.isInfoEnabled())
logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg); logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg);
if (logger.isDebugEnabled()) if (logger.isDebugEnabled())

View File

@ -95,7 +95,7 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelActive(final ChannelHandlerContext ctx) { public void channelActive(final ChannelHandlerContext ctx) {
msgQueue = new MessageQueue(ctx); msgQueue = new MessageQueue(ctx, peerListener);
sendHello(); sendHello();
} }
@ -104,124 +104,103 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter {
byte[] payload = (byte[]) msg; byte[] payload = (byte[]) msg;
Command receivedCommand = Command.fromInt(RLP.getCommandCode(payload)); Command receivedCommand = Command.fromInt(RLP.getCommandCode(payload));
if (peerListener != null) peerListener.console("[Recv: " + receivedCommand.name() + "]");
switch(receivedCommand) { switch(receivedCommand) {
case HELLO: case HELLO:
HelloMessage helloMessage = new HelloMessage(payload); HelloMessage helloMessage = new HelloMessage(payload);
if (peerListener != null) peerListener.console(helloMessage.toString());
msgQueue.receivedMessage(helloMessage); msgQueue.receivedMessage(helloMessage);
setHandshake(helloMessage, ctx); setHandshake(helloMessage, ctx);
if (listener != null)
listener.onRecvMessage(helloMessage); if (listener != null) listener.onRecvMessage(helloMessage);
break; break;
case STATUS: case STATUS:
StatusMessage statusMessage = new StatusMessage(payload); StatusMessage statusMessage = new StatusMessage(payload);
if (peerListener != null) peerListener.console(statusMessage.toString());
processStatus(statusMessage);
if (listener != null)
listener.onRecvMessage(statusMessage);
msgQueue.receivedMessage(statusMessage); msgQueue.receivedMessage(statusMessage);
processStatus(statusMessage);
if (listener != null) listener.onRecvMessage(statusMessage);
break; break;
case DISCONNECT: case DISCONNECT:
DisconnectMessage disconnectMessage = new DisconnectMessage(payload); DisconnectMessage disconnectMessage = new DisconnectMessage(payload);
msgQueue.receivedMessage(disconnectMessage); msgQueue.receivedMessage(disconnectMessage);
ctx.close().sync(); ctx.close().sync();
ctx.disconnect().sync(); ctx.disconnect().sync();
if (peerListener != null) peerListener.console(disconnectMessage.toString());
if (listener != null) if (listener != null) listener.onRecvMessage(disconnectMessage);
listener.onRecvMessage(disconnectMessage);
break; break;
case PING: case PING:
msgQueue.receivedMessage(StaticMessages.PING_MESSAGE); msgQueue.receivedMessage(StaticMessages.PING_MESSAGE);
sendPong(); sendPong();
if (listener != null)
listener.onRecvMessage(PING_MESSAGE); if (listener != null) listener.onRecvMessage(PING_MESSAGE);
break; break;
case PONG: case PONG:
msgQueue.receivedMessage(StaticMessages.PONG_MESSAGE); msgQueue.receivedMessage(PONG_MESSAGE);
lastPongTime = System.currentTimeMillis(); lastPongTime = System.currentTimeMillis();
if (listener != null)
listener.onRecvMessage(PONG_MESSAGE); if (listener != null) listener.onRecvMessage(PONG_MESSAGE);
break; break;
case GET_PEERS: case GET_PEERS:
msgQueue.receivedMessage(StaticMessages.GET_PEERS_MESSAGE); msgQueue.receivedMessage(GET_PEERS_MESSAGE);
sendPeers(); sendPeers();
if (listener != null)
listener.onRecvMessage(GET_PEERS_MESSAGE); if (listener != null) listener.onRecvMessage(GET_PEERS_MESSAGE);
break; break;
case PEERS: case PEERS:
PeersMessage peersMessage = new PeersMessage(payload); PeersMessage peersMessage = new PeersMessage(payload);
msgQueue.receivedMessage(peersMessage); msgQueue.receivedMessage(peersMessage);
if (peerListener != null) peerListener.console(peersMessage.toString());
WorldManager.getInstance().addPeers(peersMessage.getPeers()); processPeers(peersMessage);
if (listener != null)
listener.onRecvMessage(peersMessage); if (listener != null) listener.onRecvMessage(peersMessage);
break; break;
case TRANSACTIONS: case TRANSACTIONS:
TransactionsMessage transactionsMessage = new TransactionsMessage(payload); TransactionsMessage transactionsMessage = new TransactionsMessage(payload);
msgQueue.receivedMessage(transactionsMessage); msgQueue.receivedMessage(transactionsMessage);
if (peerListener != null) peerListener.console(transactionsMessage.toString());
// List<Transaction> txList = transactionsMessage.getTransactions(); // List<Transaction> txList = transactionsMessage.getTransactions();
// for(Transaction tx : txList) // for(Transaction tx : txList)
// WorldManager.getInstance().getBlockchain().applyTransaction(null, tx); // WorldManager.getInstance().getBlockchain().applyTransaction(null, tx);
// WorldManager.getInstance().getWallet().addTransaction(tx); // WorldManager.getInstance().getWallet().addTransaction(tx);
if (listener != null) if (listener != null) listener.onRecvMessage(transactionsMessage);
listener.onRecvMessage(transactionsMessage);
break; break;
case BLOCKS: case BLOCKS:
BlocksMessage blocksMessage = new BlocksMessage(payload); BlocksMessage blocksMessage = new BlocksMessage(payload);
List<Block> blockList = blocksMessage.getBlocks();
msgQueue.receivedMessage(blocksMessage); msgQueue.receivedMessage(blocksMessage);
if (peerListener != null) peerListener.console(blocksMessage.toString());
// If we get one block from a peer we ask less greedy processBlocks(blocksMessage);
if (blockList.size() <= 1 && secToAskForBlocks != 10) {
logger.info("Now we ask for blocks every 10 seconds");
updateBlockAskTimer(10);
}
// If we get more blocks from a peer we ask more greedy if (listener != null) listener.onRecvMessage(blocksMessage);
if (blockList.size() > 2 && secToAskForBlocks != 1) {
logger.info("Now we ask for a chain every 1 seconds");
updateBlockAskTimer(1);
}
if (blockList.isEmpty()) return;
WorldManager.getInstance().getBlockchain().getBlockQueue().addBlocks(blockList);
if (listener != null)
listener.onRecvMessage(blocksMessage);
break; break;
case GET_TRANSACTIONS: case GET_TRANSACTIONS:
msgQueue.receivedMessage(StaticMessages.GET_TRANSACTIONS_MESSAGE); msgQueue.receivedMessage(GET_TRANSACTIONS_MESSAGE);
sendPendingTransactions(); sendPendingTransactions();
if (listener != null) if (listener != null) listener.onRecvMessage(GET_TRANSACTIONS_MESSAGE);
listener.onRecvMessage(GET_TRANSACTIONS_MESSAGE);
break; break;
case GET_BLOCK_HASHES: case GET_BLOCK_HASHES:
GetBlockHashesMessage getBlockHashesMessage = new GetBlockHashesMessage(payload); GetBlockHashesMessage getBlockHashesMessage = new GetBlockHashesMessage(payload);
msgQueue.receivedMessage(getBlockHashesMessage); msgQueue.receivedMessage(getBlockHashesMessage);
if (peerListener != null) peerListener.console(getBlockHashesMessage.toString());
sendBlockHashes(); sendBlockHashes();
if (listener != null) if (listener != null) listener.onRecvMessage(getBlockHashesMessage);
listener.onRecvMessage(getBlockHashesMessage);
break; break;
case BLOCK_HASHES: case BLOCK_HASHES:
BlockHashesMessage blockHashesMessage = new BlockHashesMessage(payload); BlockHashesMessage blockHashesMessage = new BlockHashesMessage(payload);
msgQueue.receivedMessage(blockHashesMessage); msgQueue.receivedMessage(blockHashesMessage);
if (peerListener != null) peerListener.console(blockHashesMessage.toString());
processBlockHashes(blockHashesMessage); processBlockHashes(blockHashesMessage);
if (listener != null) if (listener != null) listener.onRecvMessage(blockHashesMessage);
listener.onRecvMessage(blockHashesMessage);
break; break;
case GET_BLOCKS: case GET_BLOCKS:
GetBlocksMessage getBlocksMessage = new GetBlocksMessage(payload); GetBlocksMessage getBlocksMessage = new GetBlocksMessage(payload);
@ -229,26 +208,15 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter {
sendBlocks(); sendBlocks();
if (listener != null) if (listener != null) listener.onRecvMessage(getBlocksMessage);
listener.onRecvMessage(getBlocksMessage);
break; break;
default: default:
break; break;
} }
} }
private void sendPendingTransactions() { private void processPeers(PeersMessage peersMessage) {
Set<Transaction> pendingTxs = WorldManager.getInstance().addPeers(peersMessage.getPeers());
WorldManager.getInstance().getPendingTransactions();
TransactionsMessage msg = new TransactionsMessage(pendingTxs);
sendMsg(msg);
}
private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
// for each block hash
// check if blockHash != known hash
// store blockhash
// if no known hash has been reached, another getBlockHashes with last stored hash.
} }
/** /**
@ -273,6 +241,32 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter {
// TODO: discard peer from pool // TODO: discard peer from pool
} }
private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
// for each block hash
// check if blockHash != known hash
// store blockhash
// if no known hash has been reached, another getBlockHashes with last stored hash.
}
private void processBlocks(BlocksMessage blocksMessage) {
List<Block> blockList = blocksMessage.getBlocks();
// If we get one block from a peer we ask less greedy
if (blockList.size() <= 1 && secToAskForBlocks != 10) {
logger.info("Now we ask for blocks every 10 seconds");
updateBlockAskTimer(10);
}
// If we get more blocks from a peer we ask more greedy
if (blockList.size() > 2 && secToAskForBlocks != 1) {
logger.info("Now we ask for a chain every 1 seconds");
updateBlockAskTimer(1);
}
if (blockList.isEmpty()) return;
WorldManager.getInstance().getBlockchain().getBlockQueue().addBlocks(blockList);
}
private void sendMsg(Message msg) { private void sendMsg(Message msg) {
msgQueue.sendMessage(msg); msgQueue.sendMessage(msg);
@ -379,6 +373,13 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter {
sendMsg(msg); sendMsg(msg);
} }
private void sendPendingTransactions() {
Set<Transaction> pendingTxs =
WorldManager.getInstance().getPendingTransactions();
TransactionsMessage msg = new TransactionsMessage(pendingTxs);
sendMsg(msg);
}
private void sendBlocks() { private void sendBlocks() {
// TODO: Send blocks // TODO: Send blocks
} }