push socket creation into connect and listen methods

This commit is contained in:
Andy Prock 2015-12-28 11:12:21 -08:00
parent 0d08f22a66
commit bceff85694
8 changed files with 152 additions and 102 deletions

View File

@ -22,12 +22,14 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
return new TcpServer(connectionListener); return new TcpServer(connectionListener);
} }
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter if (EventEmitter instanceof Function) {
EventEmitter.call(this); EventEmitter.call(this);
}
var self = this; var self = this;
this._socket = new Socket(); this._socket = new Socket();
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter // $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('connect', function() { this._socket.on('connect', function() {
self.emit('listening'); self.emit('listening');
@ -35,7 +37,6 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter // $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('connection', function(socket) { this._socket.on('connection', function(socket) {
self._connections++; self._connections++;
self.emit('connection', socket); self.emit('connection', socket);
}); });
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter // $FlowFixMe: suppressing this error flow doesn't like EventEmitter
@ -45,7 +46,6 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter // $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('error', function(error) { this._socket.on('error', function(error) {
self.emit('error', error); self.emit('error', error);
self._socket.destroy();
}); });
if (typeof connectionListener === 'function') { if (typeof connectionListener === 'function') {
@ -64,16 +64,21 @@ TcpServer.prototype._debug = function() {
} }
}; };
TcpServer.prototype.listen = function(options: { port: number, hostname: ?string }, callback: ?() => void) : TcpServer { // TODO : determine how to properly overload this with flow
TcpServer.prototype.listen = function() : TcpServer {
var args = this._socket._normalizeConnectArgs(arguments);
var options = args[0];
var callback = args[1];
var port = options.port; var port = options.port;
var hostname = options.hostname || 'localhost'; var host = options.host || 'localhost';
if (callback) { if (callback) {
this.on('listening', callback); this.on('listening', callback);
} }
Sockets.createSocket(this._socket._id); this._socket._registerEvents();
Sockets.listen(this._socket._id, hostname, port); Sockets.listen(this._socket._id, host, port);
return this; return this;
}; };

View File

@ -19,7 +19,7 @@ var Sockets = NativeModules.TcpSockets;
var base64 = require('base64-js'); var base64 = require('base64-js');
var Base64Str = require('./base64-str'); var Base64Str = require('./base64-str');
var noop = function () {}; var noop = function () {};
var usedIds = []; var instances = 0;
var STATE = { var STATE = {
DISCONNECTED: 0, DISCONNECTED: 0,
CONNECTING: 1, CONNECTING: 1,
@ -27,37 +27,34 @@ var STATE = {
}; };
function TcpSocket(options: ?{ id: ?number }) { function TcpSocket(options: ?{ id: ?number }) {
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter if (!(this instanceof TcpSocket)) {
EventEmitter.call(this); return new TcpSocket(options);
}
if (EventEmitter instanceof Function) {
EventEmitter.call(this);
}
if (options && options.id) { if (options && options.id) {
// native generated sockets range from 5000-6000
// e.g. incoming server connections // e.g. incoming server connections
this._id = Number(options.id); this._id = Number(options.id);
if (usedIds.indexOf(this._id) !== -1) { if (this._id <= instances) {
throw new Error('Socket id ' + this._id + 'already in use'); throw new Error('Socket id ' + this._id + 'already in use');
} }
} else { } else {
// javascript generated sockets range from 1-1000 // javascript generated sockets range from 1-1000
this._id = Math.floor((Math.random() * 1000) + 1); this._id = instances++;
while (usedIds.indexOf(this._id) !== -1) {
this._id = Math.floor((Math.random() * 1000) + 1);
}
} }
usedIds.push(this._id);
// these will be set once there is a connection
this.readable = this.writable = false;
this._registerEvents();
// ensure compatibility with node's EventEmitter // ensure compatibility with node's EventEmitter
if (!this.on) { if (!this.on) {
this.on = this.addListener.bind(this); this.on = this.addListener.bind(this);
} }
// these will be set once there is a connection
this.writable = this.readable = false;
this._state = STATE.DISCONNECTED; this._state = STATE.DISCONNECTED;
} }
@ -71,24 +68,21 @@ TcpSocket.prototype._debug = function() {
} }
}; };
TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host: ?string, localAddress: ?string, localPort: ?number }, callback: ?() => void) { // TODO : determine how to properly overload this with flow
if (this._state !== STATE.DISCONNECTED) { TcpSocket.prototype.connect = function(options, callback) : TcpSocket {
throw new Error('Socket is already bound'); this._registerEvents();
if (options === null || typeof options !== 'object') {
// Old API:
// connect(port, [host], [cb])
var args = this._normalizeConnectArgs(arguments);
return TcpSocket.prototype.connect.apply(this, args);
} }
if (typeof callback === 'function') { if (typeof callback === 'function') {
this.once('connect', callback); this.once('connect', callback);
} }
if (!options) {
options = {
host: 'localhost',
port: 0,
localAddress: null,
localPort: null
};
}
var host = options.host || 'localhost'; var host = options.host || 'localhost';
var port = options.port || 0; var port = options.port || 0;
var localAddress = options.localAddress; var localAddress = options.localAddress;
@ -107,7 +101,7 @@ TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host
throw new TypeError('"port" option should be a number or string: ' + port); throw new TypeError('"port" option should be a number or string: ' + port);
} }
port = Number(port); port = +port;
if (!isLegalPort(port)) { if (!isLegalPort(port)) {
throw new RangeError('"port" option should be >= 0 and < 65536: ' + port); throw new RangeError('"port" option should be >= 0 and < 65536: ' + port);
@ -117,8 +111,10 @@ TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host
this._state = STATE.CONNECTING; this._state = STATE.CONNECTING;
this._debug('connecting, host:', host, 'port:', port); this._debug('connecting, host:', host, 'port:', port);
Sockets.createSocket(this._id); this._destroyed = false;
Sockets.connect(this._id, host, Number(port), options); Sockets.connect(this._id, host, Number(port), options);
return this;
}; };
// Check that the port number is not NaN when coerced to a number, // Check that the port number is not NaN when coerced to a number,
@ -181,6 +177,10 @@ TcpSocket.prototype.destroy = function() {
}; };
TcpSocket.prototype._registerEvents = function(): void { TcpSocket.prototype._registerEvents = function(): void {
if (this._subs && this._subs.length > 0) {
return;
}
this._subs = [ this._subs = [
DeviceEventEmitter.addListener( DeviceEventEmitter.addListener(
'tcp-' + this._id + '-connect', this._onConnect.bind(this) 'tcp-' + this._id + '-connect', this._onConnect.bind(this)
@ -207,27 +207,20 @@ TcpSocket.prototype._unregisterEvents = function(): void {
this._subs = []; this._subs = [];
}; };
TcpSocket.prototype._onConnect = function(address: { port: string, address: string, family: string }): void { TcpSocket.prototype._onConnect = function(address: { port: number, address: string, family: string }): void {
this._debug('received', 'connect'); this._debug('received', 'connect');
this.writable = this.readable = true; setConnected(this, address);
this._state = STATE.CONNECTED;
this._address = address;
this._address.port = Number(this._address.port);
this.emit('connect'); this.emit('connect');
}; };
TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: string, address: string, family: string } }): void { TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: number, address: string, family: string } }): void {
this._debug('received', 'connection'); this._debug('received', 'connection');
var socket = new TcpSocket({ id: info.id }); var socket = new TcpSocket({ id: info.id });
socket.writable = this.readable = true; socket._registerEvents();
socket._state = STATE.CONNECTED; setConnected(socket, info.address);
socket._address = info.address;
socket._address.port = Number(socket._address.port);
this.emit('connection', socket); this.emit('connection', socket);
}; };
@ -250,17 +243,14 @@ TcpSocket.prototype._onData = function(data: string): void {
TcpSocket.prototype._onClose = function(hadError: boolean): void { TcpSocket.prototype._onClose = function(hadError: boolean): void {
this._debug('received', 'close'); this._debug('received', 'close');
this._unregisterEvents(); setDisconnected(this, hadError);
this._state = STATE.DISCONNECTED;
this.emit('close', hadError);
}; };
TcpSocket.prototype._onError = function(error: string): void { TcpSocket.prototype._onError = function(error: string): void {
this._debug('received', 'error'); this._debug('received', 'error');
this.emit('error', normalizeError(error)); this.emit('error', normalizeError(error));
this.destroy();
}; };
TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => void) : boolean { TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => void) : boolean {
@ -303,6 +293,22 @@ TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => vo
return true; return true;
}; };
function setConnected(socket: TcpSocket, address: { port: number, address: string, family: string } ) {
socket.writable = socket.readable = true;
socket._state = STATE.CONNECTED;
socket._address = address;
}
function setDisconnected(socket: TcpSocket, hadError: boolean): void {
if (socket._state === STATE.DISCONNECTED) {
return;
}
socket._unregisterEvents();
socket._state = STATE.DISCONNECTED;
socket.emit('close', hadError);
}
function normalizeError(err) { function normalizeError(err) {
if (err) { if (err) {
if (typeof err === 'string') { if (typeof err === 'string') {
@ -313,4 +319,24 @@ function normalizeError(err) {
} }
} }
// Returns an array [options] or [options, cb]
// It is the same as the argument of Socket.prototype.connect().
TcpSocket.prototype._normalizeConnectArgs = function(args) {
var options = {};
if (args[0] !== null && typeof args[0] === 'object') {
// connect(options, [cb])
options = args[0];
} else {
// connect(port, [host], [cb])
options.port = args[0];
if (typeof args[1] === 'string') {
options.host = args[1];
}
}
var cb = args[args.length - 1];
return typeof cb === 'function' ? [options, cb] : [options];
};
module.exports = TcpSocket; module.exports = TcpSocket;

View File

@ -17,10 +17,10 @@ exports.createServer = function(connectionListener: (socket: Socket) => void) :
return new Server(connectionListener); return new Server(connectionListener);
}; };
exports.connect = exports.createConnection = function(options: ?{ port: ?number | ?string, host: ?string, localAddress: ?string, localPort: ?number}, callback: ?() => void) : Socket { // TODO : determine how to properly overload this with flow
exports.connect = exports.createConnection = function() : Socket {
var tcpSocket = new Socket(); var tcpSocket = new Socket();
tcpSocket.connect(options, callback); return Socket.prototype.connect.apply(tcpSocket, tcpSocket._normalizeConnectArgs(arguments));
return tcpSocket;
}; };
exports.isIP = function(input: string) : number { exports.isIP = function(input: string) : number {

View File

@ -7,12 +7,24 @@
# Some modules have their own node_modules with overlap # Some modules have their own node_modules with overlap
.*/node_modules/node-haste/.* .*/node_modules/node-haste/.*
# Ignore react-tools where there are overlaps, but don't ignore anything that # Ugh
# react-native relies on .*/node_modules/babel.*
.*/node_modules/react-tools/src/React.js .*/node_modules/babylon.*
.*/node_modules/react-tools/src/renderers/shared/event/EventPropagators.js .*/node_modules/invariant.*
.*/node_modules/react-tools/src/renderers/shared/event/eventPlugins/ResponderEventPlugin.js
.*/node_modules/react-tools/src/shared/vendor/core/ExecutionEnvironment.js # Ignore react and fbjs where there are overlaps, but don't ignore
# anything that react-native relies on
.*/node_modules/fbjs-haste/.*/__tests__/.*
.*/node_modules/fbjs-haste/__forks__/Map.js
.*/node_modules/fbjs-haste/__forks__/Promise.js
.*/node_modules/fbjs-haste/__forks__/fetch.js
.*/node_modules/fbjs-haste/core/ExecutionEnvironment.js
.*/node_modules/fbjs-haste/core/isEmpty.js
.*/node_modules/fbjs-haste/crypto/crc32.js
.*/node_modules/fbjs-haste/stubs/ErrorUtils.js
.*/node_modules/react-haste/React.js
.*/node_modules/react-haste/renderers/dom/ReactDOM.js
.*/node_modules/react-haste/renderers/shared/event/eventPlugins/ResponderEventPlugin.js
# Ignore commoner tests # Ignore commoner tests
.*/node_modules/commoner/test/.* .*/node_modules/commoner/test/.*
@ -43,9 +55,9 @@ suppress_type=$FlowIssue
suppress_type=$FlowFixMe suppress_type=$FlowFixMe
suppress_type=$FixMe suppress_type=$FixMe
suppress_comment=\\(.\\|\n\\)*\\$FlowFixMe\\($\\|[^(]\\|(\\(>=0\\.\\(1[0-7]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\) suppress_comment=\\(.\\|\n\\)*\\$FlowFixMe\\($\\|[^(]\\|(\\(>=0\\.\\(2[0-0]\\|1[0-9]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)
suppress_comment=\\(.\\|\n\\)*\\$FlowIssue\\((\\(>=0\\.\\(1[0-7]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)? #[0-9]+ suppress_comment=\\(.\\|\n\\)*\\$FlowIssue\\((\\(>=0\\.\\(2[0-0]\\|1[0-9]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)?:? #[0-9]+
suppress_comment=\\(.\\|\n\\)*\\$FlowFixedInNextDeploy suppress_comment=\\(.\\|\n\\)*\\$FlowFixedInNextDeploy
[version] [version]
0.17.0 0.20.1

View File

@ -24,7 +24,7 @@ function randomPort() {
var serverPort = randomPort(); var serverPort = randomPort();
var server = net.createServer(function(socket) { var server = net.createServer(function(socket) {
console.log('server connected'); console.log('server connected on ' + JSON.stringify(socket.address()));
socket.on('data', function (data) { socket.on('data', function (data) {
console.log('Server Received: ' + data); console.log('Server Received: ' + data);
@ -34,21 +34,33 @@ var server = net.createServer(function(socket) {
socket.on('error', function(error) { socket.on('error', function(error) {
console.log('error ' + error); console.log('error ' + error);
}); });
}).listen({ port: serverPort }, function() { }).listen(serverPort, function() {
console.log('opened server on ' + JSON.stringify(server.address())); console.log('opened server on ' + JSON.stringify(server.address()));
}); });
var client = net.createConnection({ port: serverPort }, function() { server.on('error', function(error) {
console.log('error ' + error);
});
var client = net.createConnection(serverPort, function() {
console.log('opened client on ' + JSON.stringify(client.address())); console.log('opened client on ' + JSON.stringify(client.address()));
client.write('Hello, server! Love, Client.'); client.write('Hello, server! Love, Client.');
}); });
client.on('data', function(data) { client.on('data', function(data) {
console.log('Client Received: ' + data); console.log('Client Received: ' + data);
client.destroy(); // kill client after server's response client.destroy(); // kill client after server's response
server.close(); server.close();
}); });
client.on('error', function(error) {
console.log('client error ' + error);
});
client.on('close', function() {
console.log('client close');
});
var rctsockets = React.createClass({ var rctsockets = React.createClass({
render: function() { render: function() {
return ( return (
@ -80,14 +92,4 @@ var styles = StyleSheet.create({
}, },
}); });
// only works for 8-bit chars
function toByteArray(obj) {
var uint = new Uint8Array(obj.length);
for (var i = 0, l = obj.length; i < l; i++){
uint[i] = obj.charCodeAt(i);
}
return new Uint8Array(uint);
}
AppRegistry.registerComponent('rctsockets', () => rctsockets); AppRegistry.registerComponent('rctsockets', () => rctsockets);

View File

@ -31,7 +31,7 @@ typedef enum RCTTCPError RCTTCPError;
- (void)onData:(NSNumber *)clientID data:(NSData *)data; - (void)onData:(NSNumber *)clientID data:(NSData *)data;
- (void)onClose:(TcpSocketClient*)client withError:(NSError *)err; - (void)onClose:(TcpSocketClient*)client withError:(NSError *)err;
- (void)onError:(TcpSocketClient*)client withError:(NSError *)err; - (void)onError:(TcpSocketClient*)client withError:(NSError *)err;
- (NSNumber*)generateRandomId; - (NSNumber*)getNextId;
@end @end
@ -67,7 +67,7 @@ typedef enum RCTTCPError RCTTCPError;
- (BOOL)listen:(NSString *)host port:(int)port error:(NSError **)error; - (BOOL)listen:(NSString *)host port:(int)port error:(NSError **)error;
- (NSDictionary<NSString *, NSString *> *)getAddress; - (NSDictionary<NSString *, id> *)getAddress;
/** /**
* write data * write data

View File

@ -85,22 +85,22 @@ NSString *const RCTTCPErrorDomain = @"RCTTCPErrorDomain";
return result; return result;
} }
- (NSDictionary<NSString *, NSString *> *)getAddress - (NSDictionary<NSString *, id> *)getAddress
{ {
if (_tcpSocket) if (_tcpSocket)
{ {
if (_tcpSocket.isConnected) { if (_tcpSocket.isConnected) {
return @{ @"port": @(_tcpSocket.connectedPort).stringValue, return @{ @"port": @(_tcpSocket.connectedPort),
@"address": _tcpSocket.connectedHost ?: @"unknown", @"address": _tcpSocket.connectedHost ?: @"unknown",
@"family": _tcpSocket.isIPv6?@"IPv6":@"IPv4" }; @"family": _tcpSocket.isIPv6?@"IPv6":@"IPv4" };
} else { } else {
return @{ @"port": @(_tcpSocket.localPort).stringValue, return @{ @"port": @(_tcpSocket.localPort),
@"address": _tcpSocket.localHost ?: @"unknown", @"address": _tcpSocket.localHost ?: @"unknown",
@"family": _tcpSocket.isIPv6?@"IPv6":@"IPv4" }; @"family": _tcpSocket.isIPv6?@"IPv6":@"IPv4" };
} }
} }
return @{ @"port": @"0", return @{ @"port": @(0),
@"address": @"unknown", @"address": @"unknown",
@"family": @"unkown" }; @"family": @"unkown" };
} }
@ -201,7 +201,7 @@ NSString *const RCTTCPErrorDomain = @"RCTTCPErrorDomain";
- (void)socket:(GCDAsyncSocket *)sock didAcceptNewSocket:(GCDAsyncSocket *)newSocket - (void)socket:(GCDAsyncSocket *)sock didAcceptNewSocket:(GCDAsyncSocket *)newSocket
{ {
TcpSocketClient *inComing = [[TcpSocketClient alloc] initWithClientId:[_clientDelegate generateRandomId] TcpSocketClient *inComing = [[TcpSocketClient alloc] initWithClientId:[_clientDelegate getNextId]
andConfig:_clientDelegate andConfig:_clientDelegate
andSocket:newSocket]; andSocket:newSocket];
[_clientDelegate onConnection: inComing [_clientDelegate onConnection: inComing

View File

@ -11,9 +11,13 @@
#import "TcpSockets.h" #import "TcpSockets.h"
#import "TcpSocketClient.h" #import "TcpSocketClient.h"
// offset native ids by 5000
#define COUNTER_OFFSET 5000
@implementation TcpSockets @implementation TcpSockets
{ {
NSMutableDictionary<NSNumber *,TcpSocketClient *> *_clients; NSMutableDictionary<NSNumber *,TcpSocketClient *> *_clients;
int _counter;
} }
RCT_EXPORT_MODULE() RCT_EXPORT_MODULE()
@ -27,11 +31,11 @@ RCT_EXPORT_MODULE()
} }
} }
RCT_EXPORT_METHOD(createSocket:(nonnull NSNumber*)cId) - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId
{ {
if (!cId) { if (!cId) {
RCTLogError(@"%@.createSocket called with nil id parameter.", [self class]); RCTLogError(@"%@.createSocket called with nil id parameter.", [self class]);
return; return nil;
} }
if (!_clients) { if (!_clients) {
@ -40,10 +44,12 @@ RCT_EXPORT_METHOD(createSocket:(nonnull NSNumber*)cId)
if (_clients[cId]) { if (_clients[cId]) {
RCTLogError(@"%@.createSocket called twice with the same id.", [self class]); RCTLogError(@"%@.createSocket called twice with the same id.", [self class]);
return; return nil;
} }
_clients[cId] = [TcpSocketClient socketClientWithId:cId andConfig:self]; _clients[cId] = [TcpSocketClient socketClientWithId:cId andConfig:self];
return _clients[cId];
} }
RCT_EXPORT_METHOD(connect:(nonnull NSNumber*)cId RCT_EXPORT_METHOD(connect:(nonnull NSNumber*)cId
@ -51,8 +57,10 @@ RCT_EXPORT_METHOD(connect:(nonnull NSNumber*)cId
port:(int)port port:(int)port
withOptions:(NSDictionary *)options) withOptions:(NSDictionary *)options)
{ {
TcpSocketClient* client = [self findClient:cId callback:nil]; TcpSocketClient *client = _clients[cId];
if (!client) return; if (!client) {
client = [self createSocket:cId];
}
NSError *error = nil; NSError *error = nil;
if (![client connect:host port:port withOptions:options error:&error]) if (![client connect:host port:port withOptions:options error:&error])
@ -88,8 +96,10 @@ RCT_EXPORT_METHOD(listen:(nonnull NSNumber*)cId
host:(NSString *)host host:(NSString *)host
port:(int)port) port:(int)port)
{ {
TcpSocketClient* client = [self findClient:cId callback:nil]; TcpSocketClient* client = _clients[cId];
if (!client) return; if (!client) {
client = [self createSocket:cId];
}
NSError *error = nil; NSError *error = nil;
if (![client listen:host port:port error:&error]) if (![client listen:host port:port error:&error])
@ -176,13 +186,8 @@ RCT_EXPORT_METHOD(listen:(nonnull NSNumber*)cId
[_clients removeObjectForKey:cId]; [_clients removeObjectForKey:cId];
} }
-(NSNumber*)generateRandomId { -(NSNumber*)getNextId {
int r = 0; return @(_counter++ + COUNTER_OFFSET);
do {
r = (arc4random() % 1000) + 5001;
} while(_clients[@(r)]);
return @(r);
} }
@end @end