@@ -727,35 +727,44 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
727727 ondrain ( ) ;
728728 }
729729
730+ function pause ( ) {
731+ // If the user unpiped during `dest.write()`, it is possible
732+ // to get stuck in a permanently paused state if that write
733+ // also returned false.
734+ // => Check whether `dest` is still a piping destination.
735+ if ( ! cleanedUp ) {
736+ if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
737+ debug ( 'false write response, pause' , 0 ) ;
738+ state . awaitDrainWriters = dest ;
739+ state . multiAwaitDrain = false ;
740+ } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
741+ debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
742+ state . awaitDrainWriters . add ( dest ) ;
743+ }
744+ src . pause ( ) ;
745+ }
746+ if ( ! ondrain ) {
747+ // When the dest drains, it reduces the awaitDrain counter
748+ // on the source. This would be more elegant with a .once()
749+ // handler in flow(), but adding and removing repeatedly is
750+ // too slow.
751+ ondrain = pipeOnDrain ( src , dest ) ;
752+ dest . on ( 'drain' , ondrain ) ;
753+ }
754+ }
755+
730756 src . on ( 'data' , ondata ) ;
757+
758+ if ( dest . writableNeedDrain === true ) {
759+ pause ( ) ;
760+ }
761+
731762 function ondata ( chunk ) {
732763 debug ( 'ondata' ) ;
733764 const ret = dest . write ( chunk ) ;
734765 debug ( 'dest.write' , ret ) ;
735766 if ( ret === false ) {
736- // If the user unpiped during `dest.write()`, it is possible
737- // to get stuck in a permanently paused state if that write
738- // also returned false.
739- // => Check whether `dest` is still a piping destination.
740- if ( ! cleanedUp ) {
741- if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
742- debug ( 'false write response, pause' , 0 ) ;
743- state . awaitDrainWriters = dest ;
744- state . multiAwaitDrain = false ;
745- } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
746- debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
747- state . awaitDrainWriters . add ( dest ) ;
748- }
749- src . pause ( ) ;
750- }
751- if ( ! ondrain ) {
752- // When the dest drains, it reduces the awaitDrain counter
753- // on the source. This would be more elegant with a .once()
754- // handler in flow(), but adding and removing repeatedly is
755- // too slow.
756- ondrain = pipeOnDrain ( src , dest ) ;
757- dest . on ( 'drain' , ondrain ) ;
758- }
767+ pause ( ) ;
759768 }
760769 }
761770
0 commit comments