diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 3aceb3f70fd..8036e93cd6d 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -46,7 +46,6 @@ import sys as _sys import types as _types import weakref as _weakref - import _thread _allocate_lock = _thread.allocate_lock @@ -181,7 +180,7 @@ def _candidate_tempdir_list(): return dirlist -def _get_default_tempdir(): +def _get_default_tempdir(dirlist=None): """Calculate the default directory to use for temporary files. This routine should be called exactly once. @@ -191,7 +190,8 @@ def _get_default_tempdir(): service, the name of the test file must be randomized.""" namer = _RandomNameSequence() - dirlist = _candidate_tempdir_list() + if dirlist is None: + dirlist = _candidate_tempdir_list() for dir in dirlist: if dir != _os.curdir: @@ -204,8 +204,7 @@ def _get_default_tempdir(): fd = _os.open(filename, _bin_openflags, 0o600) try: try: - with _io.open(fd, 'wb', closefd=False) as fp: - fp.write(b'blat') + _os.write(fd, b'blat') finally: _os.close(fd) finally: @@ -245,6 +244,7 @@ def _get_candidate_names(): def _mkstemp_inner(dir, pre, suf, flags, output_type): """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.""" + dir = _os.path.abspath(dir) names = _get_candidate_names() if output_type is bytes: names = map(_os.fsencode, names) @@ -265,11 +265,27 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type): continue else: raise - return (fd, _os.path.abspath(file)) + return fd, file raise FileExistsError(_errno.EEXIST, "No usable temporary file name found") +def _dont_follow_symlinks(func, path, *args): + # Pass follow_symlinks=False, unless not supported on this platform. + if func in _os.supports_follow_symlinks: + func(path, *args, follow_symlinks=False) + elif not _os.path.islink(path): + func(path, *args) + +def _resetperms(path): + try: + chflags = _os.chflags + except AttributeError: + pass + else: + _dont_follow_symlinks(chflags, path, 0) + _dont_follow_symlinks(_os.chmod, path, 0o700) + # User visible interfaces. @@ -377,7 +393,7 @@ def mkdtemp(suffix=None, prefix=None, dir=None): continue else: raise - return file + return _os.path.abspath(file) raise FileExistsError(_errno.EEXIST, "No usable temporary directory name found") @@ -419,42 +435,42 @@ class _TemporaryFileCloser: underlying file object, without adding a __del__ method to the temporary file.""" - file = None # Set here since __del__ checks it + cleanup_called = False close_called = False - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name self.delete = delete + self.delete_on_close = delete_on_close - # NT provides delete-on-close as a primitive, so we don't need - # the wrapper to do anything special. We still use it so that - # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile. - if _os.name != 'nt': - # Cache the unlinker so we don't get spurious errors at - # shutdown when the module-level "os" is None'd out. Note - # that this must be referenced as self.unlink, because the - # name TemporaryFileWrapper may also get None'd out before - # __del__ is called. - - def close(self, unlink=_os.unlink): - if not self.close_called and self.file is not None: - self.close_called = True - try: + def cleanup(self, windows=(_os.name == 'nt'), unlink=_os.unlink): + if not self.cleanup_called: + self.cleanup_called = True + try: + if not self.close_called: + self.close_called = True self.file.close() - finally: - if self.delete: + finally: + # Windows provides delete-on-close as a primitive, in which + # case the file was deleted by self.file.close(). + if self.delete and not (windows and self.delete_on_close): + try: unlink(self.name) + except FileNotFoundError: + pass - # Need to ensure the file is deleted on __del__ - def __del__(self): - self.close() - - else: - def close(self): - if not self.close_called: - self.close_called = True + def close(self): + if not self.close_called: + self.close_called = True + try: self.file.close() + finally: + if self.delete and self.delete_on_close: + self.cleanup() + + def __del__(self): + self.cleanup() class _TemporaryFileWrapper: @@ -465,11 +481,11 @@ class _TemporaryFileWrapper: remove the file when it is no longer needed. """ - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name - self.delete = delete - self._closer = _TemporaryFileCloser(file, name, delete) + self._closer = _TemporaryFileCloser(file, name, delete, + delete_on_close) def __getattr__(self, name): # Attribute lookups are delegated to the underlying file @@ -500,7 +516,7 @@ def __enter__(self): # deleted when used in a with statement def __exit__(self, exc, value, tb): result = self.file.__exit__(exc, value, tb) - self.close() + self._closer.cleanup() return result def close(self): @@ -519,10 +535,10 @@ def __iter__(self): for line in self.file: yield line - def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, newline=None, suffix=None, prefix=None, - dir=None, delete=True, *, errors=None): + dir=None, delete=True, *, errors=None, + delete_on_close=True): """Create and return a temporary file. Arguments: 'prefix', 'suffix', 'dir' -- as for mkstemp. @@ -530,13 +546,20 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, 'buffering' -- the buffer size argument to io.open (default -1). 'encoding' -- the encoding argument to io.open (default None) 'newline' -- the newline argument to io.open (default None) - 'delete' -- whether the file is deleted on close (default True). + 'delete' -- whether the file is automatically deleted (default True). + 'delete_on_close' -- if 'delete', whether the file is deleted on close + (default True) or otherwise either on context manager exit + (if context manager was used) or on object finalization. . 'errors' -- the errors argument to io.open (default None) The file is created as mkstemp() would do it. Returns an object with a file-like interface; the name of the file is accessible as its 'name' attribute. The file will be automatically deleted when it is closed unless the 'delete' argument is set to False. + + On POSIX, NamedTemporaryFiles cannot be automatically deleted if + the creating process is terminated abruptly with a SIGKILL signal. + Windows can delete the file even in this case. """ prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir) @@ -545,21 +568,33 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, # Setting O_TEMPORARY in the flags causes the OS to delete # the file when it is closed. This is only supported by Windows. - if _os.name == 'nt' and delete: + if _os.name == 'nt' and delete and delete_on_close: flags |= _os.O_TEMPORARY if "b" not in mode: encoding = _io.text_encoding(encoding) - (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + name = None + def opener(*args): + nonlocal name + fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + return fd try: - file = _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, errors=errors) - - return _TemporaryFileWrapper(file, name, delete) - except BaseException: - _os.unlink(name) - _os.close(fd) + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, errors=errors, + opener=opener) + try: + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = name + return _TemporaryFileWrapper(file, name, delete, delete_on_close) + except: + file.close() + raise + except: + if name is not None and not ( + _os.name == 'nt' and delete and delete_on_close): + _os.unlink(name) raise if _os.name != 'posix' or _sys.platform == 'cygwin': @@ -598,9 +633,20 @@ def TemporaryFile(mode='w+b', buffering=-1, encoding=None, flags = _bin_openflags if _O_TMPFILE_WORKS: - try: + fd = None + def opener(*args): + nonlocal fd flags2 = (flags | _os.O_TMPFILE) & ~_os.O_CREAT fd = _os.open(dir, flags2, 0o600) + return fd + try: + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, + errors=errors, opener=opener) + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = fd + return file except IsADirectoryError: # Linux kernel older than 3.11 ignores the O_TMPFILE flag: # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory @@ -617,26 +663,27 @@ def TemporaryFile(mode='w+b', buffering=-1, encoding=None, # fails with NotADirectoryError, because O_TMPFILE is read as # O_DIRECTORY. pass - else: - try: - return _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, - errors=errors) - except: - _os.close(fd) - raise # Fallback to _mkstemp_inner(). - (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type) - try: - _os.unlink(name) - return _io.open(fd, mode, buffering=buffering, - newline=newline, encoding=encoding, errors=errors) - except: - _os.close(fd) - raise + fd = None + def opener(*args): + nonlocal fd + fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type) + try: + _os.unlink(name) + except BaseException as e: + _os.close(fd) + raise + return fd + file = _io.open(dir, mode, buffering=buffering, + newline=newline, encoding=encoding, errors=errors, + opener=opener) + raw = getattr(file, 'buffer', file) + raw = getattr(raw, 'raw', raw) + raw.name = fd + return file -class SpooledTemporaryFile: +class SpooledTemporaryFile(_io.IOBase): """Temporary file wrapper, specialized to switch from BytesIO or StringIO to a real file when it exceeds a certain size or when a fileno is needed. @@ -701,6 +748,16 @@ def __exit__(self, exc, value, tb): def __iter__(self): return self._file.__iter__() + def __del__(self): + if not self.closed: + _warnings.warn( + "Unclosed file {!r}".format(self), + ResourceWarning, + stacklevel=2, + source=self + ) + self.close() + def close(self): self._file.close() @@ -744,15 +801,30 @@ def name(self): def newlines(self): return self._file.newlines + def readable(self): + return self._file.readable() + def read(self, *args): return self._file.read(*args) + def read1(self, *args): + return self._file.read1(*args) + + def readinto(self, b): + return self._file.readinto(b) + + def readinto1(self, b): + return self._file.readinto1(b) + def readline(self, *args): return self._file.readline(*args) def readlines(self, *args): return self._file.readlines(*args) + def seekable(self): + return self._file.seekable() + def seek(self, *args): return self._file.seek(*args) @@ -761,11 +833,14 @@ def tell(self): def truncate(self, size=None): if size is None: - self._file.truncate() + return self._file.truncate() else: if size > self._max_size: self.rollover() - self._file.truncate(size) + return self._file.truncate(size) + + def writable(self): + return self._file.writable() def write(self, s): file = self._file @@ -774,10 +849,17 @@ def write(self, s): return rv def writelines(self, iterable): - file = self._file - rv = file.writelines(iterable) - self._check(file) - return rv + if self._max_size == 0 or self._rolled: + return self._file.writelines(iterable) + + it = iter(iterable) + for line in it: + self.write(line) + if self._rolled: + return self._file.writelines(it) + + def detach(self): + return self._file.detach() class TemporaryDirectory: @@ -789,53 +871,74 @@ class TemporaryDirectory: ... Upon exiting the context, the directory and everything contained - in it are removed. + in it are removed (unless delete=False is passed or an exception + is raised during cleanup and ignore_cleanup_errors is not True). + + Optional Arguments: + suffix - A str suffix for the directory name. (see mkdtemp) + prefix - A str prefix for the directory name. (see mkdtemp) + dir - A directory to create this temp dir in. (see mkdtemp) + ignore_cleanup_errors - False; ignore exceptions during cleanup? + delete - True; whether the directory is automatically deleted. """ def __init__(self, suffix=None, prefix=None, dir=None, - ignore_cleanup_errors=False): + ignore_cleanup_errors=False, *, delete=True): self.name = mkdtemp(suffix, prefix, dir) self._ignore_cleanup_errors = ignore_cleanup_errors + self._delete = delete self._finalizer = _weakref.finalize( self, self._cleanup, self.name, warn_message="Implicitly cleaning up {!r}".format(self), - ignore_errors=self._ignore_cleanup_errors) + ignore_errors=self._ignore_cleanup_errors, delete=self._delete) @classmethod - def _rmtree(cls, name, ignore_errors=False): - def onerror(func, path, exc_info): - if issubclass(exc_info[0], PermissionError): - def resetperms(path): - try: - _os.chflags(path, 0) - except AttributeError: - pass - _os.chmod(path, 0o700) + def _rmtree(cls, name, ignore_errors=False, repeated=False): + def onexc(func, path, exc): + if isinstance(exc, PermissionError): + if repeated and path == name: + if ignore_errors: + return + raise try: if path != name: - resetperms(_os.path.dirname(path)) - resetperms(path) + _resetperms(_os.path.dirname(path)) + _resetperms(path) try: _os.unlink(path) - # PermissionError is raised on FreeBSD for directories - except (IsADirectoryError, PermissionError): + except IsADirectoryError: cls._rmtree(path, ignore_errors=ignore_errors) + except PermissionError: + # The PermissionError handler was originally added for + # FreeBSD in directories, but it seems that it is raised + # on Windows too. + # bpo-43153: Calling _rmtree again may + # raise NotADirectoryError and mask the PermissionError. + # So we must re-raise the current PermissionError if + # path is not a directory. + if not _os.path.isdir(path) or _os.path.isjunction(path): + if ignore_errors: + return + raise + cls._rmtree(path, ignore_errors=ignore_errors, + repeated=(path == name)) except FileNotFoundError: pass - elif issubclass(exc_info[0], FileNotFoundError): + elif isinstance(exc, FileNotFoundError): pass else: if not ignore_errors: raise - _shutil.rmtree(name, onerror=onerror) + _shutil.rmtree(name, onexc=onexc) @classmethod - def _cleanup(cls, name, warn_message, ignore_errors=False): - cls._rmtree(name, ignore_errors=ignore_errors) - _warnings.warn(warn_message, ResourceWarning) + def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True): + if delete: + cls._rmtree(name, ignore_errors=ignore_errors) + _warnings.warn(warn_message, ResourceWarning) def __repr__(self): return "<{} {!r}>".format(self.__class__.__name__, self.name) @@ -844,7 +947,8 @@ def __enter__(self): return self.name def __exit__(self, exc, value, tb): - self.cleanup() + if self._delete: + self.cleanup() def cleanup(self): if self._finalizer.detach() or _os.path.exists(self.name): diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 997c98c2c62..4d058652723 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -800,8 +800,7 @@ def test_env(self): stdout, stderr = p.communicate() self.assertEqual(stdout, b"orange") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(sys.platform == "win32", "Windows only issue") def test_win32_duplicate_envs(self): newenv = os.environ.copy() @@ -1292,8 +1291,7 @@ def test_universal_newlines_communicate_stdin_stdout_stderr(self): # to stderr at exit of subprocess. self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_universal_newlines_communicate_encodings(self): # Check that universal newlines mode works for various encodings, # in particular for encodings in the UTF-16 and UTF-32 families. @@ -2276,16 +2274,14 @@ def test_group_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, group=65535) - # TODO: RUSTPYTHON, observed gids do not match expected gids - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; observed gids do not match expected gids @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups(self): gid = os.getegid() group_list = [65534 if gid != 65534 else 65533] self._test_extra_groups_impl(gid=gid, group_list=group_list) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups_empty_list(self): self._test_extra_groups_impl(gid=os.getegid(), group_list=[]) @@ -2321,8 +2317,8 @@ def _test_extra_groups_impl(self, *, gid, group_list): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[name_group]) - @unittest.skip("TODO: RUSTPYTHON; clarify failure condition") # No skip necessary, this test won't make it to a setgroup() call. + @unittest.skip('TODO: RUSTPYTHON; clarify failure condition') def test_extra_groups_invalid_gid_t_values(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1]) @@ -2449,8 +2445,7 @@ def raise_it(): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=raise_it) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_preexec_gc_module_failure(self): # This tests the code that disables garbage collection if the child # process will execute any Python. @@ -3021,7 +3016,7 @@ def kill_p2(): p1.stdout.close() p2.stdout.close() - @unittest.skip("TODO: RUSTPYTHON, flaky test") + @unittest.skip('TODO: RUSTPYTHON; flaky test') def test_close_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3149,11 +3144,11 @@ def test_close_fds_when_max_fd_is_lowered(self): msg="Some fds were left open.") - @unittest.skip("TODO: RUSTPYTHON, flaky test") # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file # descriptor of a pipe closed in the parent process is valid in the # child process according to fstat(), but the mode of the file # descriptor is invalid, and read or write raise an error. + @unittest.skip('TODO: RUSTPYTHON; flaky test') @support.requires_mac_ver(10, 5) def test_pass_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3566,8 +3561,7 @@ def test_communicate_repeated_call_after_stdout_close(self): except subprocess.TimeoutExpired: pass - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_preexec_at_exit(self): code = f"""if 1: import atexit diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 5674839cd43..d389529ca3b 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -11,6 +11,9 @@ import stat import types import weakref +import gc +import shutil +import subprocess from unittest import mock import unittest @@ -60,16 +63,10 @@ def test_infer_return_type_multiples_and_none(self): tempfile._infer_return_type(b'', None, '') def test_infer_return_type_pathlib(self): - self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) + self.assertIs(str, tempfile._infer_return_type(os_helper.FakePath('/'))) def test_infer_return_type_pathlike(self): - class Path: - def __init__(self, path): - self.path = path - - def __fspath__(self): - return self.path - + Path = os_helper.FakePath self.assertIs(str, tempfile._infer_return_type(Path('/'))) self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) self.assertIs(str, tempfile._infer_return_type('', Path(''))) @@ -90,14 +87,10 @@ class BaseTestCase(unittest.TestCase): b_check = re.compile(br"^[a-z0-9_-]{8}$") def setUp(self): - self._warnings_manager = warnings_helper.check_warnings() - self._warnings_manager.__enter__() + self.enterContext(warnings_helper.check_warnings()) warnings.filterwarnings("ignore", category=RuntimeWarning, message="mktemp", module=__name__) - def tearDown(self): - self._warnings_manager.__exit__(None, None, None) - def nameCheck(self, name, dir, pre, suf): (ndir, nbase) = os.path.split(name) npre = nbase[:len(pre)] @@ -198,8 +191,7 @@ def supports_iter(self): if i == 20: break - @unittest.skipUnless(hasattr(os, 'fork'), - "os.fork is required for this test") + @support.requires_fork() def test_process_awareness(self): # ensure that the random source differs between # child and parent. @@ -290,19 +282,14 @@ def our_candidate_list(): def raise_OSError(*args, **kwargs): raise OSError() - with support.swap_attr(io, "open", raise_OSError): - # test again with failing io.open() + with support.swap_attr(os, "open", raise_OSError): + # test again with failing os.open() with self.assertRaises(FileNotFoundError): tempfile._get_default_tempdir() self.assertEqual(os.listdir(our_temp_directory), []) - def bad_writer(*args, **kwargs): - fp = orig_open(*args, **kwargs) - fp.write = raise_OSError - return fp - - with support.swap_attr(io, "open", bad_writer) as orig_open: - # test again with failing write() + with support.swap_attr(os, "write", raise_OSError): + # test again with failing os.write() with self.assertRaises(FileNotFoundError): tempfile._get_default_tempdir() self.assertEqual(os.listdir(our_temp_directory), []) @@ -342,6 +329,9 @@ def _mock_candidate_names(*names): class TestBadTempdir: + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot remove write bits." + ) def test_read_only_directory(self): with _inside_empty_temp_dir(): oldmode = mode = os.stat(tempfile.tempdir).st_mode @@ -447,11 +437,12 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: self.do_create(dir=dir).write(b"blat") - self.do_create(dir=pathlib.Path(dir)).write(b"blat") + self.do_create(dir=os_helper.FakePath(dir)).write(b"blat") finally: support.gc_collect() # For PyPy or other GCs. os.rmdir(dir) + @os_helper.skip_unless_working_chmod def test_file_mode(self): # _mkstemp_inner creates files with the proper mode @@ -465,8 +456,8 @@ def test_file_mode(self): expected = user * (1 + 8 + 64) self.assertEqual(mode, expected) - @support.requires_fork() @unittest.skipUnless(has_spawnl, 'os.spawnl not available') + @support.requires_subprocess() def test_noinherit(self): # _mkstemp_inner file handles are not inherited by child processes @@ -684,7 +675,7 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: self.do_create(dir=dir) - self.do_create(dir=pathlib.Path(dir)) + self.do_create(dir=os_helper.FakePath(dir)) finally: os.rmdir(dir) @@ -785,10 +776,11 @@ def test_choose_directory(self): dir = tempfile.mkdtemp() try: os.rmdir(self.do_create(dir=dir)) - os.rmdir(self.do_create(dir=pathlib.Path(dir))) + os.rmdir(self.do_create(dir=os_helper.FakePath(dir))) finally: os.rmdir(dir) + @os_helper.skip_unless_working_chmod def test_mode(self): # mkdtemp creates directories with the proper mode @@ -806,6 +798,33 @@ def test_mode(self): finally: os.rmdir(dir) + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_mode_win32(self): + # Use icacls.exe to extract the users with some level of access + # Main thing we are testing is that the BUILTIN\Users group has + # no access. The exact ACL is going to vary based on which user + # is running the test. + dir = self.do_create() + try: + out = subprocess.check_output(["icacls.exe", dir], encoding="oem").casefold() + finally: + os.rmdir(dir) + + dir = dir.casefold() + users = set() + found_user = False + for line in out.strip().splitlines(): + acl = None + # First line of result includes our directory + if line.startswith(dir): + acl = line.removeprefix(dir).strip() + elif line and line[:1].isspace(): + acl = line.strip() + if acl: + users.add(acl.partition(":")[0]) + + self.assertNotIn(r"BUILTIN\Users".casefold(), users) + def test_collision_with_existing_file(self): # mkdtemp tries another name when a file with # the chosen name already exists @@ -853,6 +872,15 @@ def test_for_tempdir_is_bytes_issue40701_api_warts(self): finally: tempfile.tempdir = orig_tempdir + def test_path_is_absolute(self): + # Test that the path returned by mkdtemp with a relative `dir` + # argument is absolute + try: + path = tempfile.mkdtemp(dir=".") + self.assertTrue(os.path.isabs(path)) + finally: + os.rmdir(path) + class TestMktemp(BaseTestCase): """Test mktemp().""" @@ -978,6 +1006,7 @@ def test_del_on_close(self): try: with tempfile.NamedTemporaryFile(dir=dir) as f: f.write(b'blat') + self.assertEqual(os.listdir(dir), []) self.assertFalse(os.path.exists(f.name), "NamedTemporaryFile %s exists after close" % f.name) finally: @@ -1017,18 +1046,101 @@ def use_closed(): pass self.assertRaises(ValueError, use_closed) - def test_no_leak_fd(self): - # Issue #21058: don't leak file descriptor when io.open() fails - closed = [] - os_close = os.close - def close(fd): - closed.append(fd) - os_close(fd) + def test_context_man_not_del_on_close_if_delete_on_close_false(self): + # Issue gh-58451: tempfile.NamedTemporaryFile is not particularly useful + # on Windows + # A NamedTemporaryFile is NOT deleted when closed if + # delete_on_close=False, but is deleted on context manager exit + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f_name = f.name + f.close() + with self.subTest(): + # Testing that file is not deleted on close + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} is incorrectly " + f"deleted on closure when delete_on_close=False") + + with self.subTest(): + # Testing that file is deleted on context manager exit + self.assertFalse(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists " + f"after context manager exit") + + finally: + os.rmdir(dir) + + def test_context_man_ok_to_delete_manually(self): + # In the case of delete=True, a NamedTemporaryFile can be manually + # deleted in a with-statement context without causing an error. + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f.close() + os.unlink(f.name) + + finally: + os.rmdir(dir) + + def test_context_man_not_del_if_delete_false(self): + # A NamedTemporaryFile is not deleted if delete = False + dir = tempfile.mkdtemp() + f_name = "" + try: + # Test that delete_on_close=True has no effect if delete=False. + with tempfile.NamedTemporaryFile(dir=dir, delete=False, + delete_on_close=True) as f: + f.write(b'blat') + f_name = f.name + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists after close") + finally: + os.unlink(f_name) + os.rmdir(dir) + + def test_del_by_finalizer(self): + # A NamedTemporaryFile is deleted when finalized in the case of + # delete=True, delete_on_close=False, and no with-statement is used. + def my_func(dir): + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + # Testing extreme case, where the file is not explicitly closed + # f.close() + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() + dir = tempfile.mkdtemp() + try: + tmp_name = my_func(dir) + self.assertFalse(os.path.exists(tmp_name), + f"NamedTemporaryFile {tmp_name!r} " + f"exists after finalizer ") + finally: + os.rmdir(dir) - with mock.patch('os.close', side_effect=close): - with mock.patch('io.open', side_effect=ValueError): - self.assertRaises(ValueError, tempfile.NamedTemporaryFile) - self.assertEqual(len(closed), 1) + def test_correct_finalizer_work_if_already_deleted(self): + # There should be no error in the case of delete=True, + # delete_on_close=False, no with-statement is used, and the file is + # deleted manually. + def my_func(dir)->str: + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + f.close() + os.unlink(tmp_name) + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() def test_bad_mode(self): dir = tempfile.mkdtemp() @@ -1039,6 +1151,24 @@ def test_bad_mode(self): tempfile.NamedTemporaryFile(mode=2, dir=dir) self.assertEqual(os.listdir(dir), []) + def test_bad_encoding(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(LookupError): + tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir) + self.assertEqual(os.listdir(dir), []) + + def test_unexpected_error(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \ + mock.patch('io.open', mock.mock_open()) as mock_open: + mock_ntf.side_effect = KeyboardInterrupt() + with self.assertRaises(KeyboardInterrupt): + tempfile.NamedTemporaryFile(dir=dir) + mock_open().close.assert_called() + self.assertEqual(os.listdir(dir), []) + # How to test the mode and bufsize parameters? class TestSpooledTemporaryFile(BaseTestCase): @@ -1059,6 +1189,31 @@ def test_basic(self): f = self.do_create(max_size=100, pre="a", suf=".txt") self.assertFalse(f._rolled) + def test_is_iobase(self): + # SpooledTemporaryFile should implement io.IOBase + self.assertIsInstance(self.do_create(), io.IOBase) + + def test_iobase_interface(self): + # SpooledTemporaryFile should implement the io.IOBase interface. + # Ensure it has all the required methods and properties. + iobase_attrs = { + # From IOBase + 'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__', + '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable', + 'readline', 'readlines', 'seekable', 'tell', 'writable', + 'writelines', + # From BufferedIOBase (binary mode) and TextIOBase (text mode) + 'detach', 'read', 'read1', 'write', 'readinto', 'readinto1', + 'encoding', 'errors', 'newlines', + } + spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile)) + missing_attrs = iobase_attrs - spooledtempfile_attrs + self.assertFalse( + missing_attrs, + 'SpooledTemporaryFile missing attributes from ' + 'IOBase/BufferedIOBase/TextIOBase' + ) + def test_del_on_close(self): # A SpooledTemporaryFile is deleted when closed dir = tempfile.mkdtemp() @@ -1069,11 +1224,40 @@ def test_del_on_close(self): self.assertTrue(f._rolled) filename = f.name f.close() - self.assertFalse(isinstance(filename, str) and os.path.exists(filename), - "SpooledTemporaryFile %s exists after close" % filename) + self.assertEqual(os.listdir(dir), []) + if not isinstance(filename, int): + self.assertFalse(os.path.exists(filename), + "SpooledTemporaryFile %s exists after close" % filename) finally: os.rmdir(dir) + def test_del_unrolled_file(self): + # The unrolled SpooledTemporaryFile should raise a ResourceWarning + # when deleted since the file was not explicitly closed. + f = self.do_create(max_size=10) + f.write(b'foo') + self.assertEqual(f.name, None) # Unrolled so no filename/fd + with self.assertWarns(ResourceWarning): + f.__del__() + + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot fstat renamed files." + ) + def test_del_rolled_file(self): + # The rolled file should be deleted when the SpooledTemporaryFile + # object is deleted. This should raise a ResourceWarning since the file + # was not explicitly closed. + f = self.do_create(max_size=2) + f.write(b'foo') + name = f.name # This is a fd on posix+cygwin, a filename everywhere else + self.assertTrue(os.path.exists(name)) + with self.assertWarns(ResourceWarning): + f.__del__() + self.assertFalse( + os.path.exists(name), + "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name + ) + def test_rewrite_small(self): # A SpooledTemporaryFile can be written to multiple within the max_size f = self.do_create(max_size=30) @@ -1104,6 +1288,34 @@ def test_writelines(self): buf = f.read() self.assertEqual(buf, b'xyz') + def test_writelines_rollover(self): + # Verify writelines rolls over before exhausting the iterator + f = self.do_create(max_size=2) + + def it(): + yield b'xy' + self.assertFalse(f._rolled) + yield b'z' + self.assertTrue(f._rolled) + + f.writelines(it()) + pos = f.seek(0) + self.assertEqual(pos, 0) + buf = f.read() + self.assertEqual(buf, b'xyz') + + def test_writelines_fast_path(self): + f = self.do_create(max_size=2) + f.write(b'abc') + self.assertTrue(f._rolled) + + f.writelines([b'd', b'e', b'f']) + pos = f.seek(0) + self.assertEqual(pos, 0) + buf = f.read() + self.assertEqual(buf, b'abcdef') + + def test_writelines_sequential(self): # A SpooledTemporaryFile should hold exactly max_size bytes, and roll # over afterward @@ -1284,6 +1496,9 @@ def use_closed(): pass self.assertRaises(ValueError, use_closed) + @unittest.skipIf( + support.is_emscripten, "Emscripten cannot fstat renamed files." + ) def test_truncate_with_size_parameter(self): # A SpooledTemporaryFile can be truncated to zero size f = tempfile.SpooledTemporaryFile(max_size=10) @@ -1357,19 +1572,34 @@ def roundtrip(input, *args, **kwargs): roundtrip("\u039B", "w+", encoding="utf-16") roundtrip("foo\r\n", "w+", newline="") - def test_no_leak_fd(self): - # Issue #21058: don't leak file descriptor when io.open() fails - closed = [] - os_close = os.close - def close(fd): - closed.append(fd) - os_close(fd) - - with mock.patch('os.close', side_effect=close): - with mock.patch('io.open', side_effect=ValueError): - self.assertRaises(ValueError, tempfile.TemporaryFile) - self.assertEqual(len(closed), 1) + def test_bad_mode(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(ValueError): + tempfile.TemporaryFile(mode='wr', dir=dir) + with self.assertRaises(TypeError): + tempfile.TemporaryFile(mode=2, dir=dir) + self.assertEqual(os.listdir(dir), []) + + def test_bad_encoding(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with self.assertRaises(LookupError): + tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir) + self.assertEqual(os.listdir(dir), []) + def test_unexpected_error(self): + dir = tempfile.mkdtemp() + self.addCleanup(os_helper.rmtree, dir) + with mock.patch('tempfile._O_TMPFILE_WORKS', False), \ + mock.patch('os.unlink') as mock_unlink, \ + mock.patch('os.open') as mock_open, \ + mock.patch('os.close') as mock_close: + mock_unlink.side_effect = KeyboardInterrupt() + with self.assertRaises(KeyboardInterrupt): + tempfile.TemporaryFile(dir=dir) + mock_close.assert_called() + self.assertEqual(os.listdir(dir), []) # Helper for test_del_on_shutdown @@ -1437,7 +1667,7 @@ def test_explicit_cleanup(self): finally: os.rmdir(dir) - def test_explict_cleanup_ignore_errors(self): + def test_explicit_cleanup_ignore_errors(self): """Test that cleanup doesn't return an error when ignoring them.""" with tempfile.TemporaryDirectory() as working_dir: temp_dir = self.do_create( @@ -1461,6 +1691,28 @@ def test_explict_cleanup_ignore_errors(self): temp_path.exists(), f"TemporaryDirectory {temp_path!s} exists after cleanup") + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_explicit_cleanup_correct_error(self): + with tempfile.TemporaryDirectory() as working_dir: + temp_dir = self.do_create(dir=working_dir) + with open(os.path.join(temp_dir.name, "example.txt"), 'wb'): + # Previously raised NotADirectoryError on some OSes + # (e.g. Windows). See bpo-43153. + with self.assertRaises(PermissionError): + temp_dir.cleanup() + + @unittest.skipUnless(os.name == "nt", "Only on Windows.") + def test_cleanup_with_used_directory(self): + with tempfile.TemporaryDirectory() as working_dir: + temp_dir = self.do_create(dir=working_dir) + subdir = os.path.join(temp_dir.name, "subdir") + os.mkdir(subdir) + with os_helper.change_cwd(subdir): + # Previously raised RecursionError on some OSes + # (e.g. Windows). See bpo-35144. + with self.assertRaises(PermissionError): + temp_dir.cleanup() + @os_helper.skip_unless_symlink def test_cleanup_with_symlink_to_a_directory(self): # cleanup() should not follow symlinks to directories (issue #12464) @@ -1482,6 +1734,104 @@ def test_cleanup_with_symlink_to_a_directory(self): "were deleted") d2.cleanup() + @os_helper.skip_unless_symlink + @unittest.skip('TODO: RUSTPYTHON; No such file or directory "..."') + def test_cleanup_with_symlink_modes(self): + # cleanup() should not follow symlinks when fixing mode bits (#91133) + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + for mode in range(8): + mode <<= 6 + with self.subTest(mode=format(mode, '03o')): + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chmod(symlink, mode, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chmod(symlink, mode) + except FileNotFoundError: + pass + os.chmod(d1.name, mode) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chmod(file1, mode) + old_mode = os.stat(file1).st_mode + test(file1, target_is_directory=False) + new_mode = os.stat(file1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + with self.subTest('existing dir'): + os.chmod(dir1, mode) + old_mode = os.stat(dir1).st_mode + test(dir1, target_is_directory=True) + new_mode = os.stat(dir1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') + @os_helper.skip_unless_symlink + def test_cleanup_with_symlink_flags(self): + # cleanup() should not follow symlinks when fixing flags (#91133) + flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK + self.check_flags(flags) + + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chflags(symlink, flags, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chflags(symlink, flags) + except FileNotFoundError: + pass + os.chflags(d1.name, flags) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chflags(file1, flags) + old_flags = os.stat(file1).st_flags + test(file1, target_is_directory=False) + new_flags = os.stat(file1).st_flags + self.assertEqual(new_flags, old_flags) + + with self.subTest('existing dir'): + os.chflags(dir1, flags) + old_flags = os.stat(dir1).st_flags + test(dir1, target_is_directory=True) + new_flags = os.stat(dir1).st_flags + self.assertEqual(new_flags, old_flags) + @support.cpython_only def test_del_on_collection(self): # A TemporaryDirectory is deleted when garbage collected @@ -1654,9 +2004,27 @@ def test_modes(self): d.cleanup() self.assertFalse(os.path.exists(d.name)) - @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') + def check_flags(self, flags): + # skip the test if these flags are not supported (ex: FreeBSD 13) + filename = os_helper.TESTFN + try: + open(filename, "w").close() + try: + os.chflags(filename, flags) + except OSError as exc: + # "OSError: [Errno 45] Operation not supported" + self.skipTest(f"chflags() doesn't support flags " + f"{flags:#b}: {exc}") + else: + os.chflags(filename, 0) + finally: + os_helper.unlink(filename) + + @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') def test_flags(self): flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK + self.check_flags(flags) + d = self.do_create(recurse=3, dirs=2, files=2) with d: # Change files and directories flags recursively. @@ -1667,6 +2035,11 @@ def test_flags(self): d.cleanup() self.assertFalse(os.path.exists(d.name)) + def test_delete_false(self): + with tempfile.TemporaryDirectory(delete=False) as working_dir: + pass + self.assertTrue(os.path.exists(working_dir)) + shutil.rmtree(working_dir) if __name__ == "__main__": unittest.main()