metro-memory-fs: implement createReadStream()

Reviewed By: mjesun

Differential Revision: D7124071

fbshipit-source-id: a267e35bb32d540cec915ac38f5ab11e2096f33e
This commit is contained in:
Jean Lauliac 2018-03-05 07:33:09 -08:00 committed by Facebook Github Bot
parent b7e3e046cd
commit e565f10a30
2 changed files with 278 additions and 43 deletions

View File

@ -57,11 +57,11 @@ it('can write then read a file as buffer', () => {
expect(fs.readFileSync('/foo.txt')).toEqual(new Buffer([1, 2, 3, 4]));
});
it('can write a file with a stream', done => {
describe('createWriteStream', () => {
it('can write a file', done => {
const st = fs.createWriteStream('/foo.txt');
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
st.write('test');
@ -74,11 +74,10 @@ it('can write a file with a stream', done => {
});
});
it('can write a file with a stream, as buffer', done => {
it('can write a file, as buffer', done => {
const st = fs.createWriteStream('/foo.txt');
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
st.write(Buffer.from('test'));
@ -91,12 +90,11 @@ it('can write a file with a stream, as buffer', done => {
});
});
it('can write a file with a stream, with a starting position', done => {
it('can write a file, with a starting position', done => {
fs.writeFileSync('/foo.txt', 'test bar');
const st = fs.createWriteStream('/foo.txt', {start: 5, flags: 'r+'});
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
st.write('beep');
@ -107,6 +105,131 @@ it('can write a file with a stream, with a starting position', done => {
done();
});
});
});
describe('createReadStream', () => {
const REF_STR = 'foo bar baz glo beep boop';
beforeEach(() => {
fs.writeFileSync('/foo.txt', REF_STR);
});
it('reads a file', async () => {
const str = await readWithReadStream(null);
expect(str).toBe(REF_STR);
});
it('reads a file, with a starting position', async () => {
const str = await readWithReadStream({start: 4});
expect(str).toBe(REF_STR.substring(4));
});
it('reads a file, with an ending position', async () => {
const str = await readWithReadStream({end: 14});
// The `end` option is inclusive, but it's exclusive for `substring`,
// hence the difference between 14 and 15.
expect(str).toBe(REF_STR.substring(0, 15));
});
it('reads a file, with starting and ending positions', async () => {
const str = await readWithReadStream({start: 8, end: 14});
// The `end` option is inclusive, but it's exclusive for `substring`,
// hence the difference between 14 and 15.
expect(str).toBe(REF_STR.substring(8, 15));
});
it('reads a file, with custom flags and mode', async () => {
const str = await readWithReadStream(
{flags: 'wx+', mode: 0o600},
'/glo.txt',
);
expect(str).toBe('');
// Does not work yet, statSync needs to be fixed to support `mode`.
// expect(fs.statSync('/glo.txt').mode).toBe(0o600);
});
function readWithReadStream(options, filePath = '/foo.txt') {
return new Promise(resolve => {
const st = fs.createReadStream(
filePath,
options != null ? {...options, encoding: 'utf8'} : 'utf8',
);
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
expect((st: any).path).toBe(filePath);
let str = '';
st.on('data', chunk => {
expect(opened).toBe(true);
str += chunk;
});
st.on('end', () => {
expect(closed).toBe(true);
resolve(str);
});
});
}
it('reads a file as buffer', done => {
const st = fs.createReadStream('/foo.txt');
let buffer = new Buffer(0);
st.on('data', chunk => {
buffer = Buffer.concat([buffer, chunk]);
});
st.on('end', () => {
expect(buffer.toString('utf8')).toBe(REF_STR);
done();
});
});
it('reads a file with a custom fd', done => {
fs.writeFileSync('/bar.txt', 'tadam');
const fd = fs.openSync('/bar.txt', 'r');
const st = fs.createReadStream('/foo.txt', {fd, encoding: 'utf8'});
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
expect((st: any).path).toBe('/foo.txt');
let str = '';
st.on('data', chunk => {
str += chunk;
});
st.on('end', () => {
expect(opened).toBe(false);
expect(closed).toBe(true);
expect(str).toBe('tadam');
done();
});
});
it('reads a file with a custom fd, no auto-close', done => {
fs.writeFileSync('/bar.txt', 'tadam');
const fd = fs.openSync('/bar.txt', 'r');
const st = fs.createReadStream('/foo.txt', {
fd,
encoding: 'utf8',
autoClose: false,
});
let opened = false;
let closed = false;
st.on('open', () => (opened = true));
st.on('close', () => (closed = true));
expect((st: any).path).toBe('/foo.txt');
let str = '';
st.on('data', chunk => {
str += chunk;
});
st.on('end', () => {
expect(opened).toBe(false);
expect(closed).toBe(false);
expect(str).toBe('tadam');
fs.closeSync(fd);
done();
});
});
});
it('truncates a file that already exist', () => {
fs.writeFileSync('/foo.txt', 'test');

View File

@ -416,6 +416,49 @@ class MemoryFs {
return new Stats(node);
};
createReadStream = (
filePath: string | Buffer,
options?:
| {
autoClose?: ?boolean,
encoding?: ?Encoding,
end?: ?number,
fd?: ?number,
flags?: ?string,
highWaterMark?: ?number,
mode?: ?number,
start?: ?number,
}
| Encoding,
) => {
let autoClose, encoding, fd, flags, mode, start, end, highWaterMark;
if (typeof options === 'string') {
encoding = options;
} else if (options != null) {
({autoClose, encoding, fd, flags, mode, start} = options);
({end, highWaterMark} = options);
}
let st = null;
if (fd == null) {
fd = this._open(pathStr(filePath), flags || 'r', mode);
process.nextTick(() => (st: any).emit('open', fd));
}
const ffd = fd;
const {readSync} = this;
const ropt = {filePath, encoding, fd, highWaterMark, start, end, readSync};
const rst = new ReadFileSteam(ropt);
st = rst;
if (autoClose !== false) {
const doClose = () => {
this.closeSync(ffd);
rst.emit('close');
};
rst.on('end', doClose);
rst.on('error', doClose);
}
return rst;
};
createWriteStream = (
filePath: string | Buffer,
options?:
@ -433,14 +476,16 @@ class MemoryFs {
if (typeof options !== 'string' && options != null) {
({autoClose, fd, flags, mode, start} = options);
}
let st = null;
if (fd == null) {
fd = this._open(pathStr(filePath), flags || 'w', mode);
process.nextTick(() => (st: any).emit('open', fd));
}
if (start != null) {
this._write(fd, new Buffer(0), 0, 0, start);
}
const ffd = fd;
const st = new stream.Writable({
const rst = new stream.Writable({
write: (buffer, encoding, callback) => {
try {
this._write(ffd, buffer, 0, buffer.length);
@ -455,7 +500,7 @@ class MemoryFs {
try {
if (autoClose !== false) {
this.closeSync(ffd);
st.emit('close');
rst.emit('close');
}
} catch (error) {
callback(error);
@ -464,9 +509,9 @@ class MemoryFs {
callback();
},
});
st = rst;
(st: any).path = filePath;
(st: any).bytesWritten = 0;
process.nextTick(() => st.emit('open', ffd));
return st;
};
@ -702,6 +747,73 @@ class Stats {
}
}
type ReadSync = (
fd: number,
buffer: Buffer,
offset: number,
length: number,
position: ?number,
) => number;
class ReadFileSteam extends stream.Readable {
_buffer: Buffer;
_fd: number;
_positions: ?{current: number, last: number};
_readSync: ReadSync;
bytesRead: number;
path: string | Buffer;
constructor(options: {
filePath: string | Buffer,
encoding: ?Encoding,
end: ?number,
fd: number,
highWaterMark: ?number,
readSync: ReadSync,
start: ?number,
}) {
const {highWaterMark, fd} = options;
// eslint-disable-next-line lint/flow-no-fixme
// $FlowFixMe: Readable does accept null of undefined for that value.
super({highWaterMark});
this.bytesRead = 0;
this.path = options.filePath;
this._readSync = options.readSync;
this._fd = fd;
this._buffer = new Buffer(1024);
const {start, end} = options;
if (start != null) {
this._readSync(fd, new Buffer(0), 0, 0, start);
}
if (end != null) {
this._positions = {current: start || 0, last: end + 1};
}
}
_read(size) {
let bytesRead;
const {_buffer} = this;
do {
const length = this._getLengthToRead();
const position = this._positions && this._positions.current;
bytesRead = this._readSync(this._fd, _buffer, 0, length, position);
if (this._positions != null) {
this._positions.current += bytesRead;
}
this.bytesRead += bytesRead;
} while (this.push(bytesRead > 0 ? _buffer.slice(0, bytesRead) : null));
}
_getLengthToRead() {
const {_positions, _buffer} = this;
if (_positions == null) {
return _buffer.length;
}
const leftToRead = Math.max(0, _positions.last - _positions.current);
return Math.min(_buffer.length, leftToRead);
}
}
function checkPathLength(entNames, filePath) {
if (entNames.length > 32) {
throw makeError(