From 5b4cf4b44eb0a036c1b40405dce524d42ef65029 Mon Sep 17 00:00:00 2001 From: romanman Date: Wed, 21 May 2014 00:49:09 +0300 Subject: [PATCH] Working on peer discovery and sync mechanism --- .../ethereum/gui/ConnectionConsoleWindow.java | 4 +- .../java/org/ethereum/manager/MainData.java | 24 ++- .../org/ethereum/net/client/ClientPeer.java | 7 - .../net/client/EthereumPeerTasterHandler.java | 192 +++--------------- .../org/ethereum/net/client/PeerTaster.java | 56 +++-- .../ethereum/net/message/StaticMessages.java | 7 +- 6 files changed, 81 insertions(+), 209 deletions(-) diff --git a/ethereumj-core/src/main/java/org/ethereum/gui/ConnectionConsoleWindow.java b/ethereumj-core/src/main/java/org/ethereum/gui/ConnectionConsoleWindow.java index b20ec674..91672281 100644 --- a/ethereumj-core/src/main/java/org/ethereum/gui/ConnectionConsoleWindow.java +++ b/ethereumj-core/src/main/java/org/ethereum/gui/ConnectionConsoleWindow.java @@ -70,14 +70,14 @@ public class ConnectionConsoleWindow extends JFrame implements PeerListener{ public void run() { // new ClientPeer(thisConsole).connect("54.201.28.117", 30303); // peer discovery - new ClientPeer(thisConsole).connect("82.217.72.169", 30303); // Nick +// new ClientPeer(thisConsole).connect("82.217.72.169", 30303); // Nick // new ClientPeer(thisConsole).connect("54.204.10.41", 30303); // new ClientPeer(thisConsole).connect("54.211.14.10", 30303); // new ClientPeer(thisConsole).connect("54.204.10.41", 30303); // CPP: ZeroGox Poc5 -// new ClientPeer(thisConsole).connect("54.211.14.10", 30303); // CPP: ver16 + new ClientPeer(thisConsole).connect("54.211.14.10", 30303); // CPP: ver16 // new ClientPeer(thisConsole).connect("192.168.1.102", 30303); } diff --git a/ethereumj-core/src/main/java/org/ethereum/manager/MainData.java b/ethereumj-core/src/main/java/org/ethereum/manager/MainData.java index 233d3115..2215a990 100644 --- a/ethereumj-core/src/main/java/org/ethereum/manager/MainData.java +++ b/ethereumj-core/src/main/java/org/ethereum/manager/MainData.java @@ -27,7 +27,7 @@ import com.maxmind.geoip.Location; */ public class MainData { - private Set peers = Collections.synchronizedSet(new HashSet()); + private List peers = Collections.synchronizedList(new ArrayList()); private List blockChainDB = new ArrayList(); private Wallet wallet = new Wallet(); private ClientPeer activePeer; @@ -41,13 +41,19 @@ public class MainData { } public void addPeers(List newPeers){ - this.peers.addAll(newPeers); - for (PeerData peerData : this.peers){ - Location location = IpGeoDB.getLocationForIp(peerData.getInetAddress()); - if (location != null) - System.out.println("Hello: " + " [" + peerData.getInetAddress().toString() - + "] " + location.countryName); + + for (PeerData peer : newPeers){ + if (this.peers.indexOf(peer) == -1){ + this.peers.add(peer); + } } + +// for (PeerData peerData : this.peers){ +// Location location = IpGeoDB.getLocationForIp(peerData.getInetAddress()); +// if (location != null) +// System.out.println("Hello: " + " [" + peerData.getInetAddress().toString() +// + "] " + location.countryName); +// } } public void addBlocks(List blocks) { @@ -109,4 +115,8 @@ public class MainData { } public void addTransactions(List transactions) {} + + public List getPeers() { + return peers; + } } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/ClientPeer.java b/ethereumj-core/src/main/java/org/ethereum/net/client/ClientPeer.java index 1d6ade64..84c4ef56 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/ClientPeer.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/ClientPeer.java @@ -64,13 +64,6 @@ public class ClientPeer { }); - // todo: redesign that part , - // todo: 1) the method should finish. - // todo: 2) the peer should be saved as active peer in maindata - // todo: 3) b.connect(host, port).sync().channel(); // get the channel - // todo: 4) b.connect(host, port).sync().channel().writeAndFlush(); // use channel to send message - - // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) this.channel = f.channel(); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/EthereumPeerTasterHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/client/EthereumPeerTasterHandler.java index c4cd6870..606d7969 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/EthereumPeerTasterHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/EthereumPeerTasterHandler.java @@ -1,35 +1,27 @@ package org.ethereum.net.client; -import static org.ethereum.net.Command.*; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; - -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - import org.ethereum.core.Block; import org.ethereum.gui.PeerListener; import org.ethereum.manager.MainData; import org.ethereum.net.Command; -import org.ethereum.net.message.BlocksMessage; -import org.ethereum.net.message.DisconnectMessage; -import org.ethereum.net.message.GetChainMessage; -import org.ethereum.net.message.HelloMessage; -import org.ethereum.net.message.Message; -import org.ethereum.net.message.NotInChainMessage; -import org.ethereum.net.message.PeersMessage; -import org.ethereum.net.message.StaticMessages; -import org.ethereum.net.message.TransactionsMessage; +import org.ethereum.net.message.*; import org.ethereum.util.ByteUtil; import org.ethereum.util.RLP; import org.ethereum.util.RLPList; import org.ethereum.util.Utils; import org.spongycastle.util.encoders.Hex; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import static org.ethereum.net.Command.*; + /** * www.ethereumJ.com * User: Roman Mandeleil @@ -37,10 +29,7 @@ import org.spongycastle.util.encoders.Hex; */ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { - Timer chainAskTimer = new Timer(); - int secToAskForChain = 1; - - final Timer timer = new Timer(); + Timer timer = null; private final static byte[] MAGIC_PREFIX = {(byte)0x22, (byte)0x40, (byte)0x08, (byte)0x91}; private final static byte[] HELLO_MESSAGE = StaticMessages.HELLO_MESSAGE.getPayload(); @@ -59,10 +48,9 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { - // TODO: send hello - // TODO: send ping schedule another ping - // TODO: ByteBuf vs Stream vs new byte ??? + final ByteBuf buffer = ctx.alloc().buffer(HELLO_MESSAGE.length + 8); + timer = new Timer(); buffer.writeBytes(MAGIC_PREFIX); buffer.writeBytes(HELLO_MESSAGE_LEN); @@ -90,29 +78,6 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { } }, 2000, 5000); - timer.scheduleAtFixedRate(new TimerTask() { - - public void run() { - System.out.println("[Send: GET_PEERS]"); - sendGetPeers(ctx); - } - }, 2000, 60000); - - timer.scheduleAtFixedRate(new TimerTask() { - - public void run() { - System.out.println("[Send: GET_TRANSACTIONS]"); - sendGetTransactions(ctx); - } - }, 2000, 30000); - - chainAskTimer.schedule(new TimerTask() { - - public void run() { - System.out.println("[Send: GET_CHAIN]"); - sendGetChain(ctx); - } - }, 3000, secToAskForChain * 1000); } @@ -124,6 +89,7 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { Utils.printHexStringForByteArray(payload); byte command = RLP.getCommandCode(payload); + // got HELLO if (Command.fromInt(command) == HELLO) { System.out.println("[Recv: HELLO]" ); @@ -131,7 +97,8 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { HelloMessage helloMessage = new HelloMessage(rlpList); System.out.println(helloMessage.toString()); - if (peerListener != null) peerListener.console(helloMessage.toString()); + + sendGetPeers(ctx); } // got DISCONNECT if (Command.fromInt(command) == DISCONNECT) { @@ -142,42 +109,22 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { DisconnectMessage disconnectMessage = new DisconnectMessage(rlpList); System.out.println(disconnectMessage); - if (peerListener != null) peerListener.console(disconnectMessage.toString()); } + // got PING send pong if (Command.fromInt(command) == PING) { System.out.println("[Recv: PING]"); if (peerListener != null) peerListener.console("[Recv: PING]"); sendPong(ctx); } + // got PONG mark it if (Command.fromInt(command) == PONG) { System.out.println("[Recv: PONG]" ); if (peerListener != null) peerListener.console("[Recv: PONG]"); this.lastPongTime = System.currentTimeMillis(); } - // got GETPEERS send peers - if (Command.fromInt(command) == GET_PEERS) { - System.out.println("[Recv: GETPEERS]" ); - if (peerListener != null) peerListener.console("[Recv: GETPEERS]"); - String answer = "2240089100000134F9013111F84A8456084B1482765FB84072FD5DBC7F458FB0A52354E25234CEA90A51EA09858A21406056D9B9E0826BB153527E4C4CBEC53B46B0245E6E8503EEABDBF0F1789D7C5C78BBF2B1FDD9090CF84A8455417E2D82765FB840CE73F1F1F1F16C1B3FDA7B18EF7BA3CE17B6F1F1F1F141D3C6C654B7AE88B239407FF1F1F1F119025D785727ED017B6ADD21F1F1F1F1000001E321DBC31824BAF84A8436C91C7582765FB840D592C570B5082D357C30E61E3D8F26317BFD7A3A2A00A36CFB7254FEE80830F26DDFBD6A99712552F3D77314DB4AB58B9989F25699C4997A0F62489D4B86CB4DF84A8436CC0A2982765FB840E34C6E3EAC28CFD3DC930A5AEFD9552FEBCD72C33DFC74D8E4C7CF8A7BA71AE53316ADDBD241EB051ED0871C2B62825E66A45DC6A0E752A7F1C22ABEF9ABDE32"; - byte[] answerBytes = Hex.decode(answer); - - ByteBuf buffer = ctx.alloc().buffer(answerBytes.length); - buffer.writeBytes(answerBytes); - ctx.writeAndFlush(buffer); - - // send getpeers - answer = "22 40 08 91 00 00 00 02 C1 10 "; - answerBytes = Hex.decode(answer); - buffer = ctx.alloc().buffer(answerBytes.length); - - answerBytes = Utils.hexStringToByteArr(answer); - buffer = ctx.alloc().buffer(answerBytes.length); - buffer.writeBytes(answerBytes); - ctx.writeAndFlush(buffer); - } // got PEERS if (Command.fromInt(command) == PEERS) { System.out.println("[Recv: PEERS]"); @@ -187,102 +134,18 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { PeersMessage peersMessage = new PeersMessage(rlpList); MainData.instance.addPeers(peersMessage.getPeers()); - System.out.println(peersMessage); - if (peerListener != null) peerListener.console(peersMessage.toString()); - } - // got TRANSACTIONS - if (Command.fromInt(command) == TRANSACTIONS) { - System.out.println("Recv: TRANSACTIONS]"); - if (peerListener != null) peerListener.console("Recv: TRANSACTIONS]"); - RLPList rlpList = RLP.decode2(payload); - TransactionsMessage transactionsMessage = new TransactionsMessage(rlpList); - MainData.instance.addTransactions(transactionsMessage.getTransactions()); + sendDisconnectNice(ctx); - // todo: if you got transactions send it to your peers - System.out.println(transactionsMessage); - if (peerListener != null) peerListener.console(transactionsMessage.toString()); - } - // got BLOCKS - if (Command.fromInt(command) == BLOCKS) { - System.out.println("[Recv: BLOCKS]"); - if (peerListener != null) peerListener.console("[Recv: BLOCKS]"); + timer.cancel(); + timer.purge(); + timer = null; - RLPList rlpList = RLP.decode2(payload); + ctx.close().sync(); + ctx.disconnect().sync(); - BlocksMessage blocksMessage = new BlocksMessage(rlpList); - List blockList = blocksMessage.getBlockDataList(); - // If we get one block from a peer - // we ask less swinish - if (blockList.size() <= 1 && secToAskForChain != 10){ - - System.out.println("Now we ask for a chain each 10 seconds"); - secToAskForChain = 10; - - chainAskTimer.cancel(); - chainAskTimer.purge(); - chainAskTimer = new Timer(); - chainAskTimer.schedule(new TimerTask() { - - public void run() { - System.out.println("[Send: GET_CHAIN]"); - sendGetChain(ctx); - } - }, 3000, secToAskForChain * 1000); - } - - // If we get more blocks from a peer - // we ask more often - if (blockList.size() > 2 && secToAskForChain != 1){ - - System.out.println("Now we ask for a chain each 1 seconds"); - secToAskForChain = 11; - - chainAskTimer.cancel(); - chainAskTimer.purge(); - chainAskTimer = new Timer(); - chainAskTimer.schedule(new TimerTask() { - - public void run() { - System.out.println("[Send: GET_CHAIN]"); - sendGetChain(ctx); - } - }, 3000, secToAskForChain * 1000); - } - - MainData.instance.addBlocks(blockList); - System.out.println(blocksMessage); - if (peerListener != null) peerListener.console(blocksMessage.toString()); - } - // got GETCHAIN - if (Command.fromInt(command) == GET_CHAIN) { - System.out.println("[Recv: GET_CHAIN]"); - if (peerListener != null) peerListener.console("[Recv: GET_CHAIN]"); - - RLPList rlpList = RLP.decode2(payload); - GetChainMessage getChainMessage = new GetChainMessage(rlpList); - - System.out.println(getChainMessage); - if (peerListener != null) peerListener.console(getChainMessage.toString()); - } - // got NOTINCHAIN - if (Command.fromInt(command) == NOT_IN_CHAIN) { - System.out.println("[Recv: NOT_IN_CHAIN]"); - if (peerListener != null) peerListener.console("[Recv: NOT_IN_CHAIN]"); - - RLPList rlpList = RLP.decode2(payload); - NotInChainMessage notInChainMessage = new NotInChainMessage(rlpList); - - System.out.println(notInChainMessage); - if (peerListener != null) peerListener.console(notInChainMessage.toString()); - } - // got GETTRANSACTIONS - if (Command.fromInt(command) == GET_TRANSACTIONS) { - System.out.println("[Recv: GET_TRANSACTIONS]"); - if (peerListener != null) peerListener.console("[Recv: GET_TRANSACTIONS]"); - // todo: send the queue of the transactions } } @@ -298,8 +161,12 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { this.tearDown = true; System.out.println("Lost connection to the server"); cause.printStackTrace(); - ctx.close().sync(); timer.cancel(); + timer.purge(); + timer = null; + + ctx.close().sync(); + ctx.disconnect().sync(); } private void sendMsg(Message msg, ChannelHandlerContext ctx){ @@ -325,6 +192,13 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter { ctx.writeAndFlush(buffer); } + private void sendDisconnectNice(ChannelHandlerContext ctx){ + System.out.println("[Send: DISCONNECT]"); + ByteBuf buffer = ctx.alloc().buffer(StaticMessages.DISCONNECT_00.length); + buffer.writeBytes(StaticMessages.DISCONNECT_00); + ctx.writeAndFlush(buffer); + } + private void sendGetPeers(ChannelHandlerContext ctx){ ByteBuf buffer = ctx.alloc().buffer(StaticMessages.GET_PEERS.length); buffer.writeBytes(StaticMessages.GET_PEERS); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerTaster.java b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerTaster.java index 3092bee2..94131444 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerTaster.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerTaster.java @@ -43,15 +43,7 @@ public class PeerTaster { b.group(workerGroup); b.channel(NioSocketChannel.class); - b.option(ChannelOption.SO_KEEPALIVE, true); - - final EthereumProtocolHandler handler; - if (peerListener != null){ - handler = new EthereumProtocolHandler(peerListener); - peerListener.console("connecting to: " + host + ":" + port); - } - else - handler = new EthereumProtocolHandler(); + final EthereumPeerTasterHandler handler = new EthereumPeerTasterHandler(); b.handler(new ChannelInitializer() { @Override @@ -65,9 +57,8 @@ public class PeerTaster { // Start the client. + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5); ChannelFuture f = b.connect(host, port).sync(); // (5) - this.channel = f.channel(); - MainData.instance.setActivePeer(this); // Wait until the connection is closed. f.channel().closeFuture().sync(); @@ -76,34 +67,37 @@ public class PeerTaster { System.out.println("-- ClientPeer: catch (InterruptedException ie) --"); ie.printStackTrace(); } finally { - workerGroup.shutdownGracefully(); + try { + workerGroup.shutdownGracefully().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } + + System.out.println("I am dead"); } - /* - * The wire gets data for signed transactions and - * sends it to the net. - * todo: find a way to set all "send to wire methods" in one place. - */ - public void sendTransaction(Transaction transaction){ - transaction.getEncoded(); - java.util.List txList = new ArrayList(); - txList.add(transaction); - TransactionsMessage transactionsMessage = new TransactionsMessage(txList); + public static void main(String args[]){ - byte[] payload = transactionsMessage.getPayload(); + PeerTaster peerTaster = new PeerTaster(); - ByteBuf buffer = channel.alloc().buffer(payload.length + 8); - buffer.writeBytes(StaticMessages.MAGIC_PACKET); - buffer.writeBytes(Utils.calcPacketSize(payload)); - buffer.writeBytes(payload); - System.out.println("Send msg: [ " + - Hex.toHexString(payload) + - " ]"); + try {peerTaster.connect("54.211.14.10", 30303);} catch (Exception e) {e.printStackTrace();} + try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();} + try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();} + try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();} + try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();} + try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();} + + System.out.println("End of the roaad"); + + for (PeerData peer : MainData.instance.getPeers()){ + + System.out.println(peer.getInetAddress().getHostAddress().toString()); + }; + - channel.writeAndFlush(buffer); } } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java index d3e625b5..adec97bb 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java @@ -26,17 +26,18 @@ public class StaticMessages { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x02, (byte)0xC1, (byte)0x16 }; + + public static final byte[] DISCONNECT_00 = Hex.decode("2240089100000003C20100"); public static final byte[] GET_CHAIN = Hex.decode("2240089100000027F82514A069A7356A245F9DC5B865475ADA5EE4E89B18F93C06503A9DB3B3630E88E9FB4E820100"); - public static final byte[] GENESIS_HASH = Hex.decode("c305511e7cb9b33767e50f5e94ecd7b1c51359a04f45183860ec6808d80b0d3f"); - public static final byte[] MAGIC_PACKET = Hex.decode("22400891"); static { byte[] peerIdBytes = HashUtil.randomPeerId(); - HELLO_MESSAGE = new HelloMessage((byte)0x10, (byte)0x00, "EthereumJ [v0.5.1]", + // Hey Nick I like it that way ;) + HELLO_MESSAGE = new HelloMessage((byte)0x10, (byte)0x00, "EthereumJ [v0.5.1] by RomanJ ", (byte)0b00000111, (short)30303, peerIdBytes); }