Merge pull request #21 from PeelTechnologies/stream

make sockets duplex streams
This commit is contained in:
Andy Prock 2016-11-03 09:27:29 -07:00 committed by GitHub
commit 79103ab4cd
5 changed files with 84 additions and 29 deletions

View File

@ -8,7 +8,7 @@
'use strict'; 'use strict';
var inherits = require('inherits'); var util = require('util');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var { var {
NativeModules NativeModules
@ -55,7 +55,7 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
this._connections = 0; this._connections = 0;
} }
inherits(TcpServer, EventEmitter); util.inherits(TcpServer, EventEmitter);
TcpServer.prototype._debug = function() { TcpServer.prototype._debug = function() {
if (__DEV__) { if (__DEV__) {

View File

@ -8,8 +8,12 @@
'use strict'; 'use strict';
var inherits = require('inherits'); global.process = require('process'); // needed to make stream-browserify happy
var EventEmitter = require('events').EventEmitter; var Buffer = global.Buffer = global.Buffer || require('buffer').Buffer;
var util = require('util');
var stream = require('stream-browserify');
// var EventEmitter = require('events').EventEmitter;
var ipRegex = require('ip-regex'); var ipRegex = require('ip-regex');
var { var {
DeviceEventEmitter, DeviceEventEmitter,
@ -31,10 +35,6 @@ function TcpSocket(options: ?{ id: ?number }) {
return new TcpSocket(options); return new TcpSocket(options);
} }
if (EventEmitter instanceof Function) {
EventEmitter.call(this);
}
if (options && options.id) { if (options && options.id) {
// e.g. incoming server connections // e.g. incoming server connections
this._id = Number(options.id); this._id = Number(options.id);
@ -47,6 +47,8 @@ function TcpSocket(options: ?{ id: ?number }) {
this._id = instances++; this._id = instances++;
} }
stream.Duplex.call(this, {});
// ensure compatibility with node's EventEmitter // ensure compatibility with node's EventEmitter
if (!this.on) { if (!this.on) {
this.on = this.addListener.bind(this); this.on = this.addListener.bind(this);
@ -56,9 +58,11 @@ function TcpSocket(options: ?{ id: ?number }) {
this.writable = this.readable = false; this.writable = this.readable = false;
this._state = STATE.DISCONNECTED; this._state = STATE.DISCONNECTED;
this.read(0);
} }
inherits(TcpSocket, EventEmitter); util.inherits(TcpSocket, stream.Duplex);
TcpSocket.prototype._debug = function() { TcpSocket.prototype._debug = function() {
if (__DEV__) { if (__DEV__) {
@ -124,7 +128,33 @@ function isLegalPort(port: number) : boolean {
return false; return false;
} }
return +port === (port >>> 0) && port >= 0 && port <= 0xFFFF; return +port === (port >>> 0) && port >= 0 && port <= 0xFFFF;
} };
TcpSocket.prototype.read = function(n) {
if (n === 0) {
return stream.Readable.prototype.read.call(this, n);
}
this.read = stream.Readable.prototype.read;
this._consuming = true;
return this.read(n);
};
// Just call handle.readStart until we have enough in the buffer
TcpSocket.prototype._read = function(n) {
this._debug('_read');
if (this._state === STATE.CONNECTING) {
this._debug('_read wait for connection');
this.once('connect', () => this._read(n));
} else if (!this._reading) {
// not already reading, start the flow
this._debug('Socket._read resume');
this._reading = true;
this.resume();
}
};
TcpSocket.prototype.setTimeout = function(msecs: number, callback: () => void) { TcpSocket.prototype.setTimeout = function(msecs: number, callback: () => void) {
var self = this; var self = this;
@ -153,6 +183,9 @@ TcpSocket.prototype.address = function() : { port: number, address: string, fami
}; };
TcpSocket.prototype.end = function(data, encoding) { TcpSocket.prototype.end = function(data, encoding) {
stream.Duplex.prototype.end.call(this, data, encoding);
this.writable = false;
if (this._destroyed) { if (this._destroyed) {
return; return;
} }
@ -161,6 +194,11 @@ TcpSocket.prototype.end = function(data, encoding) {
this.write(data, encoding); this.write(data, encoding);
} }
if (this.readable) {
this.read(0);
this.readable = false;
}
this._destroyed = true; this._destroyed = true;
this._debug('ending'); this._debug('ending');
@ -212,6 +250,8 @@ TcpSocket.prototype._onConnect = function(address: { port: number, address: stri
setConnected(this, address); setConnected(this, address);
this.emit('connect'); this.emit('connect');
this.read(0);
}; };
TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: number, address: string, family: string } }): void { TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: number, address: string, family: string } }): void {
@ -232,12 +272,22 @@ TcpSocket.prototype._onData = function(data: string): void {
this._timeout = null; this._timeout = null;
} }
// from base64 string if (data && data.length > 0) {
var buffer = typeof Buffer === 'undefined' // debug('got data');
? base64.toByteArray(data)
: new global.Buffer(data, 'base64');
this.emit('data', buffer); // read success.
// In theory (and in practice) calling readStop right now
// will prevent this from being called again until _read() gets
// called again.
var ret = this.push(new Buffer(data, 'base64'));
if (this._reading && !ret) {
this._reading = false;
this.pause();
}
return;
}
}; };
TcpSocket.prototype._onClose = function(hadError: boolean): void { TcpSocket.prototype._onClose = function(hadError: boolean): void {
@ -253,7 +303,16 @@ TcpSocket.prototype._onError = function(error: string): void {
this.destroy(); this.destroy();
}; };
TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => void) : boolean { TcpSocket.prototype.write = function(chunk, encoding, cb) {
if (typeof chunk !== 'string' && !(Buffer.isBuffer(chunk))) {
throw new TypeError(
'Invalid data, chunk must be a string or buffer, not ' + typeof chunk);
}
return stream.Duplex.prototype.write.apply(this, arguments);
};
TcpSocket.prototype._write = function(buffer: any, encoding: ?String, callback: ?(err: ?Error) => void) : boolean {
var self = this; var self = this;
if (this._state === STATE.DISCONNECTED) { if (this._state === STATE.DISCONNECTED) {
@ -262,17 +321,16 @@ TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => vo
// we're ok, GCDAsyncSocket handles queueing internally // we're ok, GCDAsyncSocket handles queueing internally
} }
var cb = callback || noop; callback = callback || noop;
var str; var str;
if (typeof buffer === 'string') { if (typeof buffer === 'string') {
self._debug('socket.WRITE(): encoding as base64'); self._debug('socket.WRITE(): encoding as base64');
str = Base64Str.encode(buffer); str = Base64Str.encode(buffer);
} else if (typeof Buffer !== 'undefined' && global.Buffer.isBuffer(buffer)) { } else if (Buffer.isBuffer(buffer)) {
str = buffer.toString('base64'); str = buffer.toString('base64');
} else if (buffer instanceof Uint8Array || Array.isArray(buffer)) {
str = base64.fromByteArray(buffer);
} else { } else {
throw new Error('invalid message format'); throw new TypeError(
'Invalid data, chunk must be a string or buffer, not ' + typeof buffer);
} }
Sockets.write(this._id, str, function(err) { Sockets.write(this._id, str, function(err) {
@ -284,10 +342,10 @@ TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => vo
err = normalizeError(err); err = normalizeError(err);
if (err) { if (err) {
self._debug('write failed', err); self._debug('write failed', err);
return cb(err); return callback(err);
} }
cb(); callback();
}); });
return true; return true;
@ -340,8 +398,6 @@ TcpSocket.prototype._normalizeConnectArgs = function(args) {
}; };
// unimplemented net.Socket apis // unimplemented net.Socket apis
TcpSocket.prototype.pause =
TcpSocket.prototype.resume =
TcpSocket.prototype.ref = TcpSocket.prototype.ref =
TcpSocket.prototype.unref = TcpSocket.prototype.unref =
TcpSocket.prototype.setNoDelay = TcpSocket.prototype.setNoDelay =

View File

@ -13,8 +13,6 @@ import {
View View
} from 'react-native'; } from 'react-native';
global.Buffer = global.Buffer || require('buffer').Buffer;
var net = require('net'); var net = require('net');
function randomPort() { function randomPort() {

View File

@ -9,7 +9,6 @@
"net": "react-native-tcp" "net": "react-native-tcp"
}, },
"dependencies": { "dependencies": {
"buffer": "^4.6.0",
"react": "^15.1.0", "react": "^15.1.0",
"react-native": "^0.35.0", "react-native": "^0.35.0",
"react-native-tcp": "../../" "react-native-tcp": "../../"

View File

@ -34,9 +34,11 @@
"homepage": "https://github.com/PeelTechnologies/react-native-tcp", "homepage": "https://github.com/PeelTechnologies/react-native-tcp",
"dependencies": { "dependencies": {
"base64-js": "0.0.8", "base64-js": "0.0.8",
"buffer": "^5.0.0",
"events": "^1.0.2", "events": "^1.0.2",
"inherits": "^2.0.1",
"ip-regex": "^1.0.3", "ip-regex": "^1.0.3",
"process": "^0.11.9",
"stream-browserify": "^2.0.1",
"util": "^0.10.3" "util": "^0.10.3"
}, },
"devDependencies": { "devDependencies": {