@@ -30,7 +30,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
3030 super ().__init__ (extra , loop )
3131 self ._set_extra (sock )
3232 self ._sock = sock
33- self ._protocol = protocol
33+ self .set_protocol ( protocol )
3434 self ._server = server
3535 self ._buffer = None # None or bytearray.
3636 self ._read_fut = None
@@ -159,16 +159,26 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
159159
160160 def __init__ (self , loop , sock , protocol , waiter = None ,
161161 extra = None , server = None ):
162+ self ._loop_reading_cb = None
163+ self ._paused = True
162164 super ().__init__ (loop , sock , protocol , waiter , extra , server )
163- self . _paused = False
165+
164166 self ._reschedule_on_resume = False
167+ self ._loop .call_soon (self ._loop_reading )
168+ self ._paused = False
165169
166- if protocols ._is_buffered_protocol (protocol ):
167- self ._loop_reading = self ._loop_reading__get_buffer
170+ def set_protocol (self , protocol ):
171+ if isinstance (protocol , protocols .BufferedProtocol ):
172+ self ._loop_reading_cb = self ._loop_reading__get_buffer
168173 else :
169- self ._loop_reading = self ._loop_reading__data_received
174+ self ._loop_reading_cb = self ._loop_reading__data_received
170175
171- self ._loop .call_soon (self ._loop_reading )
176+ super ().set_protocol (protocol )
177+
178+ if self .is_reading ():
179+ # reset reading callback / buffers / self._read_fut
180+ self .pause_reading ()
181+ self .resume_reading ()
172182
173183 def is_reading (self ):
174184 return not self ._paused and not self ._closing
@@ -179,6 +189,13 @@ def pause_reading(self):
179189 self ._paused = True
180190
181191 if self ._read_fut is not None and not self ._read_fut .done ():
192+ # TODO: This is an ugly hack to cancel the current read future
193+ # *and* avoid potential race conditions, as read cancellation
194+ # goes through `future.cancel()` and `loop.call_soon()`.
195+ # We then use this special attribute in the reader callback to
196+ # exit *immediately* without doing any cleanup/rescheduling.
197+ self ._read_fut .__asyncio_cancelled_on_pause__ = True
198+
182199 self ._read_fut .cancel ()
183200 self ._read_fut = None
184201 self ._reschedule_on_resume = True
@@ -210,7 +227,14 @@ def _loop_reading__on_eof(self):
210227 if not keep_open :
211228 self .close ()
212229
213- def _loop_reading__data_received (self , fut = None ):
230+ def _loop_reading (self , fut = None ):
231+ self ._loop_reading_cb (fut )
232+
233+ def _loop_reading__data_received (self , fut ):
234+ if (fut is not None and
235+ getattr (fut , '__asyncio_cancelled_on_pause__' , False )):
236+ return
237+
214238 if self ._paused :
215239 self ._reschedule_on_resume = True
216240 return
@@ -253,14 +277,18 @@ def _loop_reading__data_received(self, fut=None):
253277 if not self ._closing :
254278 raise
255279 else :
256- self ._read_fut .add_done_callback (self ._loop_reading )
280+ self ._read_fut .add_done_callback (self ._loop_reading__data_received )
257281 finally :
258282 if data :
259283 self ._protocol .data_received (data )
260284 elif data == b'' :
261285 self ._loop_reading__on_eof ()
262286
263- def _loop_reading__get_buffer (self , fut = None ):
287+ def _loop_reading__get_buffer (self , fut ):
288+ if (fut is not None and
289+ getattr (fut , '__asyncio_cancelled_on_pause__' , False )):
290+ return
291+
264292 if self ._paused :
265293 self ._reschedule_on_resume = True
266294 return
@@ -310,7 +338,9 @@ def _loop_reading__get_buffer(self, fut=None):
310338 return
311339
312340 try :
313- buf = self ._protocol .get_buffer ()
341+ buf = self ._protocol .get_buffer (- 1 )
342+ if not len (buf ):
343+ raise RuntimeError ('get_buffer() returned an empty buffer' )
314344 except Exception as exc :
315345 self ._fatal_error (
316346 exc , 'Fatal error: protocol.get_buffer() call failed.' )
@@ -319,7 +349,7 @@ def _loop_reading__get_buffer(self, fut=None):
319349 try :
320350 # schedule a new read
321351 self ._read_fut = self ._loop ._proactor .recv_into (self ._sock , buf )
322- self ._read_fut .add_done_callback (self ._loop_reading )
352+ self ._read_fut .add_done_callback (self ._loop_reading__get_buffer )
323353 except ConnectionAbortedError as exc :
324354 if not self ._closing :
325355 self ._fatal_error (exc , 'Fatal read error on pipe transport' )
0 commit comments