Working on peer discovery and sync mechanism

This commit is contained in:
romanman 2014-05-21 00:49:09 +03:00
parent 17aff88a4b
commit 5b4cf4b44e
6 changed files with 81 additions and 209 deletions

View File

@ -70,14 +70,14 @@ public class ConnectionConsoleWindow extends JFrame implements PeerListener{
public void run() {
// new ClientPeer(thisConsole).connect("54.201.28.117", 30303); // peer discovery
new ClientPeer(thisConsole).connect("82.217.72.169", 30303); // Nick
// new ClientPeer(thisConsole).connect("82.217.72.169", 30303); // Nick
// new ClientPeer(thisConsole).connect("54.204.10.41", 30303);
// new ClientPeer(thisConsole).connect("54.211.14.10", 30303);
// new ClientPeer(thisConsole).connect("54.204.10.41", 30303); // CPP: ZeroGox Poc5
// new ClientPeer(thisConsole).connect("54.211.14.10", 30303); // CPP: ver16
new ClientPeer(thisConsole).connect("54.211.14.10", 30303); // CPP: ver16
// new ClientPeer(thisConsole).connect("192.168.1.102", 30303);
}

View File

@ -27,7 +27,7 @@ import com.maxmind.geoip.Location;
*/
public class MainData {
private Set<PeerData> peers = Collections.synchronizedSet(new HashSet<PeerData>());
private List<PeerData> peers = Collections.synchronizedList(new ArrayList<PeerData>());
private List<Block> blockChainDB = new ArrayList<Block>();
private Wallet wallet = new Wallet();
private ClientPeer activePeer;
@ -41,13 +41,19 @@ public class MainData {
}
public void addPeers(List<PeerData> newPeers){
this.peers.addAll(newPeers);
for (PeerData peerData : this.peers){
Location location = IpGeoDB.getLocationForIp(peerData.getInetAddress());
if (location != null)
System.out.println("Hello: " + " [" + peerData.getInetAddress().toString()
+ "] " + location.countryName);
for (PeerData peer : newPeers){
if (this.peers.indexOf(peer) == -1){
this.peers.add(peer);
}
}
// for (PeerData peerData : this.peers){
// Location location = IpGeoDB.getLocationForIp(peerData.getInetAddress());
// if (location != null)
// System.out.println("Hello: " + " [" + peerData.getInetAddress().toString()
// + "] " + location.countryName);
// }
}
public void addBlocks(List<Block> blocks) {
@ -109,4 +115,8 @@ public class MainData {
}
public void addTransactions(List<Transaction> transactions) {}
public List<PeerData> getPeers() {
return peers;
}
}

View File

@ -64,13 +64,6 @@ public class ClientPeer {
});
// todo: redesign that part ,
// todo: 1) the method should finish.
// todo: 2) the peer should be saved as active peer in maindata
// todo: 3) b.connect(host, port).sync().channel(); // get the channel
// todo: 4) b.connect(host, port).sync().channel().writeAndFlush(); // use channel to send message
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
this.channel = f.channel();

View File

@ -1,35 +1,27 @@
package org.ethereum.net.client;
import static org.ethereum.net.Command.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.ethereum.core.Block;
import org.ethereum.gui.PeerListener;
import org.ethereum.manager.MainData;
import org.ethereum.net.Command;
import org.ethereum.net.message.BlocksMessage;
import org.ethereum.net.message.DisconnectMessage;
import org.ethereum.net.message.GetChainMessage;
import org.ethereum.net.message.HelloMessage;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.NotInChainMessage;
import org.ethereum.net.message.PeersMessage;
import org.ethereum.net.message.StaticMessages;
import org.ethereum.net.message.TransactionsMessage;
import org.ethereum.net.message.*;
import org.ethereum.util.ByteUtil;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPList;
import org.ethereum.util.Utils;
import org.spongycastle.util.encoders.Hex;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import static org.ethereum.net.Command.*;
/**
* www.ethereumJ.com
* User: Roman Mandeleil
@ -37,10 +29,7 @@ import org.spongycastle.util.encoders.Hex;
*/
public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
Timer chainAskTimer = new Timer();
int secToAskForChain = 1;
final Timer timer = new Timer();
Timer timer = null;
private final static byte[] MAGIC_PREFIX = {(byte)0x22, (byte)0x40, (byte)0x08, (byte)0x91};
private final static byte[] HELLO_MESSAGE = StaticMessages.HELLO_MESSAGE.getPayload();
@ -59,10 +48,9 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
// TODO: send hello
// TODO: send ping schedule another ping
// TODO: ByteBuf vs Stream vs new byte ???
final ByteBuf buffer = ctx.alloc().buffer(HELLO_MESSAGE.length + 8);
timer = new Timer();
buffer.writeBytes(MAGIC_PREFIX);
buffer.writeBytes(HELLO_MESSAGE_LEN);
@ -90,29 +78,6 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
}
}, 2000, 5000);
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
System.out.println("[Send: GET_PEERS]");
sendGetPeers(ctx);
}
}, 2000, 60000);
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
System.out.println("[Send: GET_TRANSACTIONS]");
sendGetTransactions(ctx);
}
}, 2000, 30000);
chainAskTimer.schedule(new TimerTask() {
public void run() {
System.out.println("[Send: GET_CHAIN]");
sendGetChain(ctx);
}
}, 3000, secToAskForChain * 1000);
}
@ -124,6 +89,7 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
Utils.printHexStringForByteArray(payload);
byte command = RLP.getCommandCode(payload);
// got HELLO
if (Command.fromInt(command) == HELLO) {
System.out.println("[Recv: HELLO]" );
@ -131,7 +97,8 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
HelloMessage helloMessage = new HelloMessage(rlpList);
System.out.println(helloMessage.toString());
if (peerListener != null) peerListener.console(helloMessage.toString());
sendGetPeers(ctx);
}
// got DISCONNECT
if (Command.fromInt(command) == DISCONNECT) {
@ -142,42 +109,22 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
DisconnectMessage disconnectMessage = new DisconnectMessage(rlpList);
System.out.println(disconnectMessage);
if (peerListener != null) peerListener.console(disconnectMessage.toString());
}
// got PING send pong
if (Command.fromInt(command) == PING) {
System.out.println("[Recv: PING]");
if (peerListener != null) peerListener.console("[Recv: PING]");
sendPong(ctx);
}
// got PONG mark it
if (Command.fromInt(command) == PONG) {
System.out.println("[Recv: PONG]" );
if (peerListener != null) peerListener.console("[Recv: PONG]");
this.lastPongTime = System.currentTimeMillis();
}
// got GETPEERS send peers
if (Command.fromInt(command) == GET_PEERS) {
System.out.println("[Recv: GETPEERS]" );
if (peerListener != null) peerListener.console("[Recv: GETPEERS]");
String answer = "2240089100000134F9013111F84A8456084B1482765FB84072FD5DBC7F458FB0A52354E25234CEA90A51EA09858A21406056D9B9E0826BB153527E4C4CBEC53B46B0245E6E8503EEABDBF0F1789D7C5C78BBF2B1FDD9090CF84A8455417E2D82765FB840CE73F1F1F1F16C1B3FDA7B18EF7BA3CE17B6F1F1F1F141D3C6C654B7AE88B239407FF1F1F1F119025D785727ED017B6ADD21F1F1F1F1000001E321DBC31824BAF84A8436C91C7582765FB840D592C570B5082D357C30E61E3D8F26317BFD7A3A2A00A36CFB7254FEE80830F26DDFBD6A99712552F3D77314DB4AB58B9989F25699C4997A0F62489D4B86CB4DF84A8436CC0A2982765FB840E34C6E3EAC28CFD3DC930A5AEFD9552FEBCD72C33DFC74D8E4C7CF8A7BA71AE53316ADDBD241EB051ED0871C2B62825E66A45DC6A0E752A7F1C22ABEF9ABDE32";
byte[] answerBytes = Hex.decode(answer);
ByteBuf buffer = ctx.alloc().buffer(answerBytes.length);
buffer.writeBytes(answerBytes);
ctx.writeAndFlush(buffer);
// send getpeers
answer = "22 40 08 91 00 00 00 02 C1 10 ";
answerBytes = Hex.decode(answer);
buffer = ctx.alloc().buffer(answerBytes.length);
answerBytes = Utils.hexStringToByteArr(answer);
buffer = ctx.alloc().buffer(answerBytes.length);
buffer.writeBytes(answerBytes);
ctx.writeAndFlush(buffer);
}
// got PEERS
if (Command.fromInt(command) == PEERS) {
System.out.println("[Recv: PEERS]");
@ -187,102 +134,18 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
PeersMessage peersMessage = new PeersMessage(rlpList);
MainData.instance.addPeers(peersMessage.getPeers());
System.out.println(peersMessage);
if (peerListener != null) peerListener.console(peersMessage.toString());
}
// got TRANSACTIONS
if (Command.fromInt(command) == TRANSACTIONS) {
System.out.println("Recv: TRANSACTIONS]");
if (peerListener != null) peerListener.console("Recv: TRANSACTIONS]");
RLPList rlpList = RLP.decode2(payload);
TransactionsMessage transactionsMessage = new TransactionsMessage(rlpList);
MainData.instance.addTransactions(transactionsMessage.getTransactions());
sendDisconnectNice(ctx);
// todo: if you got transactions send it to your peers
System.out.println(transactionsMessage);
if (peerListener != null) peerListener.console(transactionsMessage.toString());
}
// got BLOCKS
if (Command.fromInt(command) == BLOCKS) {
System.out.println("[Recv: BLOCKS]");
if (peerListener != null) peerListener.console("[Recv: BLOCKS]");
timer.cancel();
timer.purge();
timer = null;
RLPList rlpList = RLP.decode2(payload);
ctx.close().sync();
ctx.disconnect().sync();
BlocksMessage blocksMessage = new BlocksMessage(rlpList);
List<Block> blockList = blocksMessage.getBlockDataList();
// If we get one block from a peer
// we ask less swinish
if (blockList.size() <= 1 && secToAskForChain != 10){
System.out.println("Now we ask for a chain each 10 seconds");
secToAskForChain = 10;
chainAskTimer.cancel();
chainAskTimer.purge();
chainAskTimer = new Timer();
chainAskTimer.schedule(new TimerTask() {
public void run() {
System.out.println("[Send: GET_CHAIN]");
sendGetChain(ctx);
}
}, 3000, secToAskForChain * 1000);
}
// If we get more blocks from a peer
// we ask more often
if (blockList.size() > 2 && secToAskForChain != 1){
System.out.println("Now we ask for a chain each 1 seconds");
secToAskForChain = 11;
chainAskTimer.cancel();
chainAskTimer.purge();
chainAskTimer = new Timer();
chainAskTimer.schedule(new TimerTask() {
public void run() {
System.out.println("[Send: GET_CHAIN]");
sendGetChain(ctx);
}
}, 3000, secToAskForChain * 1000);
}
MainData.instance.addBlocks(blockList);
System.out.println(blocksMessage);
if (peerListener != null) peerListener.console(blocksMessage.toString());
}
// got GETCHAIN
if (Command.fromInt(command) == GET_CHAIN) {
System.out.println("[Recv: GET_CHAIN]");
if (peerListener != null) peerListener.console("[Recv: GET_CHAIN]");
RLPList rlpList = RLP.decode2(payload);
GetChainMessage getChainMessage = new GetChainMessage(rlpList);
System.out.println(getChainMessage);
if (peerListener != null) peerListener.console(getChainMessage.toString());
}
// got NOTINCHAIN
if (Command.fromInt(command) == NOT_IN_CHAIN) {
System.out.println("[Recv: NOT_IN_CHAIN]");
if (peerListener != null) peerListener.console("[Recv: NOT_IN_CHAIN]");
RLPList rlpList = RLP.decode2(payload);
NotInChainMessage notInChainMessage = new NotInChainMessage(rlpList);
System.out.println(notInChainMessage);
if (peerListener != null) peerListener.console(notInChainMessage.toString());
}
// got GETTRANSACTIONS
if (Command.fromInt(command) == GET_TRANSACTIONS) {
System.out.println("[Recv: GET_TRANSACTIONS]");
if (peerListener != null) peerListener.console("[Recv: GET_TRANSACTIONS]");
// todo: send the queue of the transactions
}
}
@ -298,8 +161,12 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
this.tearDown = true;
System.out.println("Lost connection to the server");
cause.printStackTrace();
ctx.close().sync();
timer.cancel();
timer.purge();
timer = null;
ctx.close().sync();
ctx.disconnect().sync();
}
private void sendMsg(Message msg, ChannelHandlerContext ctx){
@ -325,6 +192,13 @@ public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
ctx.writeAndFlush(buffer);
}
private void sendDisconnectNice(ChannelHandlerContext ctx){
System.out.println("[Send: DISCONNECT]");
ByteBuf buffer = ctx.alloc().buffer(StaticMessages.DISCONNECT_00.length);
buffer.writeBytes(StaticMessages.DISCONNECT_00);
ctx.writeAndFlush(buffer);
}
private void sendGetPeers(ChannelHandlerContext ctx){
ByteBuf buffer = ctx.alloc().buffer(StaticMessages.GET_PEERS.length);
buffer.writeBytes(StaticMessages.GET_PEERS);

View File

@ -43,15 +43,7 @@ public class PeerTaster {
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
final EthereumProtocolHandler handler;
if (peerListener != null){
handler = new EthereumProtocolHandler(peerListener);
peerListener.console("connecting to: " + host + ":" + port);
}
else
handler = new EthereumProtocolHandler();
final EthereumPeerTasterHandler handler = new EthereumPeerTasterHandler();
b.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
@ -65,9 +57,8 @@ public class PeerTaster {
// Start the client.
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5);
ChannelFuture f = b.connect(host, port).sync(); // (5)
this.channel = f.channel();
MainData.instance.setActivePeer(this);
// Wait until the connection is closed.
f.channel().closeFuture().sync();
@ -76,34 +67,37 @@ public class PeerTaster {
System.out.println("-- ClientPeer: catch (InterruptedException ie) --");
ie.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
try {
workerGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("I am dead");
}
/*
* The wire gets data for signed transactions and
* sends it to the net.
* todo: find a way to set all "send to wire methods" in one place.
*/
public void sendTransaction(Transaction transaction){
transaction.getEncoded();
java.util.List<Transaction> txList = new ArrayList<Transaction>();
txList.add(transaction);
TransactionsMessage transactionsMessage = new TransactionsMessage(txList);
public static void main(String args[]){
byte[] payload = transactionsMessage.getPayload();
PeerTaster peerTaster = new PeerTaster();
ByteBuf buffer = channel.alloc().buffer(payload.length + 8);
buffer.writeBytes(StaticMessages.MAGIC_PACKET);
buffer.writeBytes(Utils.calcPacketSize(payload));
buffer.writeBytes(payload);
System.out.println("Send msg: [ " +
Hex.toHexString(payload) +
" ]");
try {peerTaster.connect("54.211.14.10", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("82.217.72.169", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.201.28.117", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.2.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("0.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
try {peerTaster.connect("54.204.10.41", 30303);} catch (Exception e) {e.printStackTrace();}
System.out.println("End of the roaad");
for (PeerData peer : MainData.instance.getPeers()){
System.out.println(peer.getInetAddress().getHostAddress().toString());
};
channel.writeAndFlush(buffer);
}
}

View File

@ -26,17 +26,18 @@ public class StaticMessages {
(byte)0x00, (byte)0x00, (byte)0x00, (byte)0x02,
(byte)0xC1, (byte)0x16 };
public static final byte[] DISCONNECT_00 = Hex.decode("2240089100000003C20100");
public static final byte[] GET_CHAIN = Hex.decode("2240089100000027F82514A069A7356A245F9DC5B865475ADA5EE4E89B18F93C06503A9DB3B3630E88E9FB4E820100");
public static final byte[] GENESIS_HASH = Hex.decode("c305511e7cb9b33767e50f5e94ecd7b1c51359a04f45183860ec6808d80b0d3f");
public static final byte[] MAGIC_PACKET = Hex.decode("22400891");
static {
byte[] peerIdBytes = HashUtil.randomPeerId();
HELLO_MESSAGE = new HelloMessage((byte)0x10, (byte)0x00, "EthereumJ [v0.5.1]",
// Hey Nick I like it that way ;)
HELLO_MESSAGE = new HelloMessage((byte)0x10, (byte)0x00, "EthereumJ [v0.5.1] by RomanJ ",
(byte)0b00000111, (short)30303, peerIdBytes);
}