diff --git a/Lib/http/__init__.py b/Lib/http/__init__.py index bf8d7d68868..f26a009086b 100644 --- a/Lib/http/__init__.py +++ b/Lib/http/__init__.py @@ -96,7 +96,7 @@ def __new__(cls, value, phrase, description=''): 'Precondition in headers is false') REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large', 'Entity is too large') - REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long', + REQUEST_URI_TOO_LONG = (414, 'URI Too Long', 'URI is too long') UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type', 'Entity body in unsupported format') diff --git a/Lib/pty.py b/Lib/pty.py index 8d8ce40df54..1d97994abef 100644 --- a/Lib/pty.py +++ b/Lib/pty.py @@ -40,6 +40,9 @@ def master_open(): Open a pty master and return the fd, and the filename of the slave end. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + try: master_fd, slave_fd = os.openpty() except (AttributeError, OSError): @@ -69,6 +72,9 @@ def slave_open(tty_name): opened filedescriptor. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + result = os.open(tty_name, os.O_RDWR) try: from fcntl import ioctl, I_PUSH @@ -101,32 +107,14 @@ def fork(): master_fd, slave_fd = openpty() pid = os.fork() if pid == CHILD: - # Establish a new session. - os.setsid() os.close(master_fd) - - # Slave becomes stdin/stdout/stderr of child. - os.dup2(slave_fd, STDIN_FILENO) - os.dup2(slave_fd, STDOUT_FILENO) - os.dup2(slave_fd, STDERR_FILENO) - if slave_fd > STDERR_FILENO: - os.close(slave_fd) - - # Explicitly open the tty to make it become a controlling tty. - tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR) - os.close(tmp_fd) + os.login_tty(slave_fd) else: os.close(slave_fd) # Parent and child process. return pid, master_fd -def _writen(fd, data): - """Write all the data to a descriptor.""" - while data: - n = os.write(fd, data) - data = data[n:] - def _read(fd): """Default read function.""" return os.read(fd, 1024) @@ -136,9 +124,42 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): Copies pty master -> standard output (master_read) standard input -> pty master (stdin_read)""" - fds = [master_fd, STDIN_FILENO] - while fds: - rfds, _wfds, _xfds = select(fds, [], []) + if os.get_blocking(master_fd): + # If we write more than tty/ndisc is willing to buffer, we may block + # indefinitely. So we set master_fd to non-blocking temporarily during + # the copy operation. + os.set_blocking(master_fd, False) + try: + _copy(master_fd, master_read=master_read, stdin_read=stdin_read) + finally: + # restore blocking mode for backwards compatibility + os.set_blocking(master_fd, True) + return + high_waterlevel = 4096 + stdin_avail = master_fd != STDIN_FILENO + stdout_avail = master_fd != STDOUT_FILENO + i_buf = b'' + o_buf = b'' + while 1: + rfds = [] + wfds = [] + if stdin_avail and len(i_buf) < high_waterlevel: + rfds.append(STDIN_FILENO) + if stdout_avail and len(o_buf) < high_waterlevel: + rfds.append(master_fd) + if stdout_avail and len(o_buf) > 0: + wfds.append(STDOUT_FILENO) + if len(i_buf) > 0: + wfds.append(master_fd) + + rfds, wfds, _xfds = select(rfds, wfds, []) + + if STDOUT_FILENO in wfds: + try: + n = os.write(STDOUT_FILENO, o_buf) + o_buf = o_buf[n:] + except OSError: + stdout_avail = False if master_fd in rfds: # Some OSes signal EOF by returning an empty byte string, @@ -150,19 +171,22 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): if not data: # Reached EOF. return # Assume the child process has exited and is # unreachable, so we clean up. - else: - os.write(STDOUT_FILENO, data) + o_buf += data + + if master_fd in wfds: + n = os.write(master_fd, i_buf) + i_buf = i_buf[n:] - if STDIN_FILENO in rfds: + if stdin_avail and STDIN_FILENO in rfds: data = stdin_read(STDIN_FILENO) if not data: - fds.remove(STDIN_FILENO) + stdin_avail = False else: - _writen(master_fd, data) + i_buf += data def spawn(argv, master_read=_read, stdin_read=_read): """Create a spawned process.""" - if type(argv) == type(''): + if isinstance(argv, str): argv = (argv,) sys.audit('pty.spawn', argv) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 2905e3eac36..35b2723de3b 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -127,10 +127,7 @@ class will essentially render the service "deaf" while one request is import selectors import os import sys -try: - import threading -except ImportError: - import dummy_threading as threading +import threading from io import BufferedIOBase from time import monotonic as time @@ -144,6 +141,8 @@ class will essentially render the service "deaf" while one request is __all__.extend(["UnixStreamServer","UnixDatagramServer", "ThreadingUnixStreamServer", "ThreadingUnixDatagramServer"]) + if hasattr(os, "fork"): + __all__.extend(["ForkingUnixStreamServer", "ForkingUnixDatagramServer"]) # poll/select have the advantage of not requiring any extra file descriptor, # contrarily to epoll/kqueue (also, they require a single syscall). @@ -190,6 +189,7 @@ class BaseServer: - address_family - socket_type - allow_reuse_address + - allow_reuse_port Instance variables: @@ -294,8 +294,7 @@ def handle_request(self): selector.register(self, selectors.EVENT_READ) while True: - ready = selector.select(timeout) - if ready: + if selector.select(timeout): return self._handle_request_noblock() else: if timeout is not None: @@ -428,6 +427,7 @@ class TCPServer(BaseServer): - socket_type - request_queue_size (only for stream sockets) - allow_reuse_address + - allow_reuse_port Instance variables: @@ -445,6 +445,8 @@ class TCPServer(BaseServer): allow_reuse_address = False + allow_reuse_port = False + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) @@ -464,8 +466,15 @@ def server_bind(self): May be overridden. """ - if self.allow_reuse_address: + if self.allow_reuse_address and hasattr(socket, "SO_REUSEADDR"): self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # Since Linux 6.12.9, SO_REUSEPORT is not allowed + # on other address families than AF_INET/AF_INET6. + if ( + self.allow_reuse_port and hasattr(socket, "SO_REUSEPORT") + and self.address_family in (socket.AF_INET, socket.AF_INET6) + ): + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() @@ -522,6 +531,8 @@ class UDPServer(TCPServer): allow_reuse_address = False + allow_reuse_port = False + socket_type = socket.SOCK_DGRAM max_packet_size = 8192 @@ -723,6 +734,11 @@ class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass + if hasattr(os, "fork"): + class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass + + class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass + class BaseRequestHandler: """Base class for request handler classes. diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 3aceb3f70fd..8036e93cd6d 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -46,7 +46,6 @@ import sys as _sys import types as _types import weakref as _weakref - import _thread _allocate_lock = _thread.allocate_lock @@ -181,7 +180,7 @@ def _candidate_tempdir_list(): return dirlist -def _get_default_tempdir(): +def _get_default_tempdir(dirlist=None): """Calculate the default directory to use for temporary files. This routine should be called exactly once. @@ -191,7 +190,8 @@ def _get_default_tempdir(): service, the name of the test file must be randomized.""" namer = _RandomNameSequence() - dirlist = _candidate_tempdir_list() + if dirlist is None: + dirlist = _candidate_tempdir_list() for dir in dirlist: if dir != _os.curdir: @@ -204,8 +204,7 @@ def _get_default_tempdir(): fd = _os.open(filename, _bin_openflags, 0o600) try: try: - with _io.open(fd, 'wb', closefd=False) as fp: - fp.write(b'blat') + _os.write(fd, b'blat') finally: _os.close(fd) finally: @@ -245,6 +244,7 @@ def _get_candidate_names(): def _mkstemp_inner(dir, pre, suf, flags, output_type): """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.""" + dir = _os.path.abspath(dir) names = _get_candidate_names() if output_type is bytes: names = map(_os.fsencode, names) @@ -265,11 +265,27 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type): continue else: raise - return (fd, _os.path.abspath(file)) + return fd, file raise FileExistsError(_errno.EEXIST, "No usable temporary file name found") +def _dont_follow_symlinks(func, path, *args): + # Pass follow_symlinks=False, unless not supported on this platform. + if func in _os.supports_follow_symlinks: + func(path, *args, follow_symlinks=False) + elif not _os.path.islink(path): + func(path, *args) + +def _resetperms(path): + try: + chflags = _os.chflags + except AttributeError: + pass + else: + _dont_follow_symlinks(chflags, path, 0) + _dont_follow_symlinks(_os.chmod, path, 0o700) + # User visible interfaces. @@ -377,7 +393,7 @@ def mkdtemp(suffix=None, prefix=None, dir=None): continue else: raise - return file + return _os.path.abspath(file) raise FileExistsError(_errno.EEXIST, "No usable temporary directory name found") @@ -419,42 +435,42 @@ class _TemporaryFileCloser: underlying file object, without adding a __del__ method to the temporary file.""" - file = None # Set here since __del__ checks it + cleanup_called = False close_called = False - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name self.delete = delete + self.delete_on_close = delete_on_close - # NT provides delete-on-close as a primitive, so we don't need - # the wrapper to do anything special. We still use it so that - # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile. - if _os.name != 'nt': - # Cache the unlinker so we don't get spurious errors at - # shutdown when the module-level "os" is None'd out. Note - # that this must be referenced as self.unlink, because the - # name TemporaryFileWrapper may also get None'd out before - # __del__ is called. - - def close(self, unlink=_os.unlink): - if not self.close_called and self.file is not None: - self.close_called = True - try: + def cleanup(self, windows=(_os.name == 'nt'), unlink=_os.unlink): + if not self.cleanup_called: + self.cleanup_called = True + try: + if not self.close_called: + self.close_called = True self.file.close() - finally: - if self.delete: + finally: + # Windows provides delete-on-close as a primitive, in which + # case the file was deleted by self.file.close(). + if self.delete and not (windows and self.delete_on_close): + try: unlink(self.name) + except FileNotFoundError: + pass - # Need to ensure the file is deleted on __del__ - def __del__(self): - self.close() - - else: - def close(self): - if not self.close_called: - self.close_called = True + def close(self): + if not self.close_called: + self.close_called = True + try: self.file.close() + finally: + if self.delete and self.delete_on_close: + self.cleanup() + + def __del__(self): + self.cleanup() class _TemporaryFileWrapper: @@ -465,11 +481,11 @@ class _TemporaryFileWrapper: remove the file when it is no longer needed. """ - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name - self.delete = delete - self._closer = _TemporaryFileCloser(file, name, delete) + self._closer = _TemporaryFileCloser(file, name, delete, + delete_on_close) def __getattr__(self, name): # Attribute lookups are delegated to the underlying file @@ -500,7 +516,7 @@ def __enter__(self): # deleted when used in a with statement def __exit__(self, exc, value, tb): result = self.file.__exit__(exc, value, tb) - self.close() + self._closer.cleanup() return result def close(self): @@ -519,10 +535,10 @@ def __iter__(self): for line in self.file: yield line - def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, newline=None, suffix=None, prefix=None, - dir=None, delete=True, *, errors=None): + dir=None, delete=True, *, errors=None, + delete_on_close=True): """Create and return a temporary file. Arguments: 'prefix', 'suffix', 'dir' -- as for mkstemp. @@ -530,13 +546,20 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, 'buffering' -- the buffer size argument to io.open (default -1). 'encoding' -- the encoding argument to io.open (default None) 'newline' -- the newline argument to io.open (default None) - 'delete' -- whether the file is deleted on close (default True). + 'delete' -- whether the file is automatically deleted (default True). + 'delete_on_close' -- if 'delete', whether the file is deleted on close + (default True) or otherwise either on context manager exit + (if context manager was used) or on object finalization. . 'errors' -- the errors argument to io.open (default None) The file is created as mkstemp() would do it. Returns an object with a file-like interface; the name of the file is accessible as its 'name' attribute. The file will be automatically deleted when it is closed unless the 'delete' argument is set to False. + + On POSIX, NamedTemporaryFiles cannot be automatically deleted if + the creating process is terminated abruptly with a SIGKILL signal. + Windows can delete the file even in this case. """ prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir) @@ -545,21 +568,33 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, # Setting O_TEMPORARY in the flags causes the OS to delete # the file when it is closed. This is only supported by Windows. - if _os.name == 'nt' and delete: + if _os.name == 'nt' and delete and delete_on_close: flags |= _os.O_TEMPORARY if "b" not in mode: encoding = _io.text_encoding(encoding) - (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + name = None + def opener(*args): + nonlocal name + fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + return fd try: - file = _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, errors=errors) - - return _TemporaryFileWrapper(file, name, delete) - except BaseException: - _os.unlink(name) - _os.close(fd) + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, errors=errors, + opener=opener) + try: + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = name + return _TemporaryFileWrapper(file, name, delete, delete_on_close) + except: + file.close() + raise + except: + if name is not None and not ( + _os.name == 'nt' and delete and delete_on_close): + _os.unlink(name) raise if _os.name != 'posix' or _sys.platform == 'cygwin': @@ -598,9 +633,20 @@ def TemporaryFile(mode='w+b', buffering=-1, encoding=None, flags = _bin_openflags if _O_TMPFILE_WORKS: - try: + fd = None + def opener(*args): + nonlocal fd flags2 = (flags | _os.O_TMPFILE) & ~_os.O_CREAT fd = _os.open(dir, flags2, 0o600) + return fd + try: + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, + errors=errors, opener=opener) + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = fd + return file except IsADirectoryError: # Linux kernel older than 3.11 ignores the O_TMPFILE flag: # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory @@ -617,26 +663,27 @@ def TemporaryFile(mode='w+b', buffering=-1, encoding=None, # fails with NotADirectoryError, because O_TMPFILE is read as # O_DIRECTORY. pass - else: - try: - return _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, - errors=errors) - except: - _os.close(fd) - raise # Fallback to _mkstemp_inner(). - (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type) - try: - _os.unlink(name) - return _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, errors=errors) - except: - _os.close(fd) - raise + fd = None + def opener(*args): + nonlocal fd + fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + try: + _os.unlink(name) + except BaseException as e: + _os.close(fd) + raise + return fd + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, errors=errors, + opener=opener) + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = fd + return file -class SpooledTemporaryFile: +class SpooledTemporaryFile(_io.IOBase): """Temporary file wrapper, specialized to switch from BytesIO or StringIO to a real file when it exceeds a certain size or when a fileno is needed. @@ -701,6 +748,16 @@ def __exit__(self, exc, value, tb): def __iter__(self): return self._file.__iter__() + def __del__(self): + if not self.closed: + _warnings.warn( + "Unclosed file {!r}".format(self), + ResourceWarning, + stacklevel=2, + source=self + ) + self.close() + def close(self): self._file.close() @@ -744,15 +801,30 @@ def name(self): def newlines(self): return self._file.newlines + def readable(self): + return self._file.readable() + def read(self, *args): return self._file.read(*args) + def read1(self, *args): + return self._file.read1(*args) + + def readinto(self, b): + return self._file.readinto(b) + + def readinto1(self, b): + return self._file.readinto1(b) + def readline(self, *args): return self._file.readline(*args) def readlines(self, *args): return self._file.readlines(*args) + def seekable(self): + return self._file.seekable() + def seek(self, *args): return self._file.seek(*args) @@ -761,11 +833,14 @@ def tell(self): def truncate(self, size=None): if size is None: - self._file.truncate() + return self._file.truncate() else: if size > self._max_size: self.rollover() - self._file.truncate(size) + return self._file.truncate(size) + + def writable(self): + return self._file.writable() def write(self, s): file = self._file @@ -774,10 +849,17 @@ def write(self, s): return rv def writelines(self, iterable): - file = self._file - rv = file.writelines(iterable) - self._check(file) - return rv + if self._max_size == 0 or self._rolled: + return self._file.writelines(iterable) + + it = iter(iterable) + for line in it: + self.write(line) + if self._rolled: + return self._file.writelines(it) + + def detach(self): + return self._file.detach() class TemporaryDirectory: @@ -789,53 +871,74 @@ class TemporaryDirectory: ... Upon exiting the context, the directory and everything contained - in it are removed. + in it are removed (unless delete=False is passed or an exception + is raised during cleanup and ignore_cleanup_errors is not True). + + Optional Arguments: + suffix - A str suffix for the directory name. (see mkdtemp) + prefix - A str prefix for the directory name. (see mkdtemp) + dir - A directory to create this temp dir in. (see mkdtemp) + ignore_cleanup_errors - False; ignore exceptions during cleanup? + delete - True; whether the directory is automatically deleted. """ def __init__(self, suffix=None, prefix=None, dir=None, - ignore_cleanup_errors=False): + ignore_cleanup_errors=False, *, delete=True): self.name = mkdtemp(suffix, prefix, dir) self._ignore_cleanup_errors = ignore_cleanup_errors + self._delete = delete self._finalizer = _weakref.finalize( self, self._cleanup, self.name, warn_message="Implicitly cleaning up {!r}".format(self), - ignore_errors=self._ignore_cleanup_errors) + ignore_errors=self._ignore_cleanup_errors, delete=self._delete) @classmethod - def _rmtree(cls, name, ignore_errors=False): - def onerror(func, path, exc_info): - if issubclass(exc_info[0], PermissionError): - def resetperms(path): - try: - _os.chflags(path, 0) - except AttributeError: - pass - _os.chmod(path, 0o700) + def _rmtree(cls, name, ignore_errors=False, repeated=False): + def onexc(func, path, exc): + if isinstance(exc, PermissionError): + if repeated and path == name: + if ignore_errors: + return + raise try: if path != name: - resetperms(_os.path.dirname(path)) - resetperms(path) + _resetperms(_os.path.dirname(path)) + _resetperms(path) try: _os.unlink(path) - # PermissionError is raised on FreeBSD for directories - except (IsADirectoryError, PermissionError): + except IsADirectoryError: cls._rmtree(path, ignore_errors=ignore_errors) + except PermissionError: + # The PermissionError handler was originally added for + # FreeBSD in directories, but it seems that it is raised + # on Windows too. + # bpo-43153: Calling _rmtree again may + # raise NotADirectoryError and mask the PermissionError. + # So we must re-raise the current PermissionError if + # path is not a directory. + if not _os.path.isdir(path) or _os.path.isjunction(path): + if ignore_errors: + return + raise + cls._rmtree(path, ignore_errors=ignore_errors, + repeated=(path == name)) except FileNotFoundError: pass - elif issubclass(exc_info[0], FileNotFoundError): + elif isinstance(exc, FileNotFoundError): pass else: if not ignore_errors: raise - _shutil.rmtree(name, onerror=onerror) + _shutil.rmtree(name, onexc=onexc) @classmethod - def _cleanup(cls, name, warn_message, ignore_errors=False): - cls._rmtree(name, ignore_errors=ignore_errors) - _warnings.warn(warn_message, ResourceWarning) + def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True): + if delete: + cls._rmtree(name, ignore_errors=ignore_errors) + _warnings.warn(warn_message, ResourceWarning) def __repr__(self): return "<{} {!r}>".format(self.__class__.__name__, self.name) @@ -844,7 +947,8 @@ def __enter__(self): return self.name def __exit__(self, exc, value, tb): - self.cleanup() + if self._delete: + self.cleanup() def cleanup(self): if self._finalizer.detach() or _os.path.exists(self.name): diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 1bb74c6d969..b3dd906cc72 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -398,6 +398,8 @@ def test_aware_datetime(self): self.assertEqual(tz.dst(t), t.replace(tzinfo=tz).dst()) + @unittest.skip('TODO: RUSTPYTHON') + # _pycodecs was deleted def test_pickle(self): for tz in self.ACDT, self.EST, timezone.min, timezone.max: for pickler, unpickler, proto in pickle_choices: diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index b1c1f6abffb..c6e0e6fc3f3 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -1,10 +1,16 @@ -from test import support -from test.support import verbose, reap_children +import unittest +from test.support import ( + is_android, is_apple_mobile, is_emscripten, is_wasi, reap_children, verbose +) from test.support.import_helper import import_module +from test.support.os_helper import TESTFN, unlink # Skip these tests if termios is not available import_module('termios') +if is_android or is_apple_mobile or is_emscripten or is_wasi: + raise unittest.SkipTest("pty is not available on this platform") + import errno import os import pty @@ -14,21 +20,12 @@ import signal import socket import io # readline -import unittest - -import struct -import fcntl import warnings TEST_STRING_1 = b"I wish to buy a fish license.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n" -try: - _TIOCGWINSZ = tty.TIOCGWINSZ - _TIOCSWINSZ = tty.TIOCSWINSZ - _HAVE_WINSZ = True -except AttributeError: - _HAVE_WINSZ = False +_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ") if verbose: def debug(msg): @@ -82,90 +79,79 @@ def expectedFailureIfStdinIsTTY(fun): pass return fun -def _get_term_winsz(fd): - s = struct.pack("HHHH", 0, 0, 0, 0) - return fcntl.ioctl(fd, _TIOCGWINSZ, s) -def _set_term_winsz(fd, winsz): - fcntl.ioctl(fd, _TIOCSWINSZ, winsz) +def write_all(fd, data): + written = os.write(fd, data) + if written != len(data): + # gh-73256, gh-110673: It should never happen, but check just in case + raise Exception(f"short write: os.write({fd}, {len(data)} bytes) " + f"wrote {written} bytes") # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. class PtyTest(unittest.TestCase): def setUp(self): - old_alarm = signal.signal(signal.SIGALRM, self.handle_sig) - self.addCleanup(signal.signal, signal.SIGALRM, old_alarm) - old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup) self.addCleanup(signal.signal, signal.SIGHUP, old_sighup) - # isatty() and close() can hang on some platforms. Set an alarm - # before running the test to make sure we don't hang forever. - self.addCleanup(signal.alarm, 0) - signal.alarm(10) - - # Save original stdin window size - self.stdin_rows = None - self.stdin_cols = None + # Save original stdin window size. + self.stdin_dim = None if _HAVE_WINSZ: try: - stdin_dim = os.get_terminal_size(pty.STDIN_FILENO) - self.stdin_rows = stdin_dim.lines - self.stdin_cols = stdin_dim.columns - old_stdin_winsz = struct.pack("HHHH", self.stdin_rows, - self.stdin_cols, 0, 0) - self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz) - except OSError: + self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO, + self.stdin_dim) + except tty.error: pass - def handle_sig(self, sig, frame): - self.fail("isatty hung") - @staticmethod def handle_sighup(signum, frame): pass @expectedFailureIfStdinIsTTY + @unittest.skip('TODO: RUSTPYTHON') + # module 'tty' has no attribute 'tcgetwinsize' def test_openpty(self): try: mode = tty.tcgetattr(pty.STDIN_FILENO) except tty.error: - # not a tty or bad/closed fd + # Not a tty or bad/closed fd. debug("tty.tcgetattr(pty.STDIN_FILENO) failed") mode = None - new_stdin_winsz = None - if self.stdin_rows is not None and self.stdin_cols is not None: + new_dim = None + if self.stdin_dim: try: # Modify pty.STDIN_FILENO window size; we need to # check if pty.openpty() is able to set pty slave # window size accordingly. - debug("Setting pty.STDIN_FILENO window size") - debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})") - target_stdin_rows = self.stdin_rows + 1 - target_stdin_cols = self.stdin_cols + 1 - debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})") - target_stdin_winsz = struct.pack("HHHH", target_stdin_rows, - target_stdin_cols, 0, 0) - _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz) + debug("Setting pty.STDIN_FILENO window size.") + debug(f"original size: (row, col) = {self.stdin_dim}") + target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1) + debug(f"target size: (row, col) = {target_dim}") + tty.tcsetwinsize(pty.STDIN_FILENO, target_dim) # Were we able to set the window size # of pty.STDIN_FILENO successfully? - new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO) - self.assertEqual(new_stdin_winsz, target_stdin_winsz, + new_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.assertEqual(new_dim, target_dim, "pty.STDIN_FILENO window size unchanged") - except OSError: - warnings.warn("Failed to set pty.STDIN_FILENO window size") + except OSError as e: + logging.getLogger(__name__).warning( + "Failed to set pty.STDIN_FILENO window size.", exc_info=e, + ) pass try: debug("Calling pty.openpty()") try: - master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz) + master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim, + True) except TypeError: master_fd, slave_fd = pty.openpty() - debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") + slave_name = None + debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}") except OSError: # " An optional feature could not be imported " ... ? raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.") @@ -181,8 +167,8 @@ def test_openpty(self): if mode: self.assertEqual(tty.tcgetattr(slave_fd), mode, "openpty() failed to set slave termios") - if new_stdin_winsz: - self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz, + if new_dim: + self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim, "openpty() failed to set slave window size") # Ensure the fd is non-blocking in case there's nothing to read. @@ -200,18 +186,19 @@ def test_openpty(self): os.set_blocking(master_fd, blocking) debug("Writing to slave_fd") - os.write(slave_fd, TEST_STRING_1) + write_all(slave_fd, TEST_STRING_1) s1 = _readline(master_fd) self.assertEqual(b'I wish to buy a fish license.\n', normalize_output(s1)) debug("Writing chunked output") - os.write(slave_fd, TEST_STRING_2[:5]) - os.write(slave_fd, TEST_STRING_2[5:]) + write_all(slave_fd, TEST_STRING_2[:5]) + write_all(slave_fd, TEST_STRING_2[5:]) s2 = _readline(master_fd) self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) - @support.requires_fork() + @unittest.skip('TODO: RUSTPYTHON') + # module 'tty' has no attribute 'tcgetwinsize' def test_fork(self): debug("calling pty.fork()") pid, master_fd = pty.fork() @@ -294,6 +281,8 @@ def test_fork(self): ##else: ## raise TestFailed("Read from master_fd did not raise exception") + @unittest.skip('TODO: RUSTPYTHON') + # module 'tty' has no attribute 'tcgetwinsize' def test_master_read(self): # XXX(nnorwitz): this test leaks fds when there is an error. debug("Calling pty.openpty()") @@ -313,8 +302,29 @@ def test_master_read(self): self.assertEqual(data, b"") + @unittest.skip('TODO: RUSTPYTHON') + # module 'tty' has no attribute 'tcgetwinsize' def test_spawn_doesnt_hang(self): - pty.spawn([sys.executable, '-c', 'print("hi there")']) + self.addCleanup(unlink, TESTFN) + with open(TESTFN, 'wb') as f: + STDOUT_FILENO = 1 + dup_stdout = os.dup(STDOUT_FILENO) + os.dup2(f.fileno(), STDOUT_FILENO) + buf = b'' + def master_read(fd): + nonlocal buf + data = os.read(fd, 1024) + buf += data + return data + try: + pty.spawn([sys.executable, '-c', 'print("hi there")'], + master_read) + finally: + os.dup2(dup_stdout, STDOUT_FILENO) + os.close(dup_stdout) + self.assertEqual(buf, b'hi there\r\n') + with open(TESTFN, 'rb') as f: + self.assertEqual(f.read(), b'hi there\r\n') class SmallPtyTests(unittest.TestCase): """These tests don't spawn children or hang.""" @@ -332,8 +342,8 @@ def setUp(self): self.orig_pty_waitpid = pty.waitpid self.fds = [] # A list of file descriptors to close. self.files = [] - self.select_rfds_lengths = [] - self.select_rfds_results = [] + self.select_input = [] + self.select_output = [] self.tcsetattr_mode_setting = None def tearDown(self): @@ -368,11 +378,10 @@ def _socketpair(self): self.files.extend(socketpair) return socketpair - def _mock_select(self, rfds, wfds, xfds, timeout=0): + def _mock_select(self, rfds, wfds, xfds): # This will raise IndexError when no more expected calls exist. - # This ignores the timeout - self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + self.assertEqual((rfds, wfds, xfds), self.select_input.pop(0)) + return self.select_output.pop(0) def _make_mock_fork(self, pid): def mock_fork(): @@ -392,14 +401,16 @@ def test__copy_to_each(self): masters = [s.fileno() for s in socketpair] # Feed data. Smaller than PIPEBUF. These writes will not block. - os.write(masters[1], b'from master') - os.write(write_to_stdin_fd, b'from stdin') + write_all(masters[1], b'from master') + write_all(write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError + # Expect three select calls, the last one will cause IndexError pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - self.select_rfds_lengths.append(2) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_output.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [mock_stdout_fd, masters[0]], [])) + self.select_output.append(([], [mock_stdout_fd, masters[0]], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) with self.assertRaises(IndexError): pty._copy(masters[0]) @@ -410,28 +421,6 @@ def test__copy_to_each(self): self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" - read_from_stdout_fd, mock_stdout_fd = self._pipe() - pty.STDOUT_FILENO = mock_stdout_fd - mock_stdin_fd, write_to_stdin_fd = self._pipe() - pty.STDIN_FILENO = mock_stdin_fd - socketpair = self._socketpair() - masters = [s.fileno() for s in socketpair] - - socketpair[1].close() - os.close(write_to_stdin_fd) - - pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) - - # We expect the function to return without error. - self.assertEqual(pty._copy(masters[0]), None) - def test__restore_tty_mode_normal_return(self): """Test that spawn resets the tty mode no when _copy returns normally.""" diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index cb744b41e8b..578bee3172a 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -13,9 +13,10 @@ import time import unittest from test import support -from test.support import os_helper +from test.support import ( + is_apple, is_apple_mobile, os_helper, threading_helper +) from test.support.script_helper import assert_python_ok, spawn_python -from test.support import threading_helper try: import _testcapi except ImportError: @@ -122,6 +123,8 @@ def __repr__(self): self.assertEqual(signal.getsignal(signal.SIGHUP), hup) self.assertEqual(0, argument.repr_count) + @unittest.skipIf(sys.platform.startswith("netbsd"), + "gh-124083: strsignal is not supported on NetBSD") def test_strsignal(self): self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) @@ -189,8 +192,7 @@ def test_valid_signals(self): self.assertNotIn(signal.NSIG, s) self.assertLess(len(s), signal.NSIG) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_issue9324(self): # Updated for issue #10003, adding SIGBREAK handler = lambda x, y: None @@ -699,7 +701,7 @@ def handler(signum, frame): @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") class SiginterruptTest(unittest.TestCase): - def readpipe_interrupted(self, interrupt): + def readpipe_interrupted(self, interrupt, timeout=support.SHORT_TIMEOUT): """Perform a read during which a signal will arrive. Return True if the read is interrupted by the signal and raises an exception. Return False if it returns normally. @@ -747,7 +749,7 @@ def handler(signum, frame): # wait until the child process is loaded and has started first_line = process.stdout.readline() - stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT) + stdout, stderr = process.communicate(timeout=timeout) except subprocess.TimeoutExpired: process.kill() return False @@ -778,7 +780,7 @@ def test_siginterrupt_off(self): # If a signal handler is installed and siginterrupt is called with # a false value for the second argument, when that signal arrives, it # does not interrupt a syscall that's in progress. - interrupted = self.readpipe_interrupted(False) + interrupted = self.readpipe_interrupted(False, timeout=2) self.assertFalse(interrupted) @@ -834,16 +836,16 @@ def test_itimer_real(self): self.assertEqual(self.hndl_called, True) # Issue 3864, unknown if this affects earlier versions of freebsd also - @unittest.skipIf(sys.platform in ('netbsd5',), + @unittest.skipIf(sys.platform in ('netbsd5',) or is_apple_mobile, 'itimer not reliable (does not mix well with threading) on some BSDs.') def test_itimer_virtual(self): self.itimer = signal.ITIMER_VIRTUAL signal.signal(signal.SIGVTALRM, self.sig_vtalrm) - signal.setitimer(self.itimer, 0.3, 0.2) + signal.setitimer(self.itimer, 0.001, 0.001) for _ in support.busy_retry(support.LONG_TIMEOUT): # use up some virtual time by doing real work - _ = pow(12345, 67890, 10000019) + _ = sum(i * i for i in range(10**5)) if signal.getitimer(self.itimer) == (0.0, 0.0): # sig_vtalrm handler stopped this itimer break @@ -860,7 +862,7 @@ def test_itimer_prof(self): for _ in support.busy_retry(support.LONG_TIMEOUT): # do some work - _ = pow(12345, 67890, 10000019) + _ = sum(i * i for i in range(10**5)) if signal.getitimer(self.itimer) == (0.0, 0.0): # sig_prof handler stopped this itimer break @@ -1326,15 +1328,18 @@ def test_stress_delivery_simultaneous(self): def handler(signum, frame): sigs.append(signum) - self.setsig(signal.SIGUSR1, handler) + # On Android, SIGUSR1 is unreliable when used in close proximity to + # another signal – see Android/testbed/app/src/main/python/main.py. + # So we use a different signal. + self.setsig(signal.SIGUSR2, handler) self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL expected_sigs = 0 while expected_sigs < N: # Hopefully the SIGALRM will be received somewhere during - # initial processing of SIGUSR1. + # initial processing of SIGUSR2. signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) - os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGUSR2) expected_sigs += 2 # Wait for handlers to run to avoid signal coalescing @@ -1346,8 +1351,8 @@ def handler(signum, frame): # Python handler self.assertEqual(len(sigs), N, "Some signals were lost") - @unittest.skip("TODO: RUSTPYTHON; hang") - @unittest.skipIf(sys.platform == "darwin", "crashes due to system bug (FB13453490)") + @unittest.skip('TODO: RUSTPYTHON; hang') + @unittest.skipIf(is_apple, "crashes due to system bug (FB13453490)") @unittest.skipUnless(hasattr(signal, "SIGUSR1"), "test needs SIGUSR1") @threading_helper.requires_working_threading() @@ -1414,8 +1419,7 @@ def test_sigint(self): with self.assertRaises(KeyboardInterrupt): signal.raise_signal(signal.SIGINT) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipIf(sys.platform != "win32", "Windows specific test") def test_invalid_argument(self): try: @@ -1439,8 +1443,7 @@ def handler(a, b): signal.raise_signal(signal.SIGINT) self.assertTrue(is_ok) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test__thread_interrupt_main(self): # See https://github.com/python/cpython/issues/102397 code = """if 1: diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index cdbf341b9cb..6235c8e74cf 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -8,7 +8,6 @@ import select import signal import socket -import tempfile import threading import unittest import socketserver @@ -21,6 +20,8 @@ test.support.requires("network") +test.support.requires_working_socket(module=True) + TEST_STR = b"hello world\n" HOST = socket_helper.HOST @@ -28,14 +29,9 @@ HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS, 'requires Unix sockets') -HAVE_FORKING = hasattr(os, "fork") +HAVE_FORKING = test.support.has_fork_support requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking') -def signal_alarm(n): - """Call signal.alarm when it exists (i.e. not on Windows).""" - if hasattr(signal, 'alarm'): - signal.alarm(n) - # Remember real select() to avoid interferences with mocking _real_select = select.select @@ -46,14 +42,6 @@ def receive(sock, n, timeout=test.support.SHORT_TIMEOUT): else: raise RuntimeError("timed out on %r" % (sock,)) -if HAVE_UNIX_SOCKETS and HAVE_FORKING: - class ForkingUnixStreamServer(socketserver.ForkingMixIn, - socketserver.UnixStreamServer): - pass - - class ForkingUnixDatagramServer(socketserver.ForkingMixIn, - socketserver.UnixDatagramServer): - pass @test.support.requires_fork() # TODO: RUSTPYTHON, os.fork is currently only supported on Unix-based systems @contextlib.contextmanager @@ -75,12 +63,10 @@ class SocketServerTest(unittest.TestCase): """Test all socket servers.""" def setUp(self): - signal_alarm(60) # Kill deadlocks after 60 seconds. self.port_seed = 0 self.test_files = [] def tearDown(self): - signal_alarm(0) # Didn't deadlock. reap_children() for fn in self.test_files: @@ -96,8 +82,7 @@ def pickaddr(self, proto): else: # XXX: We need a way to tell AF_UNIX to pick its own name # like AF_INET provides port==0. - dir = None - fn = tempfile.mktemp(prefix='unix_socket.', dir=dir) + fn = socket_helper.create_unix_domain_name() self.test_files.append(fn) return fn @@ -211,7 +196,7 @@ def test_ThreadingUnixStreamServer(self): @requires_forking def test_ForkingUnixStreamServer(self): with simple_subprocess(self): - self.run_server(ForkingUnixStreamServer, + self.run_server(socketserver.ForkingUnixStreamServer, socketserver.StreamRequestHandler, self.stream_examine) @@ -247,7 +232,7 @@ def test_ThreadingUnixDatagramServer(self): @requires_unix_sockets @requires_forking def test_ForkingUnixDatagramServer(self): - self.run_server(ForkingUnixDatagramServer, + self.run_server(socketserver.ForkingUnixDatagramServer, socketserver.DatagramRequestHandler, self.dgram_examine) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 997c98c2c62..4d058652723 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -800,8 +800,7 @@ def test_env(self): stdout, stderr = p.communicate() self.assertEqual(stdout, b"orange") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(sys.platform == "win32", "Windows only issue") def test_win32_duplicate_envs(self): newenv = os.environ.copy() @@ -1292,8 +1291,7 @@ def test_universal_newlines_communicate_stdin_stdout_stderr(self): # to stderr at exit of subprocess. self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_universal_newlines_communicate_encodings(self): # Check that universal newlines mode works for various encodings, # in particular for encodings in the UTF-16 and UTF-32 families. @@ -2276,16 +2274,14 @@ def test_group_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, group=65535) - # TODO: RUSTPYTHON, observed gids do not match expected gids - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; observed gids do not match expected gids @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups(self): gid = os.getegid() group_list = [65534 if gid != 65534 else 65533] self._test_extra_groups_impl(gid=gid, group_list=group_list) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups_empty_list(self): self._test_extra_groups_impl(gid=os.getegid(), group_list=[]) @@ -2321,8 +2317,8 @@ def _test_extra_groups_impl(self, *, gid, group_list): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[name_group]) - @unittest.skip("TODO: RUSTPYTHON; clarify failure condition") # No skip necessary, this test won't make it to a setgroup() call. + @unittest.skip('TODO: RUSTPYTHON; clarify failure condition') def test_extra_groups_invalid_gid_t_values(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1]) @@ -2449,8 +2445,7 @@ def raise_it(): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=raise_it) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_preexec_gc_module_failure(self): # This tests the code that disables garbage collection if the child # process will execute any Python. @@ -3021,7 +3016,7 @@ def kill_p2(): p1.stdout.close() p2.stdout.close() - @unittest.skip("TODO: RUSTPYTHON, flaky test") + @unittest.skip('TODO: RUSTPYTHON; flaky test') def test_close_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3149,11 +3144,11 @@ def test_close_fds_when_max_fd_is_lowered(self): msg="Some fds were left open.") - @unittest.skip("TODO: RUSTPYTHON, flaky test") # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file # descriptor of a pipe closed in the parent process is valid in the # child process according to fstat(), but the mode of the file # descriptor is invalid, and read or write raise an error. + @unittest.skip('TODO: RUSTPYTHON; flaky test') @support.requires_mac_ver(10, 5) def test_pass_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3566,8 +3561,7 @@ def test_communicate_repeated_call_after_stdout_close(self): except subprocess.TimeoutExpired: pass - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_preexec_at_exit(self): code = f"""if 1: import atexit diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 5674839cd43..5e43fb82be2 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -11,6 +11,9 @@ import stat import types import weakref +import gc +import shutil +import subprocess from unittest import mock import unittest @@ -60,16 +63,10 @@ def test_infer_return_type_multiples_and_none(self): tempfile._infer_return_type(b'', None, '') def test_infer_return_type_pathlib(self): - self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) + self.assertIs(str, tempfile._infer_return_type(os_helper.FakePath('/'))) def test_infer_return_type_pathlike(self): - class Path: - def __init__(self, path): - self.path = path - - def __fspath__(self): - return self.path - + Path = os_helper.FakePath self.assertIs(str, tempfile._infer_return_type(Path('/'))) self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) self.assertIs(str, tempfile._infer_return_type('', Path(''))) @@ -90,14 +87,10 @@ class BaseTestCase(unittest.TestCase): b_check = re.compile(br"^[a-z0-9_-]{8}$") def setUp(self): - self._warnings_manager = warnings_helper.check_warnings() - self._warnings_manager.__enter__() + self.enterContext(warnings_helper.check_warnings()) warnings.filterwarnings("ignore", category=RuntimeWarning, message="mktemp", module=__name__) - def tearDown(self): - self._warnings_manager.__exit__(None, None, None) - def nameCheck(self, name, dir, pre, suf): (ndir, nbase) = os.path.split(name) npre = nbase[:len(pre)] @@ -198,8 +191,7 @@ def supports_iter(self): if i == 20: break - @unittest.skipUnless(hasattr(os, 'fork'), - "os.fork is required for this test") + @support.requires_fork() def test_process_awareness(self): # ensure that the random source differs between # child and parent. @@ -290,19 +282,14 @@ def our_candidate_list(): def raise_OSError(*args, **kwargs): raise OSError() - with support.swap_attr(io, "open", raise_OSError): - # test again with failing io.open() + with support.swap_attr(os, "open", raise_OSError): + # test again with failing os.open() with self.assertRaises(FileNotFoundError): tempfile._get_default_tempdir() self.assertEqual(os.listdir(our_temp_directory), []) - def bad_writer(*args, **kwargs): - fp = orig_open(*args, **kwargs) - fp.write = raise_OSError - return fp - - with support.swap_attr(io, "open", bad_writer) as orig_open: - # test again with failing write() + with support.swap_attr(os, "write", raise_OSError): + # test again with failing os.write() with self.assertRaises(FileNotFoundError): tempfile._get_default_tempdir() self.assertEqual(os.listdir(our_temp_directory), []) @@ -342,6 +329,9 @@ def _mock_candidate_names(*names): class TestBadTempdir: + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot remove write bits." + ) def test_read_only_directory(self): with _inside_empty_temp_dir(): oldmode = mode = os.stat(tempfile.tempdir).st_mode @@ -447,11 +437,12 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: self.do_create(dir=dir).write(b"blat") - self.do_create(dir=pathlib.Path(dir)).write(b"blat") + self.do_create(dir=os_helper.FakePath(dir)).write(b"blat") finally: support.gc_collect() # For PyPy or other GCs. os.rmdir(dir) + @os_helper.skip_unless_working_chmod def test_file_mode(self): # _mkstemp_inner creates files with the proper mode @@ -465,8 +456,8 @@ def test_file_mode(self): expected = user * (1 + 8 + 64) self.assertEqual(mode, expected) - @support.requires_fork() @unittest.skipUnless(has_spawnl, 'os.spawnl not available') + @support.requires_subprocess() def test_noinherit(self): # _mkstemp_inner file handles are not inherited by child processes @@ -684,7 +675,7 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: self.do_create(dir=dir) - self.do_create(dir=pathlib.Path(dir)) + self.do_create(dir=os_helper.FakePath(dir)) finally: os.rmdir(dir) @@ -785,10 +776,11 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: os.rmdir(self.do_create(dir=dir)) - os.rmdir(self.do_create(dir=pathlib.Path(dir))) + os.rmdir(self.do_create(dir=os_helper.FakePath(dir))) finally: os.rmdir(dir) + @os_helper.skip_unless_working_chmod def test_mode(self): # mkdtemp creates directories with the proper mode @@ -806,6 +798,33 @@ def test_mode(self): finally: os.rmdir(dir) + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_mode_win32(self): + # Use icacls.exe to extract the users with some level of access + # Main thing we are testing is that the BUILTIN\Users group has + # no access. The exact ACL is going to vary based on which user + # is running the test. + dir = self.do_create() + try: + out = subprocess.check_output(["icacls.exe", dir], encoding="oem").casefold() + finally: + os.rmdir(dir) + + dir = dir.casefold() + users = set() + found_user = False + for line in out.strip().splitlines(): + acl = None + # First line of result includes our directory + if line.startswith(dir): + acl = line.removeprefix(dir).strip() + elif line and line[:1].isspace(): + acl = line.strip() + if acl: + users.add(acl.partition(":")[0]) + + self.assertNotIn(r"BUILTIN\Users".casefold(), users) + def test_collision_with_existing_file(self): # mkdtemp tries another name when a file with # the chosen name already exists @@ -853,6 +872,15 @@ def test_for_tempdir_is_bytes_issue40701_api_warts(self): finally: tempfile.tempdir = orig_tempdir + def test_path_is_absolute(self): + # Test that the path returned by mkdtemp with a relative `dir` + # argument is absolute + try: + path = tempfile.mkdtemp(dir=".") + self.assertTrue(os.path.isabs(path)) + finally: + os.rmdir(path) + class TestMktemp(BaseTestCase): """Test mktemp().""" @@ -978,6 +1006,7 @@ def test_del_on_close(self): try: with tempfile.NamedTemporaryFile(dir=dir) as f: f.write(b'blat') + self.assertEqual(os.listdir(dir), []) self.assertFalse(os.path.exists(f.name), "NamedTemporaryFile %s exists after close" % f.name) finally: @@ -1017,18 +1046,101 @@ def use_closed(): pass self.assertRaises(ValueError, use_closed) - def test_no_leak_fd(self): - # Issue #21058: don't leak file descriptor when io.open() fails - closed = [] - os_close = os.close - def close(fd): - closed.append(fd) - os_close(fd) + def test_context_man_not_del_on_close_if_delete_on_close_false(self): + # Issue gh-58451: tempfile.NamedTemporaryFile is not particularly useful + # on Windows + # A NamedTemporaryFile is NOT deleted when closed if + # delete_on_close=False, but is deleted on context manager exit + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f_name = f.name + f.close() + with self.subTest(): + # Testing that file is not deleted on close + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} is incorrectly " + f"deleted on closure when delete_on_close=False") + + with self.subTest(): + # Testing that file is deleted on context manager exit + self.assertFalse(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists " + f"after context manager exit") + + finally: + os.rmdir(dir) + + def test_context_man_ok_to_delete_manually(self): + # In the case of delete=True, a NamedTemporaryFile can be manually + # deleted in a with-statement context without causing an error. + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f.close() + os.unlink(f.name) + + finally: + os.rmdir(dir) + + def test_context_man_not_del_if_delete_false(self): + # A NamedTemporaryFile is not deleted if delete = False + dir = tempfile.mkdtemp() + f_name = "" + try: + # Test that delete_on_close=True has no effect if delete=False. + with tempfile.NamedTemporaryFile(dir=dir, delete=False, + delete_on_close=True) as f: + f.write(b'blat') + f_name = f.name + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists after close") + finally: + os.unlink(f_name) + os.rmdir(dir) + + def test_del_by_finalizer(self): + # A NamedTemporaryFile is deleted when finalized in the case of + # delete=True, delete_on_close=False, and no with-statement is used. + def my_func(dir): + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + # Testing extreme case, where the file is not explicitly closed + # f.close() + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() + dir = tempfile.mkdtemp() + try: + tmp_name = my_func(dir) + self.assertFalse(os.path.exists(tmp_name), + f"NamedTemporaryFile {tmp_name!r} " + f"exists after finalizer ") + finally: + os.rmdir(dir) - with mock.patch('os.close', side_effect=close): - with mock.patch('io.open', side_effect=ValueError): - self.assertRaises(ValueError, tempfile.NamedTemporaryFile) - self.assertEqual(len(closed), 1) + def test_correct_finalizer_work_if_already_deleted(self): + # There should be no error in the case of delete=True, + # delete_on_close=False, no with-statement is used, and the file is + # deleted manually. + def my_func(dir)->str: + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + f.close() + os.unlink(tmp_name) + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() def test_bad_mode(self): dir = tempfile.mkdtemp() @@ -1039,6 +1151,24 @@ def test_bad_mode(self): tempfile.NamedTemporaryFile(mode=2, dir=dir) self.assertEqual(os.listdir(dir), []) + def test_bad_encoding(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(LookupError): + tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir) + self.assertEqual(os.listdir(dir), []) + + def test_unexpected_error(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \ + mock.patch('io.open', mock.mock_open()) as mock_open: + mock_ntf.side_effect = KeyboardInterrupt() + with self.assertRaises(KeyboardInterrupt): + tempfile.NamedTemporaryFile(dir=dir) + mock_open().close.assert_called() + self.assertEqual(os.listdir(dir), []) + # How to test the mode and bufsize parameters? class TestSpooledTemporaryFile(BaseTestCase): @@ -1059,6 +1189,31 @@ def test_basic(self): f = self.do_create(max_size=100, pre="a", suf=".txt") self.assertFalse(f._rolled) + def test_is_iobase(self): + # SpooledTemporaryFile should implement io.IOBase + self.assertIsInstance(self.do_create(), io.IOBase) + + def test_iobase_interface(self): + # SpooledTemporaryFile should implement the io.IOBase interface. + # Ensure it has all the required methods and properties. + iobase_attrs = { + # From IOBase + 'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__', + '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable', + 'readline', 'readlines', 'seekable', 'tell', 'writable', + 'writelines', + # From BufferedIOBase (binary mode) and TextIOBase (text mode) + 'detach', 'read', 'read1', 'write', 'readinto', 'readinto1', + 'encoding', 'errors', 'newlines', + } + spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile)) + missing_attrs = iobase_attrs - spooledtempfile_attrs + self.assertFalse( + missing_attrs, + 'SpooledTemporaryFile missing attributes from ' + 'IOBase/BufferedIOBase/TextIOBase' + ) + def test_del_on_close(self): # A SpooledTemporaryFile is deleted when closed dir = tempfile.mkdtemp() @@ -1069,11 +1224,40 @@ def test_del_on_close(self): self.assertTrue(f._rolled) filename = f.name f.close() - self.assertFalse(isinstance(filename, str) and os.path.exists(filename), - "SpooledTemporaryFile %s exists after close" % filename) + self.assertEqual(os.listdir(dir), []) + if not isinstance(filename, int): + self.assertFalse(os.path.exists(filename), + "SpooledTemporaryFile %s exists after close" % filename) finally: os.rmdir(dir) + def test_del_unrolled_file(self): + # The unrolled SpooledTemporaryFile should raise a ResourceWarning + # when deleted since the file was not explicitly closed. + f = self.do_create(max_size=10) + f.write(b'foo') + self.assertEqual(f.name, None) # Unrolled so no filename/fd + with self.assertWarns(ResourceWarning): + f.__del__() + + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot fstat renamed files." + ) + def test_del_rolled_file(self): + # The rolled file should be deleted when the SpooledTemporaryFile + # object is deleted. This should raise a ResourceWarning since the file + # was not explicitly closed. + f = self.do_create(max_size=2) + f.write(b'foo') + name = f.name # This is a fd on posix+cygwin, a filename everywhere else + self.assertTrue(os.path.exists(name)) + with self.assertWarns(ResourceWarning): + f.__del__() + self.assertFalse( + os.path.exists(name), + "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name + ) + def test_rewrite_small(self): # A SpooledTemporaryFile can be written to multiple within the max_size f = self.do_create(max_size=30) @@ -1104,6 +1288,34 @@ def test_writelines(self): buf = f.read() self.assertEqual(buf, b'xyz') + def test_writelines_rollover(self): + # Verify writelines rolls over before exhausting the iterator + f = self.do_create(max_size=2) + + def it(): + yield b'xy' + self.assertFalse(f._rolled) + yield b'z' + self.assertTrue(f._rolled) + + f.writelines(it()) + pos = f.seek(0) + self.assertEqual(pos, 0) + buf = f.read() + self.assertEqual(buf, b'xyz') + + def test_writelines_fast_path(self): + f = self.do_create(max_size=2) + f.write(b'abc') + self.assertTrue(f._rolled) + + f.writelines([b'd', b'e', b'f']) + pos = f.seek(0) + self.assertEqual(pos, 0) + buf = f.read() + self.assertEqual(buf, b'abcdef') + + def test_writelines_sequential(self): # A SpooledTemporaryFile should hold exactly max_size bytes, and roll # over afterward @@ -1284,6 +1496,9 @@ def use_closed(): pass self.assertRaises(ValueError, use_closed) + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot fstat renamed files." + ) def test_truncate_with_size_parameter(self): # A SpooledTemporaryFile can be truncated to zero size f = tempfile.SpooledTemporaryFile(max_size=10) @@ -1357,19 +1572,34 @@ def roundtrip(input, *args, **kwargs): roundtrip("\u039B", "w+", encoding="utf-16") roundtrip("foo\r\n", "w+", newline="") - def test_no_leak_fd(self): - # Issue #21058: don't leak file descriptor when io.open() fails - closed = [] - os_close = os.close - def close(fd): - closed.append(fd) - os_close(fd) - - with mock.patch('os.close', side_effect=close): - with mock.patch('io.open', side_effect=ValueError): - self.assertRaises(ValueError, tempfile.TemporaryFile) - self.assertEqual(len(closed), 1) + def test_bad_mode(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(ValueError): + tempfile.TemporaryFile(mode='wr', dir=dir) + with self.assertRaises(TypeError): + tempfile.TemporaryFile(mode=2, dir=dir) + self.assertEqual(os.listdir(dir), []) + + def test_bad_encoding(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(LookupError): + tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir) + self.assertEqual(os.listdir(dir), []) + def test_unexpected_error(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with mock.patch('tempfile._O_TMPFILE_WORKS', False), \ + mock.patch('os.unlink') as mock_unlink, \ + mock.patch('os.open') as mock_open, \ + mock.patch('os.close') as mock_close: + mock_unlink.side_effect = KeyboardInterrupt() + with self.assertRaises(KeyboardInterrupt): + tempfile.TemporaryFile(dir=dir) + mock_close.assert_called() + self.assertEqual(os.listdir(dir), []) # Helper for test_del_on_shutdown @@ -1437,7 +1667,7 @@ def test_explicit_cleanup(self): finally: os.rmdir(dir) - def test_explict_cleanup_ignore_errors(self): + def test_explicit_cleanup_ignore_errors(self): """Test that cleanup doesn't return an error when ignoring them.""" with tempfile.TemporaryDirectory() as working_dir: temp_dir = self.do_create( @@ -1461,6 +1691,28 @@ def test_explict_cleanup_ignore_errors(self): temp_path.exists(), f"TemporaryDirectory {temp_path!s} exists after cleanup") + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_explicit_cleanup_correct_error(self): + with tempfile.TemporaryDirectory() as working_dir: + temp_dir = self.do_create(dir=working_dir) + with open(os.path.join(temp_dir.name, "example.txt"), 'wb'): + # Previously raised NotADirectoryError on some OSes + # (e.g. Windows). See bpo-43153. + with self.assertRaises(PermissionError): + temp_dir.cleanup() + + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_cleanup_with_used_directory(self): + with tempfile.TemporaryDirectory() as working_dir: + temp_dir = self.do_create(dir=working_dir) + subdir = os.path.join(temp_dir.name, "subdir") + os.mkdir(subdir) + with os_helper.change_cwd(subdir): + # Previously raised RecursionError on some OSes + # (e.g. Windows). See bpo-35144. + with self.assertRaises(PermissionError): + temp_dir.cleanup() + @os_helper.skip_unless_symlink def test_cleanup_with_symlink_to_a_directory(self): # cleanup() should not follow symlinks to directories (issue #12464) @@ -1482,6 +1734,105 @@ def test_cleanup_with_symlink_to_a_directory(self): "were deleted") d2.cleanup() + @os_helper.skip_unless_symlink + @unittest.skip('TODO: RUSTPYTHON') + # FileNotFoundError: [Errno 2] No such file or directory: '/tmp/.../symlink' + def test_cleanup_with_symlink_modes(self): + # cleanup() should not follow symlinks when fixing mode bits (#91133) + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + for mode in range(8): + mode <<= 6 + with self.subTest(mode=format(mode, '03o')): + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chmod(symlink, mode, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chmod(symlink, mode) + except FileNotFoundError: + pass + os.chmod(d1.name, mode) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chmod(file1, mode) + old_mode = os.stat(file1).st_mode + test(file1, target_is_directory=False) + new_mode = os.stat(file1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + with self.subTest('existing dir'): + os.chmod(dir1, mode) + old_mode = os.stat(dir1).st_mode + test(dir1, target_is_directory=True) + new_mode = os.stat(dir1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') + @os_helper.skip_unless_symlink + def test_cleanup_with_symlink_flags(self): + # cleanup() should not follow symlinks when fixing flags (#91133) + flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK + self.check_flags(flags) + + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chflags(symlink, flags, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chflags(symlink, flags) + except FileNotFoundError: + pass + os.chflags(d1.name, flags) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chflags(file1, flags) + old_flags = os.stat(file1).st_flags + test(file1, target_is_directory=False) + new_flags = os.stat(file1).st_flags + self.assertEqual(new_flags, old_flags) + + with self.subTest('existing dir'): + os.chflags(dir1, flags) + old_flags = os.stat(dir1).st_flags + test(dir1, target_is_directory=True) + new_flags = os.stat(dir1).st_flags + self.assertEqual(new_flags, old_flags) + @support.cpython_only def test_del_on_collection(self): # A TemporaryDirectory is deleted when garbage collected @@ -1654,9 +2005,27 @@ def test_modes(self): d.cleanup() self.assertFalse(os.path.exists(d.name)) - @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') + def check_flags(self, flags): + # skip the test if these flags are not supported (ex: FreeBSD 13) + filename = os_helper.TESTFN + try: + open(filename, "w").close() + try: + os.chflags(filename, flags) + except OSError as exc: + # "OSError: [Errno 45] Operation not supported" + self.skipTest(f"chflags() doesn't support flags " + f"{flags:#b}: {exc}") + else: + os.chflags(filename, 0) + finally: + os_helper.unlink(filename) + + @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') def test_flags(self): flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK + self.check_flags(flags) + d = self.do_create(recurse=3, dirs=2, files=2) with d: # Change files and directories flags recursively. @@ -1667,6 +2036,11 @@ def test_flags(self): d.cleanup() self.assertFalse(os.path.exists(d.name)) + def test_delete_false(self): + with tempfile.TemporaryDirectory(delete=False) as working_dir: + pass + self.assertTrue(os.path.exists(working_dir)) + shutil.rmtree(working_dir) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_tty.py b/Lib/test/test_tty.py new file mode 100644 index 00000000000..81d628881c6 --- /dev/null +++ b/Lib/test/test_tty.py @@ -0,0 +1,98 @@ +import os +import unittest +from test.support.import_helper import import_module + +termios = import_module('termios') +tty = import_module('tty') + + +@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") +class TestTty(unittest.TestCase): + + def setUp(self): + master_fd, self.fd = os.openpty() + self.addCleanup(os.close, master_fd) + self.stream = self.enterContext(open(self.fd, 'wb', buffering=0)) + self.fd = self.stream.fileno() + self.mode = termios.tcgetattr(self.fd) + self.addCleanup(termios.tcsetattr, self.fd, termios.TCSANOW, self.mode) + self.addCleanup(termios.tcsetattr, self.fd, termios.TCSAFLUSH, self.mode) + + def check_cbreak(self, mode): + self.assertEqual(mode[3] & termios.ECHO, 0) + self.assertEqual(mode[3] & termios.ICANON, 0) + self.assertEqual(mode[6][termios.VMIN], 1) + self.assertEqual(mode[6][termios.VTIME], 0) + + def check_raw(self, mode): + self.check_cbreak(mode) + self.assertEqual(mode[0] & termios.ISTRIP, 0) + self.assertEqual(mode[0] & termios.ICRNL, 0) + self.assertEqual(mode[1] & termios.OPOST, 0) + self.assertEqual(mode[2] & termios.PARENB, termios.CS8 & termios.PARENB) + self.assertEqual(mode[2] & termios.CSIZE, termios.CS8 & termios.CSIZE) + self.assertEqual(mode[2] & termios.CS8, termios.CS8) + self.assertEqual(mode[3] & termios.ECHO, 0) + self.assertEqual(mode[3] & termios.ICANON, 0) + self.assertEqual(mode[3] & termios.ISIG, 0) + self.assertEqual(mode[6][termios.VMIN], 1) + self.assertEqual(mode[6][termios.VTIME], 0) + + def test_cfmakeraw(self): + mode = termios.tcgetattr(self.fd) + self.assertEqual(mode, self.mode) + tty.cfmakeraw(mode) + self.check_raw(mode) + self.assertEqual(mode[4], self.mode[4]) + self.assertEqual(mode[5], self.mode[5]) + + def test_cfmakecbreak(self): + mode = termios.tcgetattr(self.fd) + self.assertEqual(mode, self.mode) + tty.cfmakecbreak(mode) + self.check_cbreak(mode) + self.assertEqual(mode[1], self.mode[1]) + self.assertEqual(mode[2], self.mode[2]) + self.assertEqual(mode[4], self.mode[4]) + self.assertEqual(mode[5], self.mode[5]) + mode[tty.IFLAG] |= termios.ICRNL + tty.cfmakecbreak(mode) + self.assertEqual(mode[tty.IFLAG] & termios.ICRNL, termios.ICRNL, + msg="ICRNL should not be cleared by cbreak") + mode[tty.IFLAG] &= ~termios.ICRNL + tty.cfmakecbreak(mode) + self.assertEqual(mode[tty.IFLAG] & termios.ICRNL, 0, + msg="ICRNL should not be set by cbreak") + + @unittest.skip('TODO: RUSTPYTHON') + # TypeError: Expected type 'int' but 'FileIO' found. + def test_setraw(self): + mode0 = termios.tcgetattr(self.fd) + mode1 = tty.setraw(self.fd) + self.assertEqual(mode1, mode0) + mode2 = termios.tcgetattr(self.fd) + self.check_raw(mode2) + mode3 = tty.setraw(self.fd, termios.TCSANOW) + self.assertEqual(mode3, mode2) + tty.setraw(self.stream) + tty.setraw(fd=self.fd, when=termios.TCSANOW) + + @unittest.skip('TODO: RUSTPYTHON') + # TypeError: Expected type 'int' but 'FileIO' found. + def test_setcbreak(self): + mode0 = termios.tcgetattr(self.fd) + mode1 = tty.setcbreak(self.fd) + self.assertEqual(mode1, mode0) + mode2 = termios.tcgetattr(self.fd) + self.check_cbreak(mode2) + ICRNL = termios.ICRNL + self.assertEqual(mode2[tty.IFLAG] & ICRNL, mode0[tty.IFLAG] & ICRNL, + msg="ICRNL should not be altered by cbreak") + mode3 = tty.setcbreak(self.fd, termios.TCSANOW) + self.assertEqual(mode3, mode2) + tty.setcbreak(self.stream) + tty.setcbreak(fd=self.fd, when=termios.TCSANOW) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_unittest/test_discovery.py b/Lib/test/test_unittest/test_discovery.py index 9e231ff8d2d..a44b18406c0 100644 --- a/Lib/test/test_unittest/test_discovery.py +++ b/Lib/test/test_unittest/test_discovery.py @@ -364,8 +364,6 @@ def __eq__(self, other): [(loader, [], 'test*.py'), (loader, [], 'test*.py')]) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_discover(self): loader = unittest.TestLoader() @@ -412,8 +410,6 @@ def _find_tests(start_dir, pattern): self.assertEqual(_find_tests_args, [(start_dir, 'pattern')]) self.assertIn(top_level_dir, sys.path) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_discover_should_not_persist_top_level_dir_between_calls(self): original_isfile = os.path.isfile original_isdir = os.path.isdir diff --git a/Lib/test/test_unittest/test_loader.py b/Lib/test/test_unittest/test_loader.py index 9881335318e..83dd25ca546 100644 --- a/Lib/test/test_unittest/test_loader.py +++ b/Lib/test/test_unittest/test_loader.py @@ -90,8 +90,6 @@ def test_loadTestsFromTestCase__from_TestCase(self): self.assertIsInstance(suite, loader.suiteClass) self.assertEqual(list(suite), []) - # TODO: RUSTPYTHON - @unittest.expectedFailure # "Do not load any tests from `FunctionTestCase` class." def test_loadTestsFromTestCase__from_FunctionTestCase(self): loader = unittest.TestLoader() @@ -121,8 +119,6 @@ def test(self): expected = [loader.suiteClass([MyTestCase('test')])] self.assertEqual(list(suite), expected) - # TODO: RUSTPYTHON - @unittest.expectedFailure # "This test ensures that internal `TestCase` subclasses are not loaded" def test_loadTestsFromModule__TestCase_subclass_internals(self): # See https://github.com/python/cpython/issues/84867 @@ -187,8 +183,6 @@ class NotAModule(object): self.assertEqual(list(suite), reference) - # TODO: RUSTPYTHON - @unittest.expectedFailure # Check that loadTestsFromModule honors a module # with a load_tests function. def test_loadTestsFromModule__load_tests(self): diff --git a/Lib/test/test_unittest/test_program.py b/Lib/test/test_unittest/test_program.py index 8256aaeb8c3..ec1dbeb2df7 100644 --- a/Lib/test/test_unittest/test_program.py +++ b/Lib/test/test_unittest/test_program.py @@ -75,6 +75,14 @@ def testUnexpectedSuccess(self): class Empty(unittest.TestCase): pass + class SetUpClassFailure(unittest.TestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + raise Exception + def testPass(self): + pass + class TestLoader(unittest.TestLoader): """Test loader that returns a suite containing the supplied testcase.""" @@ -191,6 +199,18 @@ def test_ExitEmptySuite(self): out = stream.getvalue() self.assertIn('\nNO TESTS RAN\n', out) + def test_ExitSetUpClassFailureSuite(self): + stream = BufferedWriter() + with self.assertRaises(SystemExit) as cm: + unittest.main( + argv=["setup_class_failure"], + testRunner=unittest.TextTestRunner(stream=stream), + testLoader=self.TestLoader(self.SetUpClassFailure)) + self.assertEqual(cm.exception.code, 1) + out = stream.getvalue() + self.assertIn("ERROR: setUpClass", out) + self.assertIn("SetUpClassFailure", out) + class InitialisableProgram(unittest.TestProgram): exit = False diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index b89e181f9f8..b047f7b06f8 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -1,6 +1,6 @@ from unittest import mock from test import support -from test.support import warnings_helper +from test.support import socket_helper from test.test_httpservers import NoLogRequestHandler from unittest import TestCase from wsgiref.util import setup_testing_defaults @@ -80,41 +80,26 @@ def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): return out.getvalue(), err.getvalue() -def compare_generic_iter(make_it,match): - """Utility to compare a generic 2.1/2.2+ iterator with an iterable - If running under Python 2.2+, this tests the iterator using iter()/next(), - as well as __getitem__. 'make_it' must be a function returning a fresh +def compare_generic_iter(make_it, match): + """Utility to compare a generic iterator with an iterable + + This tests the iterator using iter()/next(). + 'make_it' must be a function returning a fresh iterator to be tested (since this may test the iterator twice).""" it = make_it() - n = 0 + if not iter(it) is it: + raise AssertionError for item in match: - if not it[n]==item: raise AssertionError - n+=1 + if not next(it) == item: + raise AssertionError try: - it[n] - except IndexError: + next(it) + except StopIteration: pass else: - raise AssertionError("Too many items from __getitem__",it) - - try: - iter, StopIteration - except NameError: - pass - else: - # Only test iter mode under 2.2+ - it = make_it() - if not iter(it) is it: raise AssertionError - for item in match: - if not next(it) == item: raise AssertionError - try: - next(it) - except StopIteration: - pass - else: - raise AssertionError("Too many items from .__next__()", it) + raise AssertionError("Too many items from .__next__()", it) class IntegrationTests(TestCase): @@ -152,7 +137,7 @@ def test_environ(self): def test_request_length(self): out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n") self.assertEqual(out.splitlines()[0], - b"HTTP/1.0 414 Request-URI Too Long") + b"HTTP/1.0 414 URI Too Long") def test_validated_hello(self): out, err = run_amock(validator(hello_app)) @@ -264,7 +249,7 @@ def app(environ, start_response): class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler): pass - server = make_server(support.HOST, 0, app, handler_class=WsgiHandler) + server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler) self.addCleanup(server.server_close) interrupted = threading.Event() @@ -339,7 +324,6 @@ def checkReqURI(self,uri,query=1,**kw): util.setup_testing_defaults(kw) self.assertEqual(util.request_uri(kw,query),uri) - @warnings_helper.ignore_warnings(category=DeprecationWarning) def checkFW(self,text,size,match): def make_it(text=text,size=size): @@ -358,15 +342,6 @@ def make_it(text=text,size=size): it.close() self.assertTrue(it.filelike.closed) - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_filewrapper_getitem_deprecation(self): - wrapper = util.FileWrapper(StringIO('foobar'), 3) - with self.assertWarnsRegex(DeprecationWarning, - r'Use iterator protocol instead'): - # This should have returned 'bar'. - self.assertEqual(wrapper[1], 'foo') - def testSimpleShifts(self): self.checkShift('','/', '', '/', '') self.checkShift('','/x', 'x', '/x', '') @@ -581,7 +556,7 @@ def testEnviron(self): # Test handler.environ as a dict expected = {} setup_testing_defaults(expected) - # Handler inherits os_environ variables which are not overriden + # Handler inherits os_environ variables which are not overridden # by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env) for key, value in os_environ.items(): if key not in expected: @@ -821,8 +796,6 @@ def flush(self): b"Hello, world!", written) - # TODO: RUSTPYTHON - @unittest.expectedFailure def testClientConnectionTerminations(self): environ = {"SERVER_PROTOCOL": "HTTP/1.0"} for exception in ( @@ -841,8 +814,6 @@ def write(self, b): self.assertFalse(stderr.getvalue()) - # TODO: RUSTPYTHON - @unittest.expectedFailure def testDontResetInternalStateOnException(self): class CustomException(ValueError): pass diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index cf3f535b190..6b2e6c44ca4 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -1042,55 +1042,46 @@ def test_path2(self): self.assertEqual(p.add(6,8), 6+8) self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path3(self): p = xmlrpclib.ServerProxy(URL+"/is/broken") self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_invalid_path(self): p = xmlrpclib.ServerProxy(URL+"/invalid") self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_query_fragment(self): p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag") self.assertEqual(p.test(), "/foo?k=v#frag") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_fragment(self): p = xmlrpclib.ServerProxy(URL+"/foo#frag") self.assertEqual(p.test(), "/foo#frag") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_query(self): p = xmlrpclib.ServerProxy(URL+"/foo?k=v") self.assertEqual(p.test(), "/foo?k=v") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path(self): p = xmlrpclib.ServerProxy(URL) self.assertEqual(p.test(), "/RPC2") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_root_path(self): p = xmlrpclib.ServerProxy(URL + "/") self.assertEqual(p.test(), "/") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path_query(self): p = xmlrpclib.ServerProxy(URL + "?k=v") self.assertEqual(p.test(), "?k=v") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path_fragment(self): p = xmlrpclib.ServerProxy(URL + "#frag") @@ -1142,7 +1133,6 @@ def test_two(self): #test special attribute access on the serverproxy, through the __call__ #function. -@unittest.skip("TODO: RUSTPYTHON, appears to hang") class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): #ask for two keepalive requests to be handled. request_count=2 diff --git a/Lib/token.py b/Lib/token.py index 493bf042650..54d7cdccadc 100644 --- a/Lib/token.py +++ b/Lib/token.py @@ -1,7 +1,8 @@ """Token constants.""" -# Auto-generated by Tools/scripts/generate_token.py +# Auto-generated by Tools/build/generate_token.py -__all__ = ['tok_name', 'ISTERMINAL', 'ISNONTERMINAL', 'ISEOF'] +__all__ = ['tok_name', 'ISTERMINAL', 'ISNONTERMINAL', 'ISEOF', + 'EXACT_TOKEN_TYPES'] ENDMARKER = 0 NAME = 1 @@ -57,17 +58,20 @@ RARROW = 51 ELLIPSIS = 52 COLONEQUAL = 53 -OP = 54 -AWAIT = 55 -ASYNC = 56 -TYPE_IGNORE = 57 -TYPE_COMMENT = 58 +EXCLAMATION = 54 +OP = 55 +TYPE_IGNORE = 56 +TYPE_COMMENT = 57 +SOFT_KEYWORD = 58 +FSTRING_START = 59 +FSTRING_MIDDLE = 60 +FSTRING_END = 61 +COMMENT = 62 +NL = 63 # These aren't used by the C tokenizer but are needed for tokenize.py -ERRORTOKEN = 59 -COMMENT = 60 -NL = 61 -ENCODING = 62 -N_TOKENS = 63 +ERRORTOKEN = 64 +ENCODING = 65 +N_TOKENS = 66 # Special definitions for cooperation with parser NT_OFFSET = 256 @@ -77,6 +81,7 @@ __all__.extend(tok_name.values()) EXACT_TOKEN_TYPES = { + '!': EXCLAMATION, '!=': NOTEQUAL, '%': PERCENT, '%=': PERCENTEQUAL, diff --git a/Lib/tty.py b/Lib/tty.py index a72eb675545..5a49e040042 100644 --- a/Lib/tty.py +++ b/Lib/tty.py @@ -4,9 +4,9 @@ from termios import * -__all__ = ["setraw", "setcbreak"] +__all__ = ["cfmakeraw", "cfmakecbreak", "setraw", "setcbreak"] -# Indexes for termios list. +# Indices for termios list. IFLAG = 0 OFLAG = 1 CFLAG = 2 @@ -15,22 +15,59 @@ OSPEED = 5 CC = 6 -def setraw(fd, when=TCSAFLUSH): - """Put terminal into a raw mode.""" - mode = tcgetattr(fd) - mode[IFLAG] = mode[IFLAG] & ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON) - mode[OFLAG] = mode[OFLAG] & ~(OPOST) - mode[CFLAG] = mode[CFLAG] & ~(CSIZE | PARENB) - mode[CFLAG] = mode[CFLAG] | CS8 - mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON | IEXTEN | ISIG) +def cfmakeraw(mode): + """Make termios mode raw.""" + # Clear all POSIX.1-2017 input mode flags. + # See chapter 11 "General Terminal Interface" + # of POSIX.1-2017 Base Definitions. + mode[IFLAG] &= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP | + INLCR | IGNCR | ICRNL | IXON | IXANY | IXOFF) + + # Do not post-process output. + mode[OFLAG] &= ~OPOST + + # Disable parity generation and detection; clear character size mask; + # let character size be 8 bits. + mode[CFLAG] &= ~(PARENB | CSIZE) + mode[CFLAG] |= CS8 + + # Clear all POSIX.1-2017 local mode flags. + mode[LFLAG] &= ~(ECHO | ECHOE | ECHOK | ECHONL | ICANON | + IEXTEN | ISIG | NOFLSH | TOSTOP) + + # POSIX.1-2017, 11.1.7 Non-Canonical Mode Input Processing, + # Case B: MIN>0, TIME=0 + # A pending read shall block until MIN (here 1) bytes are received, + # or a signal is received. + mode[CC] = list(mode[CC]) mode[CC][VMIN] = 1 mode[CC][VTIME] = 0 - tcsetattr(fd, when, mode) -def setcbreak(fd, when=TCSAFLUSH): - """Put terminal into a cbreak mode.""" - mode = tcgetattr(fd) - mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON) +def cfmakecbreak(mode): + """Make termios mode cbreak.""" + # Do not echo characters; disable canonical input. + mode[LFLAG] &= ~(ECHO | ICANON) + + # POSIX.1-2017, 11.1.7 Non-Canonical Mode Input Processing, + # Case B: MIN>0, TIME=0 + # A pending read shall block until MIN (here 1) bytes are received, + # or a signal is received. + mode[CC] = list(mode[CC]) mode[CC][VMIN] = 1 mode[CC][VTIME] = 0 - tcsetattr(fd, when, mode) + +def setraw(fd, when=TCSAFLUSH): + """Put terminal into raw mode.""" + mode = tcgetattr(fd) + new = list(mode) + cfmakeraw(new) + tcsetattr(fd, when, new) + return mode + +def setcbreak(fd, when=TCSAFLUSH): + """Put terminal into cbreak mode.""" + mode = tcgetattr(fd) + new = list(mode) + cfmakecbreak(new) + tcsetattr(fd, when, new) + return mode diff --git a/Lib/unittest/__init__.py b/Lib/unittest/__init__.py index 7ab3f5e87b4..db3beee2401 100644 --- a/Lib/unittest/__init__.py +++ b/Lib/unittest/__init__.py @@ -51,10 +51,6 @@ def testMultiply(self): 'registerResult', 'removeResult', 'removeHandler', 'addModuleCleanup', 'doModuleCleanups', 'enterModuleContext'] -# Expose obsolete functions for backwards compatibility -# bpo-5846: Deprecated in Python 3.11, scheduled for removal in Python 3.13. -__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases']) - __unittest = True from .result import TestResult @@ -67,20 +63,6 @@ def testMultiply(self): from .runner import TextTestRunner, TextTestResult from .signals import installHandler, registerResult, removeResult, removeHandler # IsolatedAsyncioTestCase will be imported lazily. -from .loader import makeSuite, getTestCaseNames, findTestCases - -# deprecated -_TextTestResult = TextTestResult - - -# There are no tests here, so don't try to run anything discovered from -# introspecting the symbols (e.g. FunctionTestCase). Instead, all our -# tests come from within unittest.test. -def load_tests(loader, tests, pattern): - import os.path - # top level directory cached on loader instance - this_dir = os.path.dirname(__file__) - return loader.discover(start_dir=this_dir, pattern=pattern) # Lazy import of IsolatedAsyncioTestCase from .async_case diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py index bd2a4711560..e761ba7e53c 100644 --- a/Lib/unittest/async_case.py +++ b/Lib/unittest/async_case.py @@ -5,6 +5,7 @@ from .case import TestCase +__unittest = True class IsolatedAsyncioTestCase(TestCase): # Names intentionally have a long prefix @@ -25,12 +26,15 @@ class IsolatedAsyncioTestCase(TestCase): # them inside the same task. # Note: the test case modifies event loop policy if the policy was not instantiated - # yet. + # yet, unless loop_factory=asyncio.EventLoop is set. # asyncio.get_event_loop_policy() creates a default policy on demand but never # returns None # I believe this is not an issue in user level tests but python itself for testing # should reset a policy in every test module # by calling asyncio.set_event_loop_policy(None) in tearDownModule() + # or set loop_factory=asyncio.EventLoop + + loop_factory = None def __init__(self, methodName='runTest'): super().__init__(methodName) @@ -118,7 +122,7 @@ def _callMaybeAsync(self, func, /, *args, **kwargs): def _setupAsyncioRunner(self): assert self._asyncioRunner is None, 'asyncio runner is already initialized' - runner = asyncio.Runner(debug=True) + runner = asyncio.Runner(debug=True, loop_factory=self.loop_factory) self._asyncioRunner = runner def _tearDownAsyncioRunner(self): diff --git a/Lib/unittest/loader.py b/Lib/unittest/loader.py index 7e6ce2f224b..22797b83a68 100644 --- a/Lib/unittest/loader.py +++ b/Lib/unittest/loader.py @@ -6,7 +6,6 @@ import traceback import types import functools -import warnings from fnmatch import fnmatch, fnmatchcase @@ -57,9 +56,7 @@ def testSkipped(self): TestClass = type("ModuleSkipped", (case.TestCase,), attrs) return suiteClass((TestClass(methodname),)) -def _jython_aware_splitext(path): - if path.lower().endswith('$py.class'): - return path[:-9] +def _splitext(path): return os.path.splitext(path)[0] @@ -87,40 +84,26 @@ def loadTestsFromTestCase(self, testCaseClass): raise TypeError("Test cases should not be derived from " "TestSuite. Maybe you meant to derive from " "TestCase?") - testCaseNames = self.getTestCaseNames(testCaseClass) - if not testCaseNames and hasattr(testCaseClass, 'runTest'): - testCaseNames = ['runTest'] + if testCaseClass in (case.TestCase, case.FunctionTestCase): + # We don't load any tests from base types that should not be loaded. + testCaseNames = [] + else: + testCaseNames = self.getTestCaseNames(testCaseClass) + if not testCaseNames and hasattr(testCaseClass, 'runTest'): + testCaseNames = ['runTest'] loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames)) return loaded_suite - # XXX After Python 3.5, remove backward compatibility hacks for - # use_load_tests deprecation via *args and **kws. See issue 16662. - def loadTestsFromModule(self, module, *args, pattern=None, **kws): + def loadTestsFromModule(self, module, *, pattern=None): """Return a suite of all test cases contained in the given module""" - # This method used to take an undocumented and unofficial - # use_load_tests argument. For backward compatibility, we still - # accept the argument (which can also be the first position) but we - # ignore it and issue a deprecation warning if it's present. - if len(args) > 0 or 'use_load_tests' in kws: - warnings.warn('use_load_tests is deprecated and ignored', - DeprecationWarning) - kws.pop('use_load_tests', None) - if len(args) > 1: - # Complain about the number of arguments, but don't forget the - # required `module` argument. - complaint = len(args) + 1 - raise TypeError('loadTestsFromModule() takes 1 positional argument but {} were given'.format(complaint)) - if len(kws) != 0: - # Since the keyword arguments are unsorted (see PEP 468), just - # pick the alphabetically sorted first argument to complain about, - # if multiple were given. At least the error message will be - # predictable. - complaint = sorted(kws)[0] - raise TypeError("loadTestsFromModule() got an unexpected keyword argument '{}'".format(complaint)) tests = [] for name in dir(module): obj = getattr(module, name) - if isinstance(obj, type) and issubclass(obj, case.TestCase): + if ( + isinstance(obj, type) + and issubclass(obj, case.TestCase) + and obj not in (case.TestCase, case.FunctionTestCase) + ): tests.append(self.loadTestsFromTestCase(obj)) load_tests = getattr(module, 'load_tests', None) @@ -189,7 +172,11 @@ def loadTestsFromName(self, name, module=None): if isinstance(obj, types.ModuleType): return self.loadTestsFromModule(obj) - elif isinstance(obj, type) and issubclass(obj, case.TestCase): + elif ( + isinstance(obj, type) + and issubclass(obj, case.TestCase) + and obj not in (case.TestCase, case.FunctionTestCase) + ): return self.loadTestsFromTestCase(obj) elif (isinstance(obj, types.FunctionType) and isinstance(parent, type) and @@ -267,6 +254,7 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): Paths are sorted before being imported to ensure reproducible execution order even on filesystems with non-alphabetical ordering like ext3/4. """ + original_top_level_dir = self._top_level_dir set_implicit_top = False if top_level_dir is None and self._top_level_dir is not None: # make top_level_dir optional if called from load_tests in a package @@ -320,6 +308,7 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): raise ImportError('Start directory is not importable: %r' % start_dir) tests = list(self._find_tests(start_dir, pattern)) + self._top_level_dir = original_top_level_dir return self.suiteClass(tests) def _get_directory_containing_module(self, module_name): @@ -337,7 +326,7 @@ def _get_directory_containing_module(self, module_name): def _get_name_from_path(self, path): if path == self._top_level_dir: return '.' - path = _jython_aware_splitext(os.path.normpath(path)) + path = _splitext(os.path.normpath(path)) _relpath = os.path.relpath(path, self._top_level_dir) assert not os.path.isabs(_relpath), "Path must be within the project" @@ -415,13 +404,13 @@ def _find_test_path(self, full_path, pattern): else: mod_file = os.path.abspath( getattr(module, '__file__', full_path)) - realpath = _jython_aware_splitext( + realpath = _splitext( os.path.realpath(mod_file)) - fullpath_noext = _jython_aware_splitext( + fullpath_noext = _splitext( os.path.realpath(full_path)) if realpath.lower() != fullpath_noext.lower(): module_dir = os.path.dirname(realpath) - mod_name = _jython_aware_splitext( + mod_name = _splitext( os.path.basename(full_path)) expected_dir = os.path.dirname(full_path) msg = ("%r module incorrectly imported from %r. Expected " @@ -462,47 +451,3 @@ def _find_test_path(self, full_path, pattern): defaultTestLoader = TestLoader() - - -# These functions are considered obsolete for long time. -# They will be removed in Python 3.13. - -def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None): - loader = TestLoader() - loader.sortTestMethodsUsing = sortUsing - loader.testMethodPrefix = prefix - loader.testNamePatterns = testNamePatterns - if suiteClass: - loader.suiteClass = suiteClass - return loader - -def getTestCaseNames(testCaseClass, prefix, sortUsing=util.three_way_cmp, testNamePatterns=None): - import warnings - warnings.warn( - "unittest.getTestCaseNames() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.getTestCaseNames() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, testNamePatterns=testNamePatterns).getTestCaseNames(testCaseClass) - -def makeSuite(testCaseClass, prefix='test', sortUsing=util.three_way_cmp, - suiteClass=suite.TestSuite): - import warnings - warnings.warn( - "unittest.makeSuite() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.loadTestsFromTestCase() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase( - testCaseClass) - -def findTestCases(module, prefix='test', sortUsing=util.three_way_cmp, - suiteClass=suite.TestSuite): - import warnings - warnings.warn( - "unittest.findTestCases() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.loadTestsFromModule() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(\ - module) diff --git a/Lib/unittest/main.py b/Lib/unittest/main.py index c3869de3f6f..a0cd8a9f7ea 100644 --- a/Lib/unittest/main.py +++ b/Lib/unittest/main.py @@ -269,12 +269,12 @@ def runTests(self): testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: - if self.result.testsRun == 0 and len(self.result.skipped) == 0: + if not self.result.wasSuccessful(): + sys.exit(1) + elif self.result.testsRun == 0 and len(self.result.skipped) == 0: sys.exit(_NO_TESTS_EXITCODE) - elif self.result.wasSuccessful(): - sys.exit(0) else: - sys.exit(1) + sys.exit(0) main = TestProgram diff --git a/Lib/weakref.py b/Lib/weakref.py index 994ea8aa37d..25b70927e29 100644 --- a/Lib/weakref.py +++ b/Lib/weakref.py @@ -2,7 +2,7 @@ This module is an implementation of PEP 205: -https://www.python.org/dev/peps/pep-0205/ +https://peps.python.org/pep-0205/ """ # Naming convention: Variables named "wr" are weak reference objects; @@ -33,7 +33,6 @@ "WeakSet", "WeakMethod", "finalize"] -_collections_abc.Set.register(WeakSet) _collections_abc.MutableSet.register(WeakSet) class WeakMethod(ref): diff --git a/Lib/wsgiref/__init__.py b/Lib/wsgiref/__init__.py index 1efbba01a30..59ee48fddec 100644 --- a/Lib/wsgiref/__init__.py +++ b/Lib/wsgiref/__init__.py @@ -13,6 +13,8 @@ * validate -- validation wrapper that sits between an app and a server to detect errors in either +* types -- collection of WSGI-related types for static type checking + To-Do: * cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard) diff --git a/Lib/wsgiref/handlers.py b/Lib/wsgiref/handlers.py index f4300b831a4..cafe872c7aa 100644 --- a/Lib/wsgiref/handlers.py +++ b/Lib/wsgiref/handlers.py @@ -136,6 +136,10 @@ def run(self, application): self.setup_environ() self.result = application(self.environ, self.start_response) self.finish_response() + except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError): + # We expect the client to close the connection abruptly from time + # to time. + return except: try: self.handle_error() @@ -179,7 +183,16 @@ def finish_response(self): for data in self.result: self.write(data) self.finish_content() - finally: + except: + # Call close() on the iterable returned by the WSGI application + # in case of an exception. + if hasattr(self.result, 'close'): + self.result.close() + raise + else: + # We only call close() when no exception is raised, because it + # will set status, result, headers, and environ fields to None. + # See bpo-29183 for more details. self.close() @@ -215,8 +228,7 @@ def start_response(self, status, headers,exc_info=None): if exc_info: try: if self.headers_sent: - # Re-raise original exception if headers sent - raise exc_info[0](exc_info[1]).with_traceback(exc_info[2]) + raise finally: exc_info = None # avoid dangling circular ref elif self.headers is not None: @@ -225,18 +237,25 @@ def start_response(self, status, headers,exc_info=None): self.status = status self.headers = self.headers_class(headers) status = self._convert_string_type(status, "Status") - assert len(status)>=4,"Status must be at least 4 characters" - assert status[:3].isdigit(), "Status message must begin w/3-digit code" - assert status[3]==" ", "Status message must have a space after code" + self._validate_status(status) if __debug__: for name, val in headers: name = self._convert_string_type(name, "Header name") val = self._convert_string_type(val, "Header value") - assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed" + assert not is_hop_by_hop(name),\ + f"Hop-by-hop header, '{name}: {val}', not allowed" return self.write + def _validate_status(self, status): + if len(status) < 4: + raise AssertionError("Status must be at least 4 characters") + if not status[:3].isdigit(): + raise AssertionError("Status message must begin w/3-digit code") + if status[3] != " ": + raise AssertionError("Status message must have a space after code") + def _convert_string_type(self, value, title): """Convert/check value type.""" if type(value) is str: @@ -456,10 +475,7 @@ def _write(self,data): from warnings import warn warn("SimpleHandler.stdout.write() should not do partial writes", DeprecationWarning) - while True: - data = data[result:] - if not data: - break + while data := data[result:]: result = self.stdout.write(data) def _flush(self): diff --git a/Lib/wsgiref/simple_server.py b/Lib/wsgiref/simple_server.py index f71563a5ae0..a0f2397fcf0 100644 --- a/Lib/wsgiref/simple_server.py +++ b/Lib/wsgiref/simple_server.py @@ -84,10 +84,6 @@ def get_environ(self): env['PATH_INFO'] = urllib.parse.unquote(path, 'iso-8859-1') env['QUERY_STRING'] = query - - host = self.address_string() - if host != self.client_address[0]: - env['REMOTE_HOST'] = host env['REMOTE_ADDR'] = self.client_address[0] if self.headers.get('content-type') is None: @@ -127,7 +123,8 @@ def handle(self): return handler = ServerHandler( - self.rfile, self.wfile, self.get_stderr(), self.get_environ() + self.rfile, self.wfile, self.get_stderr(), self.get_environ(), + multithread=False, ) handler.request_handler = self # backpointer for logging handler.run(self.server.get_app()) diff --git a/Lib/wsgiref/types.py b/Lib/wsgiref/types.py new file mode 100644 index 00000000000..ef0aead5b28 --- /dev/null +++ b/Lib/wsgiref/types.py @@ -0,0 +1,54 @@ +"""WSGI-related types for static type checking""" + +from collections.abc import Callable, Iterable, Iterator +from types import TracebackType +from typing import Any, Protocol, TypeAlias + +__all__ = [ + "StartResponse", + "WSGIEnvironment", + "WSGIApplication", + "InputStream", + "ErrorStream", + "FileWrapper", +] + +_ExcInfo: TypeAlias = tuple[type[BaseException], BaseException, TracebackType] +_OptExcInfo: TypeAlias = _ExcInfo | tuple[None, None, None] + +class StartResponse(Protocol): + """start_response() callable as defined in PEP 3333""" + def __call__( + self, + status: str, + headers: list[tuple[str, str]], + exc_info: _OptExcInfo | None = ..., + /, + ) -> Callable[[bytes], object]: ... + +WSGIEnvironment: TypeAlias = dict[str, Any] +WSGIApplication: TypeAlias = Callable[[WSGIEnvironment, StartResponse], + Iterable[bytes]] + +class InputStream(Protocol): + """WSGI input stream as defined in PEP 3333""" + def read(self, size: int = ..., /) -> bytes: ... + def readline(self, size: int = ..., /) -> bytes: ... + def readlines(self, hint: int = ..., /) -> list[bytes]: ... + def __iter__(self) -> Iterator[bytes]: ... + +class ErrorStream(Protocol): + """WSGI error stream as defined in PEP 3333""" + def flush(self) -> object: ... + def write(self, s: str, /) -> object: ... + def writelines(self, seq: list[str], /) -> object: ... + +class _Readable(Protocol): + def read(self, size: int = ..., /) -> bytes: ... + # Optional: def close(self) -> object: ... + +class FileWrapper(Protocol): + """WSGI file wrapper as defined in PEP 3333""" + def __call__( + self, file: _Readable, block_size: int = ..., /, + ) -> Iterable[bytes]: ... diff --git a/Lib/wsgiref/util.py b/Lib/wsgiref/util.py index 516fe898d01..63b92331737 100644 --- a/Lib/wsgiref/util.py +++ b/Lib/wsgiref/util.py @@ -4,7 +4,7 @@ __all__ = [ 'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri', - 'shift_path_info', 'setup_testing_defaults', + 'shift_path_info', 'setup_testing_defaults', 'is_hop_by_hop', ] @@ -17,12 +17,6 @@ def __init__(self, filelike, blksize=8192): if hasattr(filelike,'close'): self.close = filelike.close - def __getitem__(self,key): - data = self.filelike.read(self.blksize) - if data: - return data - raise IndexError - def __iter__(self): return self @@ -155,9 +149,9 @@ def setup_testing_defaults(environ): _hoppish = { - 'connection':1, 'keep-alive':1, 'proxy-authenticate':1, - 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1, - 'upgrade':1 + 'connection', 'keep-alive', 'proxy-authenticate', + 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', + 'upgrade' }.__contains__ def is_hop_by_hop(header_name): diff --git a/Lib/wsgiref/validate.py b/Lib/wsgiref/validate.py index 6107dcd7a4d..1a1853cd63a 100644 --- a/Lib/wsgiref/validate.py +++ b/Lib/wsgiref/validate.py @@ -1,6 +1,6 @@ # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) -# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php +# Licensed under the MIT license: https://opensource.org/licenses/mit-license.php +# Also licenced under the Apache License, 2.0: https://opensource.org/licenses/apache2.0.php # Licensed to PSF under a Contributor Agreement """ Middleware to check for obedience to the WSGI specification. @@ -77,7 +77,7 @@ * That wsgi.input is used properly: - - .read() is called with zero or one argument + - .read() is called with exactly one argument - That it returns a string @@ -137,7 +137,7 @@ def validator(application): """ When applied between a WSGI server and a WSGI application, this - middleware will check for WSGI compliancy on a number of levels. + middleware will check for WSGI compliance on a number of levels. This middleware does not modify the request or response in any way, but will raise an AssertionError if anything seems off (except for a failure to close the application iterator, which @@ -214,10 +214,7 @@ def readlines(self, *args): return lines def __iter__(self): - while 1: - line = self.readline() - if not line: - return + while line := self.readline(): yield line def close(self): @@ -390,7 +387,6 @@ def check_headers(headers): assert_(type(headers) is list, "Headers (%r) must be of type list: %r" % (headers, type(headers))) - header_names = {} for item in headers: assert_(type(item) is tuple, "Individual headers (%r) must be of type tuple: %r" @@ -403,7 +399,6 @@ def check_headers(headers): "The Status header cannot be used; it conflicts with CGI " "script, and HTTP status is not given through headers " "(value: %r)." % value) - header_names[name.lower()] = None assert_('\n' not in name and ':' not in name, "Header names may not contain ':' or '\\n': %r" % name) assert_(header_re.search(name), "Bad header name: %r" % name) diff --git a/Lib/xmlrpc/client.py b/Lib/xmlrpc/client.py index a614cef6ab2..f441376d09c 100644 --- a/Lib/xmlrpc/client.py +++ b/Lib/xmlrpc/client.py @@ -245,41 +245,15 @@ def __repr__(self): ## # Backwards compatibility - boolean = Boolean = bool -## -# Wrapper for XML-RPC DateTime values. This converts a time value to -# the format used by XML-RPC. -#
-# The value can be given as a datetime object, as a string in the -# format "yyyymmddThh:mm:ss", as a 9-item time tuple (as returned by -# time.localtime()), or an integer value (as returned by time.time()). -# The wrapper uses time.localtime() to convert an integer to a time -# tuple. -# -# @param value The time, given as a datetime object, an ISO 8601 string, -# a time tuple, or an integer time value. - -# Issue #13305: different format codes across platforms -_day0 = datetime(1, 1, 1) -def _try(fmt): - try: - return _day0.strftime(fmt) == '0001' - except ValueError: - return False -if _try('%Y'): # Mac OS X - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S") -elif _try('%4Y'): # Linux - def _iso8601_format(value): - return value.strftime("%4Y%m%dT%H:%M:%S") -else: - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S").zfill(17) -del _day0 -del _try +def _iso8601_format(value): + if value.tzinfo is not None: + # XML-RPC only uses the naive portion of the datetime + value = value.replace(tzinfo=None) + # XML-RPC doesn't use '-' separator in the date part + return value.isoformat(timespec='seconds').replace('-', '') def _strftime(value): @@ -850,9 +824,9 @@ def __init__(self, results): def __getitem__(self, i): item = self.results[i] - if type(item) == type({}): + if isinstance(item, dict): raise Fault(item['faultCode'], item['faultString']) - elif type(item) == type([]): + elif isinstance(item, list): return item[0] else: raise ValueError("unexpected type in multicall result") @@ -1339,10 +1313,7 @@ def parse_response(self, response): p, u = self.getparser() - while 1: - data = stream.read(1024) - if not data: - break + while data := stream.read(1024): if self.verbose: print("body:", repr(data)) p.feed(data) diff --git a/Lib/xmlrpc/server.py b/Lib/xmlrpc/server.py index 69a260f5b12..4dddb1d10e0 100644 --- a/Lib/xmlrpc/server.py +++ b/Lib/xmlrpc/server.py @@ -268,17 +268,11 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) - except: - # report exception back to server - exc_type, exc_value, exc_tb = sys.exc_info() - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none, - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none, + ) return response.encode(self.encoding, 'xmlcharrefreplace') @@ -368,16 +362,11 @@ def system_multicall(self, call_list): {'faultCode' : fault.faultCode, 'faultString' : fault.faultString} ) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - results.append( - {'faultCode' : 1, - 'faultString' : "%s:%s" % (exc_type, exc_value)} - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + results.append( + {'faultCode' : 1, + 'faultString' : "%s:%s" % (type(exc), exc)} + ) return results def _dispatch(self, method, params): @@ -440,7 +429,7 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): # Class attribute listing the accessible path components; # paths not on this list will result in a 404 error. - rpc_paths = ('/', '/RPC2') + rpc_paths = ('/', '/RPC2', '/pydoc.css') #if not None, encode responses larger than this, if possible encode_threshold = 1400 #a common MTU @@ -634,19 +623,14 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): try: response = self.dispatchers[path]._marshaled_dispatch( data, dispatch_method, path) - except: + except BaseException as exc: # report low level exception back to server # (each dispatcher should have handled their own # exceptions) - exc_type, exc_value = sys.exc_info()[:2] - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none) - response = response.encode(self.encoding, 'xmlcharrefreplace') - finally: - # Break reference cycle - exc_type = exc_value = None + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none) + response = response.encode(self.encoding, 'xmlcharrefreplace') return response class CGIXMLRPCRequestHandler(SimpleXMLRPCDispatcher): @@ -736,9 +720,7 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): r'RFC[- ]?(\d+)|' r'PEP[- ]?(\d+)|' r'(self\.)?((?:\w|\.)+))\b') - while 1: - match = pattern.search(text, here) - if not match: break + while match := pattern.search(text, here): start, end = match.span() results.append(escape(text[here:start])) @@ -747,10 +729,10 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): url = escape(all).replace('"', '"') results.append('%s' % (url, url)) elif rfc: - url = 'http://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) + url = 'https://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) results.append('%s' % (url, escape(all))) elif pep: - url = 'https://www.python.org/dev/peps/pep-%04d/' % int(pep) + url = 'https://peps.python.org/pep-%04d/' % int(pep) results.append('%s' % (url, escape(all))) elif text[end:end+1] == '(': results.append(self.namelink(name, methods, funcs, classes)) @@ -801,7 +783,7 @@ def docserver(self, server_name, package_documentation, methods): server_name = self.escape(server_name) head = '%s' % server_name - result = self.heading(head, '#ffffff', '#7799ee') + result = self.heading(head) doc = self.markup(package_documentation, self.preformat, fdict) doc = doc and '%s' % doc @@ -812,10 +794,25 @@ def docserver(self, server_name, package_documentation, methods): for key, value in method_items: contents.append(self.docroutine(value, key, funcs=fdict)) result = result + self.bigsection( - 'Methods', '#ffffff', '#eeaa77', ''.join(contents)) + 'Methods', 'functions', ''.join(contents)) return result + + def page(self, title, contents): + """Format an HTML page.""" + css_path = "/pydoc.css" + css_link = ( + '' % + css_path) + return '''\ + + +
+ +