@@ -13,8 +13,10 @@ const {
1313
1414const {
1515 ERR_INVALID_ARG_TYPE ,
16- ERR_OUT_OF_RANGE ,
1716 ERR_METHOD_NOT_IMPLEMENTED ,
17+ ERR_OUT_OF_RANGE ,
18+ ERR_STREAM_DESTROYED ,
19+ ERR_SYSTEM_ERROR ,
1820} = require ( 'internal/errors' ) . codes ;
1921const {
2022 deprecate,
@@ -392,22 +394,75 @@ WriteStream.prototype.open = openWriteFs;
392394
393395WriteStream . prototype . _construct = _construct ;
394396
397+ function writeAll ( data , size , pos , cb , retries = 0 ) {
398+ this [ kFs ] . write ( this . fd , data , 0 , size , pos , ( er , bytesWritten , buffer ) => {
399+ // No data currently available and operation should be retried later.
400+ if ( er ?. code === 'EAGAIN' ) {
401+ er = null ;
402+ bytesWritten = 0 ;
403+ }
404+
405+ if ( this . destroyed || er ) {
406+ return cb ( er || new ERR_STREAM_DESTROYED ( 'write' ) ) ;
407+ }
408+
409+ this . bytesWritten += bytesWritten ;
410+
411+ retries = bytesWritten ? 0 : retries + 1 ;
412+ size -= bytesWritten ;
413+ pos += bytesWritten ;
414+
415+ // Try writing non-zero number of bytes up to 5 times.
416+ if ( retries > 5 ) {
417+ cb ( new ERR_SYSTEM_ERROR ( 'write failed' ) ) ;
418+ } else if ( size ) {
419+ writeAll . call ( this , buffer . slice ( bytesWritten ) , size , pos , cb , retries ) ;
420+ } else {
421+ cb ( ) ;
422+ }
423+ } ) ;
424+ }
425+
426+ function writevAll ( chunks , size , pos , cb , retries = 0 ) {
427+ this [ kFs ] . writev ( this . fd , chunks , this . pos , ( er , bytesWritten , buffers ) => {
428+ // No data currently available and operation should be retried later.
429+ if ( er ?. code === 'EAGAIN' ) {
430+ er = null ;
431+ bytesWritten = 0 ;
432+ }
433+
434+ if ( this . destroyed || er ) {
435+ return cb ( er || new ERR_STREAM_DESTROYED ( 'writev' ) ) ;
436+ }
437+
438+ this . bytesWritten += bytesWritten ;
439+
440+ retries = bytesWritten ? 0 : retries + 1 ;
441+ size -= bytesWritten ;
442+ pos += bytesWritten ;
443+
444+ // Try writing non-zero number of bytes up to 5 times.
445+ if ( retries > 5 ) {
446+ cb ( new ERR_SYSTEM_ERROR ( 'writev failed' ) ) ;
447+ } else if ( size ) {
448+ writevAll . call ( this , [ Buffer . concat ( buffers ) . slice ( bytesWritten ) ] , size , pos , cb , retries ) ;
449+ } else {
450+ cb ( ) ;
451+ }
452+ } ) ;
453+ }
454+
395455WriteStream . prototype . _write = function ( data , encoding , cb ) {
396456 this [ kIsPerformingIO ] = true ;
397- this [ kFs ] . write ( this . fd , data , 0 , data . length , this . pos , ( er , bytes ) => {
457+ writeAll . call ( this , data , data . length , this . pos , ( er ) => {
398458 this [ kIsPerformingIO ] = false ;
399459 if ( this . destroyed ) {
400460 // Tell ._destroy() that it's safe to close the fd now.
401461 cb ( er ) ;
402462 return this . emit ( kIoDone , er ) ;
403463 }
404464
405- if ( er ) {
406- return cb ( er ) ;
407- }
408-
409- this . bytesWritten += bytes ;
410- cb ( ) ;
465+ cb ( er ) ;
411466 } ) ;
412467
413468 if ( this . pos !== undefined )
@@ -427,20 +482,15 @@ WriteStream.prototype._writev = function(data, cb) {
427482 }
428483
429484 this [ kIsPerformingIO ] = true ;
430- this [ kFs ] . writev ( this . fd , chunks , this . pos , ( er , bytes ) => {
485+ writevAll . call ( this , chunks , size , this . pos , ( er ) => {
431486 this [ kIsPerformingIO ] = false ;
432487 if ( this . destroyed ) {
433488 // Tell ._destroy() that it's safe to close the fd now.
434489 cb ( er ) ;
435490 return this . emit ( kIoDone , er ) ;
436491 }
437492
438- if ( er ) {
439- return cb ( er ) ;
440- }
441-
442- this . bytesWritten += bytes ;
443- cb ( ) ;
493+ cb ( er ) ;
444494 } ) ;
445495
446496 if ( this . pos !== undefined )
0 commit comments