diff --git a/tests/test_context.py b/tests/test_context.py index 03733756..2b2329f9 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -512,9 +512,10 @@ async def write_over(): proto.transport.write(b'q' * 16384) count += 1 else: - proto.transport.write(b'q' * 16384) proto.transport.set_write_buffer_limits(high=256, low=128) - count += 1 + while not proto.transport.get_write_buffer_size(): + proto.transport.write(b'q' * 16384) + count += 1 return count s = self.loop.run_in_executor(None, accept) diff --git a/tests/test_dns.py b/tests/test_dns.py index f42069b7..106ef580 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -217,6 +217,10 @@ def test_getaddrinfo_22(self): self._test_getaddrinfo(payload, 80) self._test_getaddrinfo(payload, 80, type=socket.SOCK_STREAM) + def test_getaddrinfo_broadcast(self): + self._test_getaddrinfo('', 80) + self._test_getaddrinfo('', 80, type=socket.SOCK_STREAM) + ###### def test_getnameinfo_1(self): diff --git a/tests/test_fs_event.py b/tests/test_fs_event.py index 743589b5..90369d1a 100644 --- a/tests/test_fs_event.py +++ b/tests/test_fs_event.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import os.path import tempfile @@ -6,95 +7,81 @@ from uvloop.loop import FileSystemEvent -class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase): - async def _file_writer(self): - f = await self.q.get() - while True: - f.write('hello uvloop\n') - f.flush() - x = await self.q.get() - if x is None: - return - - def fs_event_setup(self): - self.change_event_count = 0 - self.fname = '' - self.q = asyncio.Queue() - - def event_cb(self, ev_fname: bytes, evt: FileSystemEvent): - _d, fn = os.path.split(self.fname) - self.assertEqual(ev_fname, fn) - self.assertEqual(evt, FileSystemEvent.CHANGE) - self.change_event_count += 1 - if self.change_event_count < 4: - self.q.put_nowait(0) - else: - self.q.put_nowait(None) +class Test_UV_FS_Event(tb.UVTestCase): + def setUp(self): + super().setUp() + self.exit_stack = contextlib.ExitStack() + 
self.tmp_dir = self.exit_stack.enter_context( + tempfile.TemporaryDirectory() + ) + + def tearDown(self): + self.exit_stack.close() + super().tearDown() def test_fs_event_change(self): - self.fs_event_setup() - - async def run(write_task): - self.q.put_nowait(tf) - try: - await asyncio.wait_for(write_task, 4) - except asyncio.TimeoutError: - write_task.cancel() - - with tempfile.NamedTemporaryFile('wt') as tf: - self.fname = tf.name.encode() - h = self.loop._monitor_fs(tf.name, self.event_cb) + change_event_count = 0 + filename = "fs_event_change.txt" + path = os.path.join(self.tmp_dir, filename) + q = asyncio.Queue() + + with open(path, 'wt') as f: + async def file_writer(): + while True: + f.write('hello uvloop\n') + f.flush() + x = await q.get() + if x is None: + return + + def event_cb(ev_fname: bytes, evt: FileSystemEvent): + nonlocal change_event_count + self.assertEqual(ev_fname, filename.encode()) + self.assertEqual(evt, FileSystemEvent.CHANGE) + change_event_count += 1 + if change_event_count < 4: + q.put_nowait(0) + else: + q.put_nowait(None) + + h = self.loop._monitor_fs(path, event_cb) + self.loop.run_until_complete( + asyncio.sleep(0.1) # let monitor start + ) self.assertFalse(h.cancelled()) - self.loop.run_until_complete(run( - self.loop.create_task(self._file_writer()))) + self.loop.run_until_complete(asyncio.wait_for(file_writer(), 4)) h.cancel() self.assertTrue(h.cancelled()) - self.assertEqual(self.change_event_count, 4) - - -class Test_UV_FS_EVENT_RENAME(tb.UVTestCase): - async def _file_renamer(self): - await self.q.get() - os.rename(os.path.join(self.dname, self.changed_name), - os.path.join(self.dname, self.changed_name + "-new")) - await self.q.get() - - def fs_event_setup(self): - self.dname = '' - self.changed_name = "hello_fs_event.txt" - self.changed_set = {self.changed_name, self.changed_name + '-new'} - self.q = asyncio.Queue() - - def event_cb(self, ev_fname: bytes, evt: FileSystemEvent): - ev_fname = ev_fname.decode() - 
self.assertEqual(evt, FileSystemEvent.RENAME) - self.changed_set.remove(ev_fname) - if len(self.changed_set) == 0: - self.q.put_nowait(None) + self.assertEqual(change_event_count, 4) def test_fs_event_rename(self): - self.fs_event_setup() - - async def run(write_task): - self.q.put_nowait(0) - try: - await asyncio.wait_for(write_task, 4) - except asyncio.TimeoutError: - write_task.cancel() - - with tempfile.TemporaryDirectory() as td_name: - self.dname = td_name - f = open(os.path.join(td_name, self.changed_name), 'wt') + orig_name = "hello_fs_event.txt" + new_name = "hello_fs_event_rename.txt" + changed_set = {orig_name, new_name} + event = asyncio.Event() + + async def file_renamer(): + os.rename(os.path.join(self.tmp_dir, orig_name), + os.path.join(self.tmp_dir, new_name)) + await event.wait() + + def event_cb(ev_fname: bytes, evt: FileSystemEvent): + ev_fname = ev_fname.decode() + self.assertEqual(evt, FileSystemEvent.RENAME) + changed_set.discard(ev_fname) + if len(changed_set) == 0: + event.set() + + with open(os.path.join(self.tmp_dir, orig_name), 'wt') as f: f.write('hello!') - f.close() - h = self.loop._monitor_fs(td_name, self.event_cb) - self.assertFalse(h.cancelled()) + h = self.loop._monitor_fs(self.tmp_dir, event_cb) + self.loop.run_until_complete(asyncio.sleep(0.5)) # let monitor start + self.assertFalse(h.cancelled()) - self.loop.run_until_complete(run( - self.loop.create_task(self._file_renamer()))) - h.cancel() - self.assertTrue(h.cancelled()) + self.loop.run_until_complete(asyncio.wait_for(file_renamer(), 4)) + h.cancel() + self.assertTrue(h.cancelled()) - self.assertEqual(len(self.changed_set), 0) + self.assertEqual(len(changed_set), 0) diff --git a/tests/test_process.py b/tests/test_process.py index bfcbba17..45036256 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -685,6 +685,12 @@ async def cancel_make_transport(): self.loop.run_until_complete(cancel_make_transport()) def test_cancel_post_init(self): + if sys.version_info >= 
(3, 13) and self.implementation == 'asyncio': + # https://github.com/python/cpython/issues/103847#issuecomment-3736561321 + # This test started to flake on CPython 3.13 and later, + # so we skip it for asyncio tests until the issue is resolved. + self.skipTest('flaky test on CPython 3.13+') + async def cancel_make_transport(): coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol, *self.PROGRAM_BLOCKED) diff --git a/tests/test_tcp.py b/tests/test_tcp.py index 80584a86..382b3814 100644 --- a/tests/test_tcp.py +++ b/tests/test_tcp.py @@ -405,7 +405,7 @@ async def client(addr): self.assertEqual(await reader.readexactly(2), b'OK') re = r'(a bytes-like object)|(must be byte-ish)' - if sys.version_info >= (3, 14): + if sys.version_info >= (3, 13, 9): re += r'|(must be a bytes, bytearray, or memoryview object)' with self.assertRaisesRegex(TypeError, re): writer.write('AAAA') @@ -1226,21 +1226,16 @@ def resume_writing(self): t, p = await self.loop.create_connection(Protocol, *addr) t.write(b'q' * 512) - self.assertEqual(t.get_write_buffer_size(), 512) - t.set_write_buffer_limits(low=16385) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (16385, 65540)) with self.assertRaisesRegex(ValueError, 'high.*must be >= low'): t.set_write_buffer_limits(high=0, low=1) t.set_write_buffer_limits(high=1024, low=128) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 1024)) t.set_write_buffer_limits(high=256, low=128) - self.assertTrue(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 256)) t.close() diff --git a/tests/test_udp.py b/tests/test_udp.py index dd739650..1b7953f2 100644 --- a/tests/test_udp.py +++ b/tests/test_udp.py @@ -378,6 +378,22 @@ def test_udp_sendto_dns(self): s_transport.close() self.loop.run_until_complete(asyncio.sleep(0.01)) + def test_udp_sendto_broadcast(self): + coro = self.loop.create_datagram_endpoint( + asyncio.DatagramProtocol, + local_addr=('127.0.0.1', 0), + family=socket.AF_INET) + + 
s_transport, server = self.loop.run_until_complete(coro) + + try: + s_transport.sendto(b'aaaa', ('', 80)) + except ValueError as exc: + raise AssertionError('sendto raises {}.'.format(exc)) + + s_transport.close() + self.loop.run_until_complete(asyncio.sleep(0.01)) + def test_send_after_close(self): coro = self.loop.create_datagram_endpoint( asyncio.DatagramProtocol, diff --git a/uvloop/cbhandles.pyx b/uvloop/cbhandles.pyx index 2914b42e..00070816 100644 --- a/uvloop/cbhandles.pyx +++ b/uvloop/cbhandles.pyx @@ -9,8 +9,10 @@ cdef class Handle: cdef inline _set_loop(self, Loop loop): self.loop = loop if UVLOOP_DEBUG: - loop._debug_cb_handles_total += 1 - loop._debug_cb_handles_count += 1 + system.__atomic_fetch_add( + &loop._debug_cb_handles_total, 1, system.__ATOMIC_RELAXED) + system.__atomic_fetch_add( + &loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED) if loop._debug: self._source_traceback = extract_stack() @@ -21,7 +23,8 @@ cdef class Handle: def __dealloc__(self): if UVLOOP_DEBUG and self.loop is not None: - self.loop._debug_cb_handles_count -= 1 + system.__atomic_fetch_sub( + &self.loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED) if self.loop is None: raise RuntimeError('Handle.loop is None in Handle.__dealloc__') @@ -174,8 +177,10 @@ cdef class TimerHandle: self._cancelled = 0 if UVLOOP_DEBUG: - self.loop._debug_cb_timer_handles_total += 1 - self.loop._debug_cb_timer_handles_count += 1 + system.__atomic_fetch_add( + &self.loop._debug_cb_timer_handles_total, 1, system.__ATOMIC_RELAXED) + system.__atomic_fetch_add( + &self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED) if context is None: context = Context_CopyCurrent() @@ -205,7 +210,8 @@ cdef class TimerHandle: def __dealloc__(self): if UVLOOP_DEBUG: - self.loop._debug_cb_timer_handles_count -= 1 + system.__atomic_fetch_sub( + &self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED) if self.timer is not None: raise RuntimeError('active TimerHandle is 
deallacating') diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..71c8288e 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -1,3 +1,9 @@ +cdef enum ProtocolType: + SIMPLE = 0 # User Protocol doesn't support asyncio.BufferedProtocol + BUFFERED = 1 # User Protocol supports asyncio.BufferedProtocol + SSL_PROTOCOL = 2 # Our own SSLProtocol + + cdef class UVStream(UVBaseTransport): cdef: uv.uv_shutdown_t _shutdown_req @@ -5,7 +11,7 @@ cdef class UVStream(UVBaseTransport): bint __reading bint __read_error_close - bint __buffered + ProtocolType __protocol_type object _protocol_get_buffer object _protocol_buffer_updated @@ -16,6 +22,8 @@ cdef class UVStream(UVBaseTransport): Py_buffer _read_pybuf bint _read_pybuf_acquired + cpdef write(self, object buf) + # All "inline" methods are final cdef inline _init(self, Loop loop, object protocol, Server server, @@ -39,8 +47,8 @@ cdef class UVStream(UVBaseTransport): # _exec_write() is the method that does the actual send, and _try_write() # is a fast-path used in _exec_write() to send a single chunk. 
- cdef inline _exec_write(self) - cdef inline _try_write(self, object data) + cdef inline bint _exec_write(self) except -1 + cdef inline Py_ssize_t _try_write(self, object data) except -2 cdef _close(self) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index c585360f..f8c7f694 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -1,4 +1,4 @@ -cdef enum: +cdef enum: __PREALLOCED_BUFS = 4 @@ -213,14 +213,15 @@ cdef class UVStream(UVBaseTransport): self.__shutting_down = 0 self.__reading = 0 self.__read_error_close = 0 - self.__buffered = 0 - self._eof = 0 - self._buffer = [] - self._buffer_size = 0 + self.__protocol_type = ProtocolType.SIMPLE self._protocol_get_buffer = None self._protocol_buffer_updated = None + self._eof = 0 + self._buffer = [] + self._buffer_size = 0 + self._read_pybuf_acquired = False cdef _set_protocol(self, object protocol): @@ -229,22 +230,24 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) - if (hasattr(protocol, 'get_buffer') and + if isinstance(protocol, SSLProtocol): + self.__protocol_type = ProtocolType.SSL_PROTOCOL + elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): try: self._protocol_get_buffer = protocol.get_buffer self._protocol_buffer_updated = protocol.buffer_updated - self.__buffered = 1 + self.__protocol_type = ProtocolType.BUFFERED except AttributeError: pass else: - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef _clear_protocol(self): UVBaseTransport._clear_protocol(self) self._protocol_get_buffer = None self._protocol_buffer_updated = None - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef inline _shutdown(self): cdef int err @@ -294,14 +297,14 @@ cdef class UVStream(UVBaseTransport): if self.__reading: return - if self.__buffered: - err = uv.uv_read_start(self._handle, - __uv_stream_buffered_alloc, - __uv_stream_buffered_on_read) - else: + if self.__protocol_type == 
ProtocolType.SIMPLE: err = uv.uv_read_start(self._handle, __loop_alloc_buffer, __uv_stream_on_read) + else: + err = uv.uv_read_start( self._handle, + __uv_stream_buffered_alloc, + __uv_stream_buffered_on_read) if err < 0: exc = convert_error(err) self._fatal_error(exc, True) @@ -341,13 +344,15 @@ cdef class UVStream(UVBaseTransport): else: self.__reading_stopped() - cdef inline _try_write(self, object data): + cdef inline Py_ssize_t _try_write(self, object data) except -2: + # Returns number of bytes written. + # -1 - in case of fatal errors cdef: - ssize_t written + Py_ssize_t written bint used_buf = 0 Py_buffer py_buf void* buf - size_t blen + Py_ssize_t blen int saved_errno int fd @@ -368,6 +373,8 @@ cdef class UVStream(UVBaseTransport): blen = py_buf.len if blen == 0: + if used_buf: + PyBuffer_Release(&py_buf) # Empty data, do nothing. return 0 @@ -392,24 +399,20 @@ cdef class UVStream(UVBaseTransport): PyBuffer_Release(&py_buf) if written < 0: - if saved_errno == errno.EAGAIN or \ - saved_errno == system.EWOULDBLOCK: - return -1 + if saved_errno in (errno.EAGAIN, system.EWOULDBLOCK): + return 0 else: exc = convert_error(-saved_errno) self._fatal_error(exc, True) - return + return -1 if UVLOOP_DEBUG: self._loop._debug_stream_write_tries += 1 - if written == blen: - return 0 - return written cdef inline _buffer_write(self, object data): - cdef int dlen + cdef Py_ssize_t dlen if not PyBytes_CheckExact(data): data = memoryview(data).cast('b') @@ -422,19 +425,19 @@ cdef class UVStream(UVBaseTransport): self._buffer.append(data) cdef inline _initiate_write(self): + cdef bint all_sent + if (not self._protocol_paused and - (self._handle).write_queue_size == 0 and - self._buffer_size > self._high_water): + (self._handle).write_queue_size == 0): # Fast-path. If: # - the protocol isn't yet paused, # - there is no data in libuv buffers for this stream, - # - the protocol will be paused if we continue to buffer data # # Then: # - Try to write all buffered data right now. 
all_sent = self._exec_write() if UVLOOP_DEBUG: - if self._buffer_size != 0 or self._buffer != []: + if self._buffer_size != 0 or self._buffer: raise RuntimeError( '_buffer_size is not 0 after a successful _exec_write') @@ -450,20 +453,23 @@ cdef class UVStream(UVBaseTransport): self._maybe_pause_protocol() self._loop._queue_write(self) - cdef inline _exec_write(self): + cdef inline bint _exec_write(self) except -1: + # Returns True if all data from self._buffer has been sent, + # False otherwise. cdef: int err - int buf_len + Py_ssize_t buf_len + Py_ssize_t sent _StreamWriteContext ctx = None if self._closed: # If the handle is closed, just return, it's too # late to do anything. - return + return False buf_len = len(self._buffer) if not buf_len: - return + return True if (self._handle).write_queue_size == 0: # libuv internal write buffers for this stream are empty. @@ -473,34 +479,16 @@ cdef class UVStream(UVBaseTransport): data = self._buffer[0] sent = self._try_write(data) - if sent is None: - # A `self._fatal_error` was called. - # It might not raise an exception under some - # conditions. - self._buffer_size = 0 - self._buffer.clear() - if not self._closing: - # This should never happen. - raise RuntimeError( - 'stream is open after UVStream._try_write ' - 'returned None') - return - - if sent == 0: - # All data was successfully written. + if sent == len(data): + # The most likely and latency sensitive outcome goes first, + # all data was successfully written. self._buffer_size = 0 self._buffer.clear() # on_write will call "maybe_resume_protocol". self._on_write() return True - if sent > 0: - if UVLOOP_DEBUG: - if sent == len(data): - raise RuntimeError( - '_try_write sent all data and returned ' - 'non-zero') - + elif sent > 0: if PyBytes_CheckExact(data): # Cast bytes to memoryview to avoid copying # data that wasn't sent. 
@@ -510,6 +498,19 @@ cdef class UVStream(UVBaseTransport): self._buffer_size -= sent self._buffer[0] = data + elif sent == -1: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + self._buffer_size = 0 + self._buffer.clear() + if not self._closing: + # This should never happen. + raise RuntimeError( + 'stream is open after UVStream._try_write ' + 'returned None') + return False + # At this point it's either data was sent partially, # or an EAGAIN has happened. @@ -543,7 +544,7 @@ cdef class UVStream(UVBaseTransport): self._fatal_error(ex, True) self._buffer.clear() self._buffer_size = 0 - return + return False elif err != uv.UV_EAGAIN: ctx.close() @@ -551,7 +552,7 @@ cdef class UVStream(UVBaseTransport): self._fatal_error(exc, True) self._buffer.clear() self._buffer_size = 0 - return + return False # fall through @@ -575,9 +576,10 @@ cdef class UVStream(UVBaseTransport): exc = convert_error(err) self._fatal_error(exc, True) - return + return False self._maybe_resume_protocol() + return False cdef size_t _get_write_buffer_size(self): if self._handle is NULL: @@ -671,7 +673,7 @@ cdef class UVStream(UVBaseTransport): self.__reading, id(self)) - def write(self, object buf): + cpdef write(self, object buf): self._ensure_alive() if self._eof: @@ -921,9 +923,24 @@ cdef void __uv_stream_buffered_alloc( "UVStream alloc buffer callback") == 0: return + cdef UVStream sc = stream.data + + # Fast path for our own SSLProtocol: + # avoid python calls, memoryviews, context enter/exit, etc. + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + try: + (sc._protocol).get_buffer_impl( + suggested_size, &uvbuf.base, &uvbuf.len) + return + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. 
+ uvbuf.len = 0 + uvbuf.base = NULL + return + cdef: - UVStream sc = stream.data - Loop loop = sc._loop Py_buffer* pybuf = &sc._read_pybuf int got_buf = 0 @@ -984,7 +1001,12 @@ cdef void __uv_stream_buffered_on_read( return try: - if nread > 0 and not sc._read_pybuf_acquired: + # When our own SSLProtocol is used, we get buffer pointer directly, + # through SSLProtocol.get_buffer_impl, not through Py_Buffer interface. + # Therefore sc._read_pybuf_acquired is always False for SSLProtocol. + if (nread > 0 and + sc.__protocol_type == ProtocolType.BUFFERED and + not sc._read_pybuf_acquired): # From libuv docs: # nread is > 0 if there is data available or < 0 on error. When # we’ve reached EOF, nread will be set to UV_EOF. When @@ -1005,12 +1027,20 @@ cdef void __uv_stream_buffered_on_read( if UVLOOP_DEBUG: loop._debug_stream_read_cb_total += 1 - run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + Context_Enter(sc.context) + try: + (sc._protocol).buffer_updated_impl(nread) + finally: + Context_Exit(sc.context) + else: + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) except BaseException as exc: if UVLOOP_DEBUG: loop._debug_stream_read_cb_errors_total += 1 sc._fatal_error(exc, False) finally: - sc._read_pybuf_acquired = 0 - PyBuffer_Release(pybuf) + if sc._read_pybuf_acquired: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/uvloop/handles/udp.pyx b/uvloop/handles/udp.pyx index ef20c3f7..eac1bca5 100644 --- a/uvloop/handles/udp.pyx +++ b/uvloop/handles/udp.pyx @@ -208,6 +208,10 @@ cdef class UDPTransport(UVBaseTransport): if addr is None: saddr = NULL else: + # resolve special hostname to the broadcast address before use + if self._family == uv.AF_INET and addr[0] == '': + addr = (b'255.255.255.255', addr[1]) + try: __convert_pyaddr_to_sockaddr(self._family, addr, &saddr_st) diff --git a/uvloop/includes/stdlib.pxi b/uvloop/includes/stdlib.pxi index 02605a08..5fff4ad8 
100644 --- a/uvloop/includes/stdlib.pxi +++ b/uvloop/includes/stdlib.pxi @@ -37,7 +37,6 @@ cdef aio_wait = asyncio.wait cdef aio_wrap_future = asyncio.wrap_future cdef aio_logger = asyncio.log.logger cdef aio_iscoroutine = asyncio.iscoroutine -cdef aio_iscoroutinefunction = asyncio.iscoroutinefunction cdef aio_BaseProtocol = asyncio.BaseProtocol cdef aio_Protocol = asyncio.Protocol cdef aio_isfuture = getattr(asyncio, 'isfuture', None) @@ -65,6 +64,7 @@ cdef gc_disable = gc.disable cdef iter_chain = itertools.chain cdef inspect_isgenerator = inspect.isgenerator +cdef inspect_iscoroutinefunction = inspect.iscoroutinefunction cdef int has_IPV6_V6ONLY = hasattr(socket, 'IPV6_V6ONLY') cdef int IPV6_V6ONLY = getattr(socket, 'IPV6_V6ONLY', -1) diff --git a/uvloop/includes/system.pxd b/uvloop/includes/system.pxd index 367fedd1..89d0e327 100644 --- a/uvloop/includes/system.pxd +++ b/uvloop/includes/system.pxd @@ -94,3 +94,11 @@ cdef extern from "includes/fork_handler.h": void setForkHandler(OnForkHandler handler) void resetForkHandler() void setMainThreadID(uint64_t id) + + +cdef extern from * nogil: + uint64_t __atomic_fetch_add(uint64_t *ptr, uint64_t val, int memorder) + uint64_t __atomic_fetch_sub(uint64_t *ptr, uint64_t val, int memorder) + + cdef enum: + __ATOMIC_RELAXED diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 01e39ae1..89566ea3 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -50,7 +50,6 @@ cdef class Loop: object _default_executor object _ready set _queued_streams, _executing_streams - Py_ssize_t _ready_len set _servers diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index 2ed1f272..577d45a4 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -92,31 +92,27 @@ cdef inline socket_dec_io_ref(sock): cdef inline run_in_context(context, method): - # This method is internally used to workaround a reference issue that in - # certain circumstances, inlined context.run() will not hold a reference to - # the given method instance, which - if deallocated - 
will cause segfault. - # See also: edgedb/edgedb#2222 - Py_INCREF(method) + Context_Enter(context) try: - return context.run(method) + return method() finally: - Py_DECREF(method) + Context_Exit(context) cdef inline run_in_context1(context, method, arg): - Py_INCREF(method) + Context_Enter(context) try: - return context.run(method, arg) + return method(arg) finally: - Py_DECREF(method) + Context_Exit(context) cdef inline run_in_context2(context, method, arg1, arg2): - Py_INCREF(method) + Context_Enter(context) try: - return context.run(method, arg1, arg2) + return method(arg1, arg2) finally: - Py_DECREF(method) + Context_Exit(context) # Used for deprecation and removal of `loop.create_datagram_endpoint()`'s @@ -181,7 +177,6 @@ cdef class Loop: self._queued_streams = set() self._executing_streams = set() self._ready = col_deque() - self._ready_len = 0 self.handler_async = UVAsync.new( self, self._on_wake, self) @@ -440,7 +435,7 @@ cdef class Loop: self.handler_async.send() cdef _on_wake(self): - if ((self._ready_len > 0 or self._stopping) and + if ((len(self._ready) > 0 or self._stopping) and not self.handler_idle.running): self.handler_idle.start() @@ -481,8 +476,7 @@ cdef class Loop: if len(self._queued_streams): self._exec_queued_writes() - self._ready_len = len(self._ready) - if self._ready_len == 0 and self.handler_idle.running: + if len(self._ready) == 0 and self.handler_idle.running: self.handler_idle.stop() if self._stopping: @@ -570,7 +564,6 @@ cdef class Loop: for cb_handle in self._ready: cb_handle.cancel() self._ready.clear() - self._ready_len = 0 if self._polls: for poll_handle in self._polls.values(): @@ -672,7 +665,6 @@ cdef class Loop: cdef inline _append_ready_handle(self, Handle handle): self._check_closed() self._ready.append(handle) - self._ready_len += 1 cdef inline _call_soon_handle(self, Handle handle): self._append_ready_handle(handle) @@ -2731,7 +2723,7 @@ cdef class Loop: return transport, protocol def run_in_executor(self, executor, func, 
*args): - if aio_iscoroutine(func) or aio_iscoroutinefunction(func): + if aio_iscoroutine(func) or inspect_iscoroutinefunction(func): raise TypeError("coroutines cannot be used with run_in_executor()") self._check_closed() @@ -2910,7 +2902,7 @@ cdef class Loop: 'the main thread') if (aio_iscoroutine(callback) - or aio_iscoroutinefunction(callback)): + or inspect_iscoroutinefunction(callback)): raise TypeError( "coroutines cannot be used with add_signal_handler()") diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 3da10f00..edc0f502 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -53,13 +53,13 @@ cdef class SSLProtocol: object _sslobj object _sslobj_read object _sslobj_write + object _sslobj_pending object _incoming object _incoming_write object _outgoing object _outgoing_read char* _ssl_buffer size_t _ssl_buffer_len - object _ssl_buffer_view SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -84,55 +84,61 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle - cdef _set_app_protocol(self, app_protocol) - cdef _wakeup_waiter(self, exc=*) - cdef _get_extra_info(self, name, default=*) - cdef _set_state(self, SSLProtocolState new_state) + # Instead of doing python calls, c methods *_impl are called directly + # from stream.pyx + + cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size) + cdef inline buffer_updated_impl(self, size_t nbytes) + + cdef inline _set_app_protocol(self, app_protocol) + cdef inline _wakeup_waiter(self, exc=*) + cdef inline _get_extra_info(self, name, default=*) + cdef inline _set_state(self, SSLProtocolState new_state) # Handshake flow - cdef _start_handshake(self) - cdef _check_handshake_timeout(self) - cdef _do_handshake(self) - cdef _on_handshake_complete(self, handshake_exc) + cdef inline _start_handshake(self) + cdef inline _check_handshake_timeout(self) + cdef inline _do_handshake(self) + cdef inline _on_handshake_complete(self, 
handshake_exc) # Shutdown flow - cdef _start_shutdown(self, object context=*) - cdef _check_shutdown_timeout(self) - cdef _do_read_into_void(self, object context) - cdef _do_flush(self, object context=*) - cdef _do_shutdown(self, object context=*) - cdef _on_shutdown_complete(self, shutdown_exc) - cdef _abort(self, exc) + cdef inline _start_shutdown(self, object context=*) + cdef inline _check_shutdown_timeout(self) + cdef inline _do_read_into_void(self, object context) + cdef inline _do_flush(self, object context=*) + cdef inline _do_shutdown(self, object context=*) + cdef inline _on_shutdown_complete(self, shutdown_exc) + cdef inline _abort(self, exc) # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) - cdef _do_write(self) - cdef _process_outgoing(self) + cdef inline _write_appdata(self, list_of_data, object context) + cdef inline _do_write(self) + cdef inline _process_outgoing(self) # Incoming flow - cdef _do_read(self) - cdef _do_read__buffered(self) - cdef _do_read__copied(self) - cdef _call_eof_received(self, object context=*) + cdef inline _do_read(self) + cdef inline _do_read__buffered(self) + cdef inline _do_read__copied(self) + cdef inline _call_eof_received(self, object context=*) # Flow control for writes from APP socket - cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) - cdef _set_write_buffer_limits(self, high=*, low=*) + cdef inline _control_app_writing(self, object context=*) + cdef inline size_t _get_write_buffer_size(self) + cdef inline _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket - cdef _pause_reading(self) - cdef _resume_reading(self, object context) + cdef inline _pause_reading(self) + cdef inline _resume_reading(self, object context) # Flow control for reads from SSL socket - cdef _control_ssl_reading(self) - cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) - cdef _fatal_error(self, exc, 
message=*) + cdef inline _control_ssl_reading(self) + cdef inline _set_read_buffer_limits(self, high=*, low=*) + cdef inline size_t _get_read_buffer_size(self) + cdef inline _fatal_error(self, exc, message=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 42bb7644..f76474e6 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -204,11 +204,8 @@ cdef class SSLProtocol: self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) if not self._ssl_buffer: raise MemoryError() - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE) def __dealloc__(self): - self._ssl_buffer_view = None PyMem_RawFree(self._ssl_buffer) self._ssl_buffer = NULL self._ssl_buffer_len = 0 @@ -358,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - def get_buffer(self, n): + cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -367,11 +364,11 @@ cdef class SSLProtocol: if not self._ssl_buffer: raise MemoryError() self._ssl_buffer_len = want - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, want, PyBUF_WRITE) - return self._ssl_buffer_view - def buffer_updated(self, nbytes): + buf[0] = self._ssl_buffer + buf_size[0] = self._ssl_buffer_len + + cdef buffer_updated_impl(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) @@ -387,6 +384,18 @@ cdef class SSLProtocol: elif self._state == SHUTDOWN: self._do_shutdown() + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + cdef: + char* buf + size_t buf_size + + self.get_buffer_impl(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): + self.buffer_updated_impl(nbytes) + def eof_received(self): """Called when the other end of the 
low-level stream is half-closed. @@ -480,6 +489,7 @@ cdef class SSLProtocol: server_hostname=self._server_hostname) self._sslobj_read = self._sslobj.read self._sslobj_write = self._sslobj.write + self._sslobj_pending = self._sslobj.pending except Exception as ex: self._on_handshake_complete(ex) else: @@ -696,7 +706,10 @@ cdef class SSLProtocol: if not self._ssl_writing_paused: data = self._outgoing_read() if len(data): - self._transport.write(data) + if isinstance(self._transport, UVStream): + (self._transport).write(data) + else: + self._transport.write(data) # Incoming flow @@ -719,48 +732,91 @@ cdef class SSLProtocol: cdef _do_read__buffered(self): cdef: - Py_buffer pybuf - bint pybuf_inited = False - size_t wants, offset = 0 - int count = 1 - object buf + Py_ssize_t total_pending = (self._incoming.pending + + self._sslobj_pending()) + # Ask for a little extra in case the decrypted data is bigger + # than the original + object app_buffer = self._app_protocol_get_buffer( + total_pending + 256) + Py_ssize_t app_buffer_size = len(app_buffer) + + if app_buffer_size == 0: + return - buf = self._app_protocol_get_buffer(self._get_read_buffer_size()) - wants = len(buf) + cdef: + Py_ssize_t last_bytes_read = -1 + Py_ssize_t total_bytes_read = 0 + Py_buffer pybuf + bint pybuf_initialized = False try: - count = self._sslobj_read(wants, buf) - - if count > 0: - offset = count - if offset < wants: - PyObject_GetBuffer(buf, &pybuf, PyBUF_WRITABLE) - pybuf_inited = True - while offset < wants: - buf = PyMemoryView_FromMemory( - (pybuf.buf) + offset, - wants - offset, + # SSLObject.read may not return all available data in one go. + # We have to keep calling read until it throws SSLWantReadError. + # However, throwing SSLWantReadError is very expensive even in + # the master trunk of cpython. + # See https://github.com/python/cpython/issues/123954 + + # One way to reduce reliance on SSLWantReadError is to check + # self._incoming.pending > 0 and SSLObject.pending() > 0. 
+ # SSLObject.read may still throw SSLWantReadError even when + # self._incoming.pending > 0 and SSLObject.pending() == 0, + # but this should happen relatively rarely, only when an SSL frame + # is partially received. + + # This optimization works really well especially for peers + # exchanging small messages and wanting to have minimal latency. + + # self._incoming.pending means how much data hasn't + # been processed by ssl yet (read: "still encrypted"). The final + # unencrypted data size may be different. + + # self._sslobj.pending() means how much data has already been + # decrypted and can be directly read with SSLObject.read. + + # Run test_create_server_ssl_over_ssl to reproduce different cases + # for this method. + while total_pending > 0: + if total_bytes_read > 0: + if not pybuf_initialized: + PyObject_GetBuffer(app_buffer, &pybuf, PyBUF_WRITABLE) + pybuf_initialized = True + + app_buffer = PyMemoryView_FromMemory( + (pybuf.buf) + total_bytes_read, + app_buffer_size - total_bytes_read, PyBUF_WRITE) - count = self._sslobj_read(wants - offset, buf) - if count > 0: - offset += count - else: - break - else: + + last_bytes_read = self._sslobj_read( + app_buffer_size - total_bytes_read, app_buffer) + total_bytes_read += last_bytes_read + + if last_bytes_read == 0: + break + + # User buffer may not fit all available data. 
+ if total_bytes_read == app_buffer_size: self._loop._call_soon_handle( new_MethodHandle(self._loop, "SSLProtocol._do_read", - self._do_read, + self._do_read, None, # current context is good self)) + break + + total_pending = (self._incoming.pending + + self._sslobj_pending()) except ssl_SSLAgainErrors as exc: pass finally: - if pybuf_inited: + if pybuf_initialized: PyBuffer_Release(&pybuf) - if offset > 0: - self._app_protocol_buffer_updated(offset) - if not count: + + if total_bytes_read > 0: + self._app_protocol_buffer_updated(total_bytes_read) + + # SSLObject.read() may return 0 instead of throwing SSLWantReadError + # This indicates that we reached EOF + if last_bytes_read == 0: # close_notify self._call_eof_received() self._start_shutdown() @@ -772,7 +828,8 @@ cdef class SSLProtocol: bint zero = True, one = False try: - while True: + while (self._incoming.pending > 0 or + self._sslobj_pending() > 0): chunk = self._sslobj_read(SSL_READ_MAX_SIZE) if not chunk: break