@@ -51,6 +51,8 @@ const {
5151} = require ( 'internal/streams/iter/utils' ) ;
5252
5353const {
54+ drainableProtocol,
55+ kSyncWriteAcceptedOnFalse,
5456 kValidatedTransform,
5557} = require ( 'internal/streams/iter/types' ) ;
5658
@@ -828,13 +830,33 @@ async function pipeTo(source, ...args) {
828830 const hasWriteSync = typeof writer . writeSync === 'function' ;
829831 const hasWritevSync = typeof writer . writevSync === 'function' ;
830832 const hasEndSync = typeof writer . endSync === 'function' ;
833+ const syncFalseCanBeAccepted = writer [ kSyncWriteAcceptedOnFalse ] === true ;
834+
835+ function syncFalseWasAccepted ( ) {
836+ return syncFalseCanBeAccepted && writer . desiredSize === 0 ;
837+ }
838+
839+ function waitForSyncBackpressure ( ) {
840+ const ondrain = writer [ drainableProtocol ] ;
841+ return ondrain ?. call ( writer ) ;
842+ }
843+
844+ async function writeBatchAfterAcceptedBackpressure ( batch , startIndex ) {
845+ await waitForSyncBackpressure ( ) ;
846+ await writeBatchAsyncFallback ( batch , startIndex ) ;
847+ }
848+
831849 // Async fallback for writeBatch when sync write fails partway through.
832850 // Continues writing from batch[startIndex] using async write().
833851 async function writeBatchAsyncFallback ( batch , startIndex ) {
834852 for ( let i = startIndex ; i < batch . length ; i ++ ) {
835853 const chunk = batch [ i ] ;
836854 if ( hasWriteSync && writer . writeSync ( chunk ) ) {
837855 // Sync retry succeeded
856+ } else if ( syncFalseWasAccepted ( ) ) {
857+ totalBytes += TypedArrayPrototypeGetByteLength ( chunk ) ;
858+ await waitForSyncBackpressure ( ) ;
859+ continue ;
838860 } else {
839861 const result = writer . write (
840862 chunk , signal ? { __proto__ : null , signal } : undefined ) ;
@@ -852,6 +874,12 @@ async function pipeTo(source, ...args) {
852874 function writeBatch ( batch ) {
853875 if ( hasWritev && batch . length > 1 ) {
854876 if ( ! hasWritevSync || ! writer . writevSync ( batch ) ) {
877+ if ( hasWritevSync && syncFalseWasAccepted ( ) ) {
878+ for ( let i = 0 ; i < batch . length ; i ++ ) {
879+ totalBytes += TypedArrayPrototypeGetByteLength ( batch [ i ] ) ;
880+ }
881+ return waitForSyncBackpressure ( ) ;
882+ }
855883 const opts = signal ? { __proto__ : null , signal } : undefined ;
856884 return PromisePrototypeThen ( writer . writev ( batch , opts ) , ( ) => {
857885 for ( let i = 0 ; i < batch . length ; i ++ ) {
@@ -867,6 +895,10 @@ async function pipeTo(source, ...args) {
867895 for ( let i = 0 ; i < batch . length ; i ++ ) {
868896 const chunk = batch [ i ] ;
869897 if ( ! hasWriteSync || ! writer . writeSync ( chunk ) ) {
898+ if ( hasWriteSync && syncFalseWasAccepted ( ) ) {
899+ totalBytes += TypedArrayPrototypeGetByteLength ( chunk ) ;
900+ return writeBatchAfterAcceptedBackpressure ( batch , i + 1 ) ;
901+ }
870902 // Sync path failed at index i - fall back to async for the rest.
871903 // Count bytes for chunks already written synchronously (0..i-1).
872904 return writeBatchAsyncFallback ( batch , i ) ;
0 commit comments