X Tutup
The Wayback Machine - https://web.archive.org/web/20220419131953/https://github.com/nodejs/node/commit/527e2147af
Skip to content
Permalink
Browse files
stream: add promises version to utility functions
PR-URL: #33991
Fixes: #33582
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
rickyes authored and ronag committed Jul 9, 2020
1 parent 6ae1b9c commit 527e2147afe43c7b19fd140ebd048c896705da7f
Showing with 193 additions and 8 deletions.
  1. +14 −8 doc/api/stream.md
  2. +36 −0 lib/stream.js
  3. +39 −0 lib/stream/promises.js
  4. +1 −0 node.gyp
  5. +103 −0 test/parallel/test-stream-promises.js
@@ -48,6 +48,13 @@ Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].

### Streams Promises API

The `stream/promises` API provides an alternative set of asynchronous utility
functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('stream/promises')`
or `require('stream').promises`.

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1597,10 +1604,10 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API is promisify-able as well;
The `finished` API provides promise version:

```js
const finished = util.promisify(stream.finished);
const { finished } = require('stream/promises');
const rs = fs.createReadStream('archive.tar');
@@ -1684,10 +1691,10 @@ pipeline(
);
```

The `pipeline` API is promisify-able as well:
The `pipeline` API provides promise version:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
async function run() {
await pipeline(
@@ -1704,7 +1711,7 @@ run().catch(console.error);
The `pipeline` API also supports async generators:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
const fs = require('fs');
async function run() {
@@ -2927,9 +2934,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:

```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');
const { pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const writable = fs.createWriteStream('./file');
@@ -2943,7 +2950,6 @@ pipeline(iterator, writable, (err, value) => {
});
// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
@@ -21,10 +21,21 @@

'use strict';

const {
ObjectDefineProperty,
} = primordials;

const {
promisify: { custom: customPromisify },
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

// Lazy loaded
let promises = null;

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
@@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough');
Stream.pipeline = pipeline;
Stream.finished = eos;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises;
}
});

ObjectDefineProperty(pipeline, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.pipeline;
}
});

ObjectDefineProperty(eos, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.finished;
}
});

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

@@ -0,0 +1,39 @@
'use strict';

const {
Promise,
} = primordials;

let pl;
let eos;

function pipeline(...streams) {
if (!pl) pl = require('internal/streams/pipeline');
return new Promise((resolve, reject) => {
pl(...streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
});
}

function finished(stream, opts) {
if (!eos) eos = require('internal/streams/end-of-stream');
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = {
finished,
pipeline,
};
@@ -78,6 +78,7 @@
'lib/readline.js',
'lib/repl.js',
'lib/stream.js',
'lib/stream/promises.js',
'lib/_stream_readable.js',
'lib/_stream_writable.js',
'lib/_stream_duplex.js',
@@ -0,0 +1,103 @@
'use strict';

const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
} = stream;
const {
finished,
pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
const { promisify } = require('util');

assert.strictEqual(promises.pipeline, pipeline);
assert.strictEqual(promises.finished, finished);
assert.strictEqual(pipeline, promisify(stream.pipeline));
assert.strictEqual(finished, promisify(stream.finished));

// pipeline success
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];

const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});

write.on('finish', () => {
finished = true;
});

for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);

pipeline(read, write).then(common.mustCall((value) => {
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}

// pipeline error
{
const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
cb();
}
});

read.push('data');
setImmediate(() => read.destroy());

pipeline(read, write).catch(common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}

// finished success
{
async function run() {
const rs = fs.createReadStream(__filename);

let ended = false;
rs.resume();
rs.on('end', () => {
ended = true;
});
await finished(rs);
assert(ended);
}

run().then(common.mustCall());
}

// finished error
{
const rs = fs.createReadStream('file-does-not-exist');

assert.rejects(finished(rs), {
code: 'ENOENT'
}).then(common.mustCall());
}

0 comments on commit 527e214

Please sign in to comment.
X Tutup