Use separate handler and decoders

This commit is contained in:
nicksavers 2014-10-04 19:52:56 +02:00
parent 8642f06e21
commit 1e3b496e8e
24 changed files with 621 additions and 169 deletions

View File

@ -67,7 +67,7 @@ public interface Ethereum {
public void connect(InetAddress addr, int port);
public void connect(String ip, int port);
public Blockchain getBlockChain();
public Blockchain getBlockchain();
public boolean isBlockchainLoading();

View File

@ -76,7 +76,6 @@ public class EthereumImpl implements Ethereum {
@Override
public Peer waitForOnlinePeer() {
Peer peer = null;
while (peer == null) {
try {
@ -116,7 +115,7 @@ public class EthereumImpl implements Ethereum {
}
@Override
public Blockchain getBlockChain() {
public Blockchain getBlockchain() {
return WorldManager.getInstance().getBlockchain();
}

View File

@ -2,8 +2,6 @@ package org.ethereum.manager;
import static org.ethereum.config.SystemProperties.CONFIG;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import org.ethereum.core.BlockchainImpl;
@ -15,7 +13,6 @@ import org.ethereum.facade.Blockchain;
import org.ethereum.facade.Repository;
import org.ethereum.listener.EthereumListener;
import org.ethereum.net.client.PeerClient;
import org.ethereum.net.client.Peer;
import org.ethereum.net.client.PeerDiscovery;
/**
@ -92,29 +89,6 @@ public class WorldManager {
public EthereumListener getListener() {
return listener;
}
public Set<Peer> parsePeerDiscoveryIpList(final String peerDiscoveryIpList) {
final List<String> ipList = Arrays.asList(peerDiscoveryIpList.split(","));
final Set<Peer> peers = new HashSet<>();
for (String ip : ipList){
String[] addr = ip.trim().split(":");
String ip_trim = addr[0];
String port_trim = addr[1];
try {
InetAddress iAddr = InetAddress.getByName(ip_trim);
int port = Integer.parseInt(port_trim);
Peer peer = new Peer(iAddr, port, null);
peers.add(peer);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
return peers;
}
public void setWallet(Wallet wallet) {
this.wallet = wallet;

View File

@ -1,13 +1,10 @@
package org.ethereum.net;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.ethereum.net.message.*;
import org.ethereum.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -55,14 +52,12 @@ public class MessageQueue {
if (listener != null)
listener.console("[Recv: " + msg + "]");
if (logger.isInfoEnabled()
&& msg.getCommand() != Command.PING
&& msg.getCommand() != Command.PONG
&& msg.getCommand() != Command.PEERS
&& msg.getCommand() != Command.GET_PEERS)
if (logger.isInfoEnabled())
// && msg.getCommand() != Command.PING
// && msg.getCommand() != Command.PONG
// && msg.getCommand() != Command.PEERS
// && msg.getCommand() != Command.GET_PEERS)
logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg);
if (logger.isDebugEnabled())
logger.debug("Encoded: [{}]", Hex.toHexString(msg.getEncoded()));
if (messageQueue.peek() != null) {
MessageRoundtrip messageRoundtrip = messageQueue.peek();
@ -75,8 +70,6 @@ public class MessageQueue {
messageRoundtrip.getMsg().getCommand());
}
}
if (msg instanceof DisconnectMessage)
disconnect();
}
private void nudgeQueue() {
@ -90,11 +83,11 @@ public class MessageQueue {
}
// Now send the next message
if (null != messageQueue.peek()) {
if (messageQueue.peek() != null) {
MessageRoundtrip messageRoundtrip = messageQueue.peek();
if (messageRoundtrip.getRetryTimes() == 0) {
// todo: retry logic || messageRoundtrip.hasToRetry()){
// TODO: retry logic || messageRoundtrip.hasToRetry()){
Message msg = messageRoundtrip.getMsg();
sendToWire(msg);
@ -113,30 +106,13 @@ public class MessageQueue {
if (listener != null)
listener.console("[Send: " + msg + "]");
if (logger.isInfoEnabled()
&& msg.getCommand() != Command.PING
&& msg.getCommand() != Command.PONG
&& msg.getCommand() != Command.PEERS
&& msg.getCommand() != Command.GET_PEERS)
if (logger.isInfoEnabled())
// && msg.getCommand() != Command.PING
// && msg.getCommand() != Command.PONG
// && msg.getCommand() != Command.PEERS
// && msg.getCommand() != Command.GET_PEERS)
logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg);
if (logger.isDebugEnabled())
logger.debug("Encdoded: [{}]", Hex.toHexString(msg.getEncoded()));
int packetLength = StaticMessages.SYNC_TOKEN.length + msg.getEncoded().length;
ByteBuf buffer = ctx.alloc().buffer(packetLength);
buffer.writeBytes(StaticMessages.SYNC_TOKEN);
buffer.writeBytes(ByteUtil.calcPacketLength(msg.getEncoded()));
buffer.writeBytes(msg.getEncoded());
ctx.writeAndFlush(buffer);
if(msg instanceof DisconnectMessage)
disconnect();
ctx.writeAndFlush(msg);
}
private void disconnect() {
throw new DisconnectException();
}
@SuppressWarnings("serial")
public class DisconnectException extends RuntimeException {}
}

View File

@ -1,6 +1,5 @@
package org.ethereum.net.client;
import org.ethereum.net.message.HelloMessage;
import org.ethereum.util.RLP;
import org.spongycastle.util.encoders.Hex;
@ -20,7 +19,6 @@ public class Peer {
private byte[] peerId;
private List<String> capabilities;
private HelloMessage handshake;
private transient boolean isOnline = false;
private transient long lastCheckTime = 0;
@ -63,19 +61,7 @@ public class Peer {
}
public List<String> getCapabilities() {
if (handshake != null)
return handshake.getCapabilities();
else
return new ArrayList<String>();
}
public HelloMessage getHandshake() {
return handshake;
}
public void setHandshake(HelloMessage handshake) {
this.handshake = handshake;
return capabilities;
}
public byte[] getEncoded() {

View File

@ -6,10 +6,11 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.listener.EthereumListener;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.EthereumMessageSizeEstimator;
import org.ethereum.net.PeerListener;
import org.ethereum.net.handler.P2pHandler;
import org.ethereum.net.handler.PacketDecoder;
import org.ethereum.net.handler.PacketEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,7 +20,7 @@ import java.util.concurrent.TimeUnit;
import static org.ethereum.config.SystemProperties.CONFIG;
/**
* This class creates the initial connection using the Netty framework
* This class creates the connection to an remote address using the Netty framework
* @see <a href="http://netty.io">http://netty.io</a>
*/
public class PeerClient {
@ -27,24 +28,21 @@ public class PeerClient {
private Logger logger = LoggerFactory.getLogger("wire");
private PeerListener peerListener;
private ProtocolHandler handler;
private P2pHandler handler;
public PeerClient() {
}
public PeerClient(PeerListener peerListener) {
this.peerListener = peerListener;
this.peerListener = peerListener;
}
public void connect(String host, int port) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
if (peerListener != null) {
if (peerListener != null)
peerListener.console("Connecting to: " + host + ":" + port);
handler = new ProtocolHandler(peerListener);
} else
handler = new ProtocolHandler();
try {
Bootstrap b = new Bootstrap();
@ -52,21 +50,26 @@ public class PeerClient {
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, new EthereumMessageSizeEstimator());
b.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONFIG.peerConnectionTimeout());
b.remoteAddress(host, port);
b.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
public void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS));
ch.pipeline().addLast(new EthereumFrameDecoder());
ch.pipeline().addLast(handler);
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new PacketEncoder());
ch.pipeline().addLast(new P2pHandler(peerListener));
// limit the size of receiving buffer to 1024
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));
ch.config().setOption(ChannelOption.SO_RCVBUF, 32368);
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
ChannelFuture f = b.connect().sync();
WorldManager.getInstance().setActivePeer(this);
// Wait until the connection is closed.
@ -76,11 +79,7 @@ public class PeerClient {
} catch (Exception e) {
logger.debug("Exception: {} ({})", e.getMessage(), e.getClass().getName());
} finally {
try {
workerGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
workerGroup.shutdownGracefully();
handler.killTimers();
@ -93,9 +92,6 @@ public class PeerClient {
peer.setOnline(false);
}
}
EthereumListener listener = WorldManager.getInstance().getListener();
if (listener != null) listener.onPeerDisconnect(host, port);
}
}
@ -103,7 +99,7 @@ public class PeerClient {
this.peerListener = peerListener;
}
public ProtocolHandler getHandler() {
public P2pHandler getHandler() {
return handler;
}
}

View File

@ -3,8 +3,12 @@ package org.ethereum.net.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@ -59,6 +63,10 @@ public class PeerDiscovery {
return started.get();
}
public Set<Peer> getPeers() {
return peers;
}
/**
* Update list of known peers with new peers
* This method checks for duplicate peer id's and addresses
@ -69,19 +77,38 @@ public class PeerDiscovery {
synchronized (peers) {
for (final Peer newPeer : newPeers) {
if(!peers.contains(newPeer))
addNewPeer(newPeer);
startWorker(newPeer);
peers.add(newPeer);
}
}
}
public void addNewPeer(Peer peer) {
private void startWorker(Peer peer) {
logger.debug("Add new peer for discovery: {}", peer);
executorPool.execute(new WorkerThread(peer, executorPool));
}
public Set<Peer> getPeers() {
return peers;
}
private Set<Peer> parsePeerDiscoveryIpList(final String peerDiscoveryIpList) {
final List<String> ipList = Arrays.asList(peerDiscoveryIpList.split(","));
final Set<Peer> peers = new HashSet<>();
for (String ip : ipList){
String[] addr = ip.trim().split(":");
String ip_trim = addr[0];
String port_trim = addr[1];
try {
InetAddress iAddr = InetAddress.getByName(ip_trim);
int port = Integer.parseInt(port_trim);
Peer peer = new Peer(iAddr, port, null);
peers.add(peer);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
return peers;
}
}

View File

@ -36,9 +36,7 @@ public class WorkerThread implements Runnable {
try {
clientPeer = new PeerClient();
clientPeer.connect(peer.getAddress().getHostAddress(), peer.getPort());
peer.setOnline(true);
peer.setHandshake(clientPeer.getHandler().getHandshake());
} catch (Throwable e) {
if (peer.isOnline())
logger.info("Peer: [{}] went offline, due to: [{}]", peer

View File

@ -0,0 +1,274 @@
package org.ethereum.net.handler;
import static org.ethereum.net.message.StaticMessages.GET_TRANSACTIONS_MESSAGE;
import static org.ethereum.config.SystemProperties.CONFIG;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.ethereum.core.Block;
import org.ethereum.core.BlockQueue;
import org.ethereum.core.Transaction;
import org.ethereum.facade.Blockchain;
import org.ethereum.listener.EthereumListener;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.MessageQueue;
import org.ethereum.net.PeerListener;
import org.ethereum.net.message.BlockHashesMessage;
import org.ethereum.net.message.BlocksMessage;
import org.ethereum.net.message.DisconnectMessage;
import org.ethereum.net.message.GetBlockHashesMessage;
import org.ethereum.net.message.GetBlocksMessage;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.ReasonCode;
import org.ethereum.net.message.StatusMessage;
import org.ethereum.net.message.TransactionsMessage;
import org.ethereum.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Process the messages between peers with 'eth' capability on the network.
*
* Peers with 'eth' capability can send/receive:
* <ul>
* <li>STATUS : Announce their status to the peer</li>
* <li>GET_TRANSACTIONS : Request a list of pending transactions</li>
* <li>TRANSACTIONS : Send a list of pending transactions</li>
* <li>GET_BLOCK_HASHES : Request a list of known block hashes</li>
* <li>BLOCK_HASHES : Send a list of known block hashes</li>
* <li>GET_BLOCKS : Request a list of blocks</li>
* <li>BLOCKS : Send a list of blocks</li>
* </ul>
*/
public class EthHandler extends SimpleChannelInboundHandler<Message> {
private final static Logger logger = LoggerFactory.getLogger("wire");
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
private int secToAskForBlocks = 1;
private Blockchain blockchain;
private PeerListener peerListener;
private EthereumListener listener;
private MessageQueue msgQueue = null;
private final Timer timer = new Timer("MiscMessageTimer");
public EthHandler() {
this.listener = WorldManager.getInstance().getListener();
this.blockchain = WorldManager.getInstance().getBlockchain();
}
public EthHandler(PeerListener peerListener) {
this();
this.peerListener = peerListener;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
msgQueue.receivedMessage(msg);
if (listener != null) listener.onRecvMessage(msg);
switch (msg.getCommand()) {
case STATUS:
processStatus((StatusMessage)msg, ctx);
break;
case TRANSACTIONS:
// List<Transaction> txList = transactionsMessage.getTransactions();
// for(Transaction tx : txList)
// WorldManager.getInstance().getBlockchain().applyTransaction(null,
// tx);
// WorldManager.getInstance().getWallet().addTransaction(tx);
break;
case BLOCKS:
processBlocks((BlocksMessage)msg);
break;
case GET_TRANSACTIONS:
sendPendingTransactions();
break;
case GET_BLOCK_HASHES:
sendBlockHashes();
break;
case BLOCK_HASHES:
processBlockHashes((BlockHashesMessage)msg);
break;
case GET_BLOCKS:
sendBlocks();
break;
default:
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause.getCause().toString());
super.exceptionCaught(ctx, cause);
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
msgQueue = new MessageQueue(ctx, peerListener);
sendStatus();
}
/**
* Processing:
* <ul>
* <li>checking if peer is using the same genesis, protocol and network</li>
* <li>seeing if total difficulty is higher than total difficulty from all other peers</li>
* <li>send GET_BLOCK_HASHES to this peer based on bestHash</li>
* </ul>
*
* @param msg is the StatusMessage
* @param ctx the ChannelHandlerContext
*/
private void processStatus(StatusMessage msg, ChannelHandlerContext ctx) {
if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH) || msg.getProtocolVersion() != 33)
ctx.pipeline().remove(this);
else if (msg.getNetworkId() != 0)
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK));
else {
BlockQueue chainQueue = this.blockchain.getQueue();
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty();
// if (peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
// this.blockchain.getQueue().setHighestTotalDifficulty(peerTotalDifficulty);
// this.blockchain.getQueue().setBestHash(msg.getBestHash());
// sendGetBlockHashes(msg.getBestHash());
// }
startTimers();
}
}
private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
// for each block hash
// check if blockHash != known hash
// store blockhash
// if no known hash has been reached, another getBlockHashes with last stored hash.
}
private void processBlocks(BlocksMessage blocksMessage) {
List<Block> blockList = blocksMessage.getBlocks();
// If we get one block from a peer we ask less greedy
if (blockList.size() <= 1 && secToAskForBlocks != 10) {
logger.info("Now we ask for blocks every 10 seconds");
updateGetBlocksTimer(10);
}
// If we get more blocks from a peer we ask more greedy
if (blockList.size() > 2 && secToAskForBlocks != 1) {
logger.info("Now we ask for a chain every 1 seconds");
updateGetBlocksTimer(1);
}
if (blockList.isEmpty()) return;
this.blockchain.getQueue().addBlocks(blockList);
}
private void sendStatus() {
byte protocolVersion = 33, networkId = 0;
BigInteger totalDifficulty = this.blockchain.getTotalDifficulty();
byte[] bestHash = this.blockchain.getLatestBlockHash();
StatusMessage msg = new StatusMessage(protocolVersion, networkId,
ByteUtil.bigIntegerToBytes(totalDifficulty), bestHash, Blockchain.GENESIS_HASH);
msgQueue.sendMessage(msg);
}
/*
* The wire gets data for signed transactions and
* sends it to the net.
*/
public void sendTransaction(Transaction transaction) {
Set<Transaction> txs = new HashSet<>(Arrays.asList(transaction));
TransactionsMessage msg = new TransactionsMessage(txs);
msgQueue.sendMessage(msg);
}
private void sendGetTransactions() {
msgQueue.sendMessage(GET_TRANSACTIONS_MESSAGE);
}
private void sendGetBlockHashes(byte[] bestHash) {
GetBlockHashesMessage msg = new GetBlockHashesMessage(bestHash, CONFIG.maxHashesAsk());
msgQueue.sendMessage(msg);
}
private void sendGetBlocks() {
if (WorldManager.getInstance().getBlockchain().getQueue().size() >
CONFIG.maxBlocksQueued()) return;
Block lastBlock = this.blockchain.getQueue().getLastBlock();
if (lastBlock == null) return;
// retrieve list of block hashes from queue
int blocksPerPeer = CONFIG.maxBlocksAsk();
List<byte[]> hashes = this.blockchain.getQueue().getHashes(blocksPerPeer);
GetBlocksMessage msg = new GetBlocksMessage(hashes);
msgQueue.sendMessage(msg);
}
private void sendPendingTransactions() {
Set<Transaction> pendingTxs =
WorldManager.getInstance().getPendingTransactions();
TransactionsMessage msg = new TransactionsMessage(pendingTxs);
msgQueue.sendMessage(msg);
}
private void sendBlocks() {
// TODO: Send blocks
}
private void sendBlockHashes() {
// TODO: Send block hashes
}
private void startTimers() {
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
sendGetTransactions();
}
}, 2000, 10000);
getBlocksTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
sendGetBlocks();
}
}, 1000, secToAskForBlocks * 1000);
}
private void updateGetBlocksTimer(int seconds) {
secToAskForBlocks = seconds;
getBlocksTimer.cancel();
getBlocksTimer.purge();
getBlocksTimer = new Timer();
getBlocksTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
sendGetBlocks();
}
}, 3000, secToAskForBlocks * 1000);
}
public void killTimers(){
getBlocksTimer.cancel();
getBlocksTimer.purge();
timer.cancel();
timer.purge();
}
}

View File

@ -0,0 +1,152 @@
package org.ethereum.net.handler;
import static org.ethereum.net.message.StaticMessages.PING_MESSAGE;
import static org.ethereum.net.message.StaticMessages.PONG_MESSAGE;
import static org.ethereum.net.message.StaticMessages.HELLO_MESSAGE;
import static org.ethereum.net.message.StaticMessages.GET_PEERS_MESSAGE;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.TimerTask;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.MessageQueue;
import org.ethereum.net.PeerListener;
import org.ethereum.net.client.Peer;
import org.ethereum.net.client.PeerDiscovery;
import org.ethereum.net.message.DisconnectMessage;
import org.ethereum.net.message.HelloMessage;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.PeersMessage;
import org.ethereum.net.message.ReasonCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Process the basic protocol messages between every peer on the network.
*
* Peers can send/receive
* <ul>
* <li>HELLO : Announce themselves to the network</li>
* <li>DISCONNECT : Disconnect themselves from the network</li>
* <li>GET_PEERS : Request a list of other knows peers</li>
* <li>PEERS : Send a list of known peers</li>
* <li>PING : Check if another peer is still alive</li>
* <li>PONG : Confirm that they themselves are still alive</li>
* </ul>
*/
public class P2pHandler extends SimpleChannelInboundHandler<Message> {
private final static Logger logger = LoggerFactory.getLogger("wire");
private final Timer timer = new Timer("MessageTimer");
private PeerDiscovery peerDiscovery;
private PeerListener peerListener;
private MessageQueue msgQueue = null;
private boolean tearDown = false;
public P2pHandler() {
this.peerDiscovery = WorldManager.getInstance().getPeerDiscovery();
}
public P2pHandler(PeerListener peerListener) {
this();
this.peerListener = peerListener;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
msgQueue.receivedMessage(msg);
switch (msg.getCommand()) {
case HELLO:
setHandshake((HelloMessage)msg, ctx);
break;
case DISCONNECT:
break;
case PING:
msgQueue.sendMessage(PONG_MESSAGE);
break;
case PONG:
break;
case GET_PEERS:
sendPeers();
break;
case PEERS:
processPeers((PeersMessage)msg);
break;
default:
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause.getCause().toString());
super.exceptionCaught(ctx, cause);
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
msgQueue = new MessageQueue(ctx, peerListener);
// Send HELLO once when channel connection has been established
msgQueue.sendMessage(HELLO_MESSAGE);
}
private void processPeers(PeersMessage peersMessage) {
peerDiscovery.addPeers(peersMessage.getPeers());
}
private void sendPeers() {
PeersMessage msg = new PeersMessage(peerDiscovery.getPeers());
msgQueue.sendMessage(msg);
}
private void setHandshake(HelloMessage msg, ChannelHandlerContext ctx) {
if (msg.getP2PVersion() != 0)
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL));
else {
if(msg.getCapabilities().contains("eth"))
ctx.pipeline().addLast(new EthHandler(peerListener));
InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
int port = msg.getListenPort();
byte[] peerId = msg.getPeerId();
Peer confirmedPeer = new Peer(address, port, peerId);
confirmedPeer.setOnline(true);
confirmedPeer.getCapabilities().addAll(msg.getCapabilities());
WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer);
startTimers();
}
}
private void startTimers() {
// sample for pinging in background
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
if (tearDown) cancel();
msgQueue.sendMessage(PING_MESSAGE);
}
}, 2000, 5000);
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
msgQueue.sendMessage(GET_PEERS_MESSAGE);
}
}, 2000, 6000);
}
public void killTimers(){
timer.cancel();
timer.purge();
}
}

View File

@ -1,61 +1,67 @@
package org.ethereum.net.client;
package org.ethereum.net.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.ethereum.net.message.MessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
import java.util.List;
/**
* @author Roman Mandeleil
* Created on: 13/04/14 21:51
* The PacketDecoder parses every valid Ethereum packet to a Message object
*/
public class EthereumFrameDecoder extends ByteToMessageDecoder {
public class PacketDecoder extends ByteToMessageDecoder {
private Logger logger = LoggerFactory.getLogger("wire");
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!isValidEthereumPacket(in)) {
return;
}
byte[] encoded = new byte[in.readInt()];
in.readBytes(encoded);
if (logger.isDebugEnabled())
logger.debug("Encoded: [{}]", Hex.toHexString(encoded));
out.add(MessageFactory.createMessage(encoded));
}
private boolean isValidEthereumPacket(ByteBuf in) {
// Ethereum message is at least 8 bytes
if (in.readableBytes() < 8)
return;
return false;
long magicBytes = in.readUnsignedInt();
long msgSize = in.readUnsignedInt();
long syncToken = in.readUnsignedInt();
if (!((magicBytes >> 24 & 0xFF) == 0x22 &&
(magicBytes >> 16 & 0xFF) == 0x40 &&
(magicBytes >> 8 & 0xFF) == 0x08 &&
(magicBytes & 0xFF) == 0x91 )) {
if (!((syncToken >> 24 & 0xFF) == 0x22 &&
(syncToken >> 16 & 0xFF) == 0x40 &&
(syncToken >> 8 & 0xFF) == 0x08 &&
(syncToken & 0xFF) == 0x91 )) {
// TODO: Drop frame and continue.
// A collision can happen (although rare)
// If this happens too often, it's an attack.
// In that case, drop the peer.
logger.error("abandon garbage, wrong magic bytes: [{}] msgSize: [{}]", magicBytes, msgSize);
ctx.close();
logger.error("Abandon garbage, wrong sync token: [{}]", syncToken);
}
// Don't have the full packet yet
long msgSize = in.getInt(in.readerIndex());
if (msgSize > in.readableBytes()) {
logger.trace("msg decode: magicBytes: [{}], readBytes: [{}] / msgSize: [{}] ",
magicBytes, in.readableBytes(), msgSize);
syncToken, in.readableBytes(), msgSize);
in.resetReaderIndex();
return;
return false;
}
logger.trace("message fully constructed go handle it: readBytes: [{}] / msgSize: [{}]",
in.readableBytes(), msgSize);
byte[] decoded = new byte[(int) msgSize];
in.readBytes(decoded);
out.add(decoded);
in.markReaderIndex();
}
logger.trace("Message fully constructed: readBytes: [{}] / msgSize: [{}]", in.readableBytes(), msgSize);
return true;
}
}

View File

@ -0,0 +1,32 @@
package org.ethereum.net.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.StaticMessages;
import org.ethereum.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
/**
* The PacketEncoder encodes the message and adds a sync token to every packet.
*/
public class PacketEncoder extends MessageToByteEncoder<Message> {
private Logger logger = LoggerFactory.getLogger("wire");
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
byte[] encoded = msg.getEncoded();
if (logger.isDebugEnabled())
logger.debug("Encoded: [{}]", Hex.toHexString(encoded));
out.capacity(encoded.length + 8);
out.writeBytes(StaticMessages.SYNC_TOKEN);
out.writeBytes(ByteUtil.calcPacketLength(encoded));
out.writeBytes(encoded);
}
}

View File

@ -9,11 +9,7 @@ import java.util.Map;
* The codes for these commands are the first byte in every packet.
*
* @see <a href="https://github.com/ethereum/wiki/wiki/Wire-Protocol">
* https://github.com/ethereum/wiki/wiki/Wire-Protocol</a><br/>
* <a href="https://github.com/ethereum/cpp-ethereum/wiki/%C3%90%CE%9EVP2P-Networking">
* https://github.com/ethereum/cpp-ethereum/wiki/ÐΞVP2P-Networking</a><br/>
* <a href="https://github.com/ethereum/cpp-ethereum/wiki/PoC-6-Network-Protocol">
* https://github.com/ethereum/cpp-ethereum/wiki/PoC-6-Network-Protocol</a>
* https://github.com/ethereum/wiki/wiki/Wire-Protocol</a>
*/
public enum Command {

View File

@ -18,8 +18,6 @@ import java.util.List;
* Wrapper around an Ethereum HelloMessage on the network
*
* @see {@link org.ethereum.net.message.Command#HELLO}
*
* @author Roman Mandeleil
*/
public class HelloMessage extends Message {

View File

@ -0,0 +1,46 @@
package org.ethereum.net.message;
import org.ethereum.util.RLP;
/**
* Factory to create protocol message objects based on the RLP encoded data
*/
public class MessageFactory {
public static Message createMessage(byte[] encoded) {
Command receivedCommand = Command.fromInt(RLP.getCommandCode(encoded));
switch (receivedCommand) {
case HELLO:
return new HelloMessage(encoded);
case DISCONNECT:
return new DisconnectMessage(encoded);
case PING:
return StaticMessages.PING_MESSAGE;
case PONG:
return StaticMessages.PONG_MESSAGE;
case GET_PEERS:
return StaticMessages.GET_PEERS_MESSAGE;
case PEERS:
return new PeersMessage(encoded);
case STATUS:
return new StatusMessage(encoded);
case TRANSACTIONS:
return new TransactionsMessage(encoded);
case BLOCKS:
return new BlocksMessage(encoded);
case GET_TRANSACTIONS:
return StaticMessages.GET_TRANSACTIONS_MESSAGE;
case GET_BLOCK_HASHES:
return new GetBlockHashesMessage(encoded);
case BLOCK_HASHES:
return new BlockHashesMessage(encoded);
case GET_BLOCKS:
return new GetBlocksMessage(encoded);
default:
throw new IllegalArgumentException("No such message");
}
}
}

View File

@ -27,12 +27,11 @@ public enum ReasonCode {
/** [0x05] Already have a running connection with this peer */
ALREADY_CONNECTED(0x05),
/** [0x06] Genesis is not the same as ours */
WRONG_GENESIS(0x06),
/** [0x06] Version of the p2p protocol is not the same as ours */
INCOMPATIBLE_PROTOCOL(0x06),
/** [0x07] Peer identifies itself with the
* wrong p2pVersion, protocolVersion or networkId */
INCOMPATIBLE_PROTOCOL(0x07),
/** [0x07] Peer identifies itself with the wrong networkId */
INCOMPATIBLE_NETWORK(0x07),
/** [0x08] Peer quit voluntarily */
PEER_QUITING(0x08),

View File

@ -29,6 +29,7 @@ public class StaticMessages {
String helloAnnouncement = buildHelloAnnouncement();
byte p2pVersion = 0x00;
List<String> capabilities = new ArrayList<>(Arrays.asList("eth"));
// List<String> capabilities = new ArrayList<>(Arrays.asList("eth"));
int listenPort = 30303;
byte[] peerIdBytes = HashUtil.randomPeerId();

View File

@ -37,7 +37,6 @@ public class StatusMessage extends Message {
this.bestHash = bestHash;
this.genesisHash = genesisHash;
this.parsed = true;
this.encode();
}
private void parse() {

View File

@ -33,7 +33,7 @@ public class TransactionTask implements Callable<Transaction> {
PeerClient peer = WorldManager.getInstance().getActivePeer();
WalletTransaction walletTransaction = WorldManager.getInstance()
.getWallet().addByWalletTransaction(tx);
peer.getHandler().sendTransaction(tx);
// peer.getHandler().sendTransaction(tx);
while (walletTransaction.getApproved() < 1) {
sleep(10);

View File

@ -38,13 +38,13 @@ public class DisconnectMessageTest {
@Test /* DisconnectMessage 2 - from constructor */
public void test_3() {
DisconnectMessage disconnectMessage = new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL);
DisconnectMessage disconnectMessage = new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK);
System.out.println(disconnectMessage);
String expected = "c20107";
assertEquals(expected, Hex.toHexString(disconnectMessage.getEncoded()));
assertEquals(ReasonCode.INCOMPATIBLE_PROTOCOL, disconnectMessage.getReason());
assertEquals(ReasonCode.INCOMPATIBLE_NETWORK, disconnectMessage.getReason());
}
}

View File

@ -331,7 +331,7 @@ class ContractCallDialog extends JDialog implements MessageAwareDialog {
Transaction tx = createTransaction();
if (tx == null) return;
Block lastBlock = UIEthereumManager.ethereum.getBlockChain().getLastBlock();
Block lastBlock = UIEthereumManager.ethereum.getBlockchain().getLastBlock();
ProgramPlayDialog.createAndShowGUI(programCode, tx, lastBlock);
}

View File

@ -101,7 +101,7 @@ class ContractSubmitDialog extends JDialog implements MessageAwareDialog {
}
contractAddrInput.setText(Hex.toHexString(tx.getContractAddress()));
Block lastBlock = UIEthereumManager.ethereum.getBlockChain().getLastBlock();
Block lastBlock = UIEthereumManager.ethereum.getBlockchain().getLastBlock();
ProgramPlayDialog.createAndShowGUI(tx.getData(), tx, lastBlock);
}}
);
@ -298,7 +298,7 @@ class ContractSubmitDialog extends JDialog implements MessageAwareDialog {
Account account = ((AccountWrapper)creatorAddressCombo.getSelectedItem()).getAccount();
BigInteger currentBalance = account.getBalance();
long currGasPrice = UIEthereumManager.ethereum.getBlockChain().getGasPrice();
long currGasPrice = UIEthereumManager.ethereum.getBlockchain().getGasPrice();
BigInteger gasPrice = BigInteger.valueOf(currGasPrice);
BigInteger gasInput = new BigInteger( this.gasInput.getText());

View File

@ -113,7 +113,7 @@ class PayOutDialog extends JDialog implements MessageAwareDialog {
byte[] senderPrivKey = account.getEcKey().getPrivKeyBytes();
byte[] nonce = PayOutDialog.this.account.getNonce() == BigInteger.ZERO ? null : PayOutDialog.this.account.getNonce().toByteArray();
byte[] gasPrice = BigInteger.valueOf(UIEthereumManager.ethereum.getBlockChain().getGasPrice()).toByteArray();
byte[] gasPrice = BigInteger.valueOf(UIEthereumManager.ethereum.getBlockchain().getGasPrice()).toByteArray();
Transaction tx = new Transaction(nonce, gasPrice, BigIntegers
.asUnsignedByteArray(fee), address, BigIntegers
@ -194,7 +194,7 @@ class PayOutDialog extends JDialog implements MessageAwareDialog {
// check if the tx is affordable
BigInteger ammountValue = new BigInteger(amountText);
BigInteger feeValue = new BigInteger(feeText);
BigInteger gasPrice = BigInteger.valueOf(UIEthereumManager.ethereum.getBlockChain().getGasPrice());
BigInteger gasPrice = BigInteger.valueOf(UIEthereumManager.ethereum.getBlockchain().getGasPrice());
BigInteger currentBalance = account.getBalance();
boolean canAfford = gasPrice.multiply(feeValue).add(ammountValue).compareTo(currentBalance) != 1;

View File

@ -11,7 +11,6 @@ import javax.swing.table.AbstractTableModel;
import org.ethereum.geo.IpGeoDB;
import org.ethereum.net.client.Peer;
import org.ethereum.net.message.HelloMessage;
import org.ethereum.util.Utils;
import com.maxmind.geoip.Location;
@ -118,7 +117,7 @@ public class PeersTableModel extends AbstractTableModel {
for (Peer peer : peers) {
InetAddress addr = peer.getAddress();
Location cr = IpGeoDB.getLocationForIp(addr);
peerInfoList.add(new PeerInfo(cr, addr, peer.isOnline(), peer.getHandshake(), peer.getLastCheckTime()));
peerInfoList.add(new PeerInfo(cr, addr, peer.isOnline(), peer.getLastCheckTime()));
}
}
}
@ -129,12 +128,11 @@ public class PeersTableModel extends AbstractTableModel {
Location location;
InetAddress ip;
boolean connected;
HelloMessage handshake;
long lastAccessed = 0;
private PeerInfo(Location location, InetAddress ip, boolean isConnected,
HelloMessage handshake, long lastAccessed) {
private PeerInfo(Location location, InetAddress ip,
boolean isConnected, long lastAccessed) {
if (location == null)
this.location = new Location();
@ -143,7 +141,6 @@ public class PeersTableModel extends AbstractTableModel {
this.ip = ip;
this.connected = isConnected;
this.handshake = handshake;
this.lastAccessed = lastAccessed;
}
@ -159,10 +156,6 @@ public class PeersTableModel extends AbstractTableModel {
return connected;
}
public HelloMessage getHandshake() {
return handshake;
}
public long getLastAccessed() {
return lastAccessed;
}