Skip to content

Commit 12ff355

Browse files
committed
streams: add CompressionStream and DecompressionStream
Signed-off-by: James M Snell <[email protected]>
1 parent 413e9d8 commit 12ff355

File tree

4 files changed

+282
-0
lines changed

4 files changed

+282
-0
lines changed

doc/api/webstreams.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,5 +1118,57 @@ added: REPLACEME
11181118
* `chunk` {any}
11191119
* Returns: {number}
11201120
1121+
### Class: `CompressionStream`
1122+
<!-- YAML
1123+
added: REPLACEME
1124+
-->
1125+
1126+
#### `new CompressionStream(format)`
1127+
<!-- YAML
1128+
added: REPLACEME
1129+
-->
1130+
1131+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1132+
1133+
#### `compressionStream.readable`
1134+
<!-- YAML
1135+
added: REPLACEME
1136+
-->
1137+
1138+
* Type: {ReadableStream}
1139+
1140+
#### `compressionStream.writable`
1141+
<!-- YAML
1142+
added: REPLACEME
1143+
-->
1144+
1145+
* Type: {WritableStream}
1146+
1147+
### Class: `DecompressionStream`
1148+
<!-- YAML
1149+
added: REPLACEME
1150+
-->
1151+
1152+
#### `new DecompressionStream(format)`
1153+
<!-- YAML
1154+
added: REPLACEME
1155+
-->
1156+
1157+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1158+
1159+
#### `decompressionStream.readable`
1160+
<!-- YAML
1161+
added: REPLACEME
1162+
-->
1163+
1164+
* Type: {ReadableStream}
1165+
1166+
#### `deccompressionStream.writable`
1167+
<!-- YAML
1168+
added: REPLACEME
1169+
-->
1170+
1171+
* Type: {WritableStream}
1172+
11211173
[Streams]: stream.md
11221174
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
'use strict';
2+
3+
const {
4+
ObjectDefineProperties,
5+
Symbol,
6+
} = primordials;
7+
8+
const {
9+
codes: {
10+
ERR_INVALID_ARG_VALUE,
11+
ERR_INVALID_THIS,
12+
},
13+
} = require('internal/errors');
14+
15+
const {
16+
newReadableWritablePairFromDuplex,
17+
} = require('internal/webstreams/adapters');
18+
19+
const {
20+
customInspect,
21+
} = require('internal/webstreams/util');
22+
23+
const {
24+
customInspectSymbol: kInspect,
25+
} = require('internal/util');
26+
27+
const {
28+
createDeflate,
29+
createInflate,
30+
createGzip,
31+
createGunzip,
32+
} = require('zlib');
33+
34+
const kHandle = Symbol('kHandle');
35+
const kTransform = Symbol('kTransform');
36+
const kType = Symbol('kType');
37+
38+
/**
39+
* @typedef {import('./readablestream').ReadableStream} ReadableStream
40+
* @typedef {import('./writablestream').WritableStream} WritableStream
41+
*/
42+
43+
function isCompressionStream(value) {
44+
return typeof value?.[kHandle] === 'object' &&
45+
value?.[kType] === 'CompressionStream';
46+
}
47+
48+
function isDecompressionStream(value) {
49+
return typeof value?.[kHandle] === 'object' &&
50+
value?.[kType] === 'DecompressionStream';
51+
}
52+
53+
class CompressionStream {
54+
/**
55+
* @param {'deflate'|'gzip'} format
56+
*/
57+
constructor(format) {
58+
this[kType] = 'CompressionStream';
59+
switch (format) {
60+
case 'deflate':
61+
this[kHandle] = createDeflate();
62+
break;
63+
case 'gzip':
64+
this[kHandle] = createGzip();
65+
break;
66+
default:
67+
throw new ERR_INVALID_ARG_VALUE('format', format);
68+
}
69+
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
70+
}
71+
72+
/**
73+
* @readonly
74+
* @type {ReadableStream}
75+
*/
76+
get readable() {
77+
if (!isCompressionStream(this))
78+
throw new ERR_INVALID_THIS('CompressionStream');
79+
return this[kTransform].readable;
80+
}
81+
82+
/**
83+
* @readonly
84+
* @type {WritableStream}
85+
*/
86+
get writable() {
87+
if (!isCompressionStream(this))
88+
throw new ERR_INVALID_THIS('CompressionStream');
89+
return this[kTransform].writable;
90+
}
91+
92+
[kInspect](depth, options) {
93+
if (!isCompressionStream(this))
94+
throw new ERR_INVALID_THIS('CompressionStream');
95+
customInspect(depth, options, 'CompressionStream', {
96+
readable: this[kTransform].readable,
97+
writable: this[kTransform].writable,
98+
});
99+
}
100+
}
101+
102+
class DecompressionStream {
103+
/**
104+
* @param {'deflate'|'gzip'} format
105+
*/
106+
constructor(format) {
107+
this[kType] = 'DecompressionStream';
108+
switch (format) {
109+
case 'deflate':
110+
this[kHandle] = createInflate();
111+
break;
112+
case 'gzip':
113+
this[kHandle] = createGunzip();
114+
break;
115+
default:
116+
throw new ERR_INVALID_ARG_VALUE('format', format);
117+
}
118+
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
119+
}
120+
121+
/**
122+
* @readonly
123+
* @type {ReadableStream}
124+
*/
125+
get readable() {
126+
if (!isDecompressionStream(this))
127+
throw new ERR_INVALID_THIS('DecompressionStream');
128+
return this[kTransform].readable;
129+
}
130+
131+
/**
132+
* @readonly
133+
* @type {WritableStream}
134+
*/
135+
get writable() {
136+
if (!isDecompressionStream(this))
137+
throw new ERR_INVALID_THIS('DecompressionStream');
138+
return this[kTransform].writable;
139+
}
140+
141+
[kInspect](depth, options) {
142+
if (!isDecompressionStream(this))
143+
throw new ERR_INVALID_THIS('DecompressionStream');
144+
customInspect(depth, options, 'DecompressionStream', {
145+
readable: this[kTransform].readable,
146+
writable: this[kTransform].writable,
147+
});
148+
}
149+
}
150+
151+
ObjectDefineProperties(CompressionStream.prototype, {
152+
readable: { enumerable: true },
153+
writable: { enumerable: true },
154+
});
155+
156+
ObjectDefineProperties(DecompressionStream.prototype, {
157+
readable: { enumerable: true },
158+
writable: { enumerable: true },
159+
});
160+
161+
module.exports = {
162+
CompressionStream,
163+
DecompressionStream,
164+
};

lib/stream/web.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ const {
3131
CountQueuingStrategy,
3232
} = require('internal/webstreams/queuingstrategies');
3333

34+
const {
35+
CompressionStream,
36+
DecompressionStream,
37+
} = require('internal/webstreams/compression');
38+
3439
module.exports = {
3540
ReadableStream,
3641
ReadableStreamDefaultReader,
@@ -45,4 +50,6 @@ module.exports = {
4550
WritableStreamDefaultController,
4651
ByteLengthQueuingStrategy,
4752
CountQueuingStrategy,
53+
CompressionStream,
54+
DecompressionStream,
4855
};
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Flags: --expose-internals --no-warnings
2+
'use strict';
3+
4+
const common = require('../common');
5+
6+
const {
7+
CompressionStream,
8+
DecompressionStream,
9+
} = require('internal/webstreams/compression');
10+
11+
const assert = require('assert');
12+
const dec = new TextDecoder();
13+
14+
async function test(format) {
15+
const gzip = new CompressionStream(format);
16+
const gunzip = new DecompressionStream(format);
17+
18+
gzip.readable.pipeTo(gunzip.writable).then(common.mustCall());
19+
20+
const reader = gunzip.readable.getReader();
21+
const writer = gzip.writable.getWriter();
22+
23+
await Promise.all([
24+
reader.read().then(({ value, done }) => {
25+
assert.strictEqual(dec.decode(value), 'hello');
26+
}),
27+
reader.read().then(({ done }) => assert(done)),
28+
writer.write('hello'),
29+
writer.close(),
30+
]);
31+
}
32+
33+
Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall());
34+
35+
[1, 'hello', false, {}].forEach((i) => {
36+
assert.throws(() => new CompressionStream(i), {
37+
code: 'ERR_INVALID_ARG_VALUE',
38+
});
39+
assert.throws(() => new DecompressionStream(i), {
40+
code: 'ERR_INVALID_ARG_VALUE',
41+
});
42+
});
43+
44+
assert.throws(
45+
() => Reflect.get(CompressionStream.prototype, 'readable', {}), {
46+
code: 'ERR_INVALID_THIS',
47+
});
48+
assert.throws(
49+
() => Reflect.get(CompressionStream.prototype, 'writable', {}), {
50+
code: 'ERR_INVALID_THIS',
51+
});
52+
assert.throws(
53+
() => Reflect.get(DecompressionStream.prototype, 'readable', {}), {
54+
code: 'ERR_INVALID_THIS',
55+
});
56+
assert.throws(
57+
() => Reflect.get(DecompressionStream.prototype, 'writable', {}), {
58+
code: 'ERR_INVALID_THIS',
59+
});

0 commit comments

Comments
 (0)