diff --git a/Lib/pty.py b/Lib/pty.py index 8d8ce40df54..1d97994abef 100644 --- a/Lib/pty.py +++ b/Lib/pty.py @@ -40,6 +40,9 @@ def master_open(): Open a pty master and return the fd, and the filename of the slave end. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + try: master_fd, slave_fd = os.openpty() except (AttributeError, OSError): @@ -69,6 +72,9 @@ def slave_open(tty_name): opened filedescriptor. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + result = os.open(tty_name, os.O_RDWR) try: from fcntl import ioctl, I_PUSH @@ -101,32 +107,14 @@ def fork(): master_fd, slave_fd = openpty() pid = os.fork() if pid == CHILD: - # Establish a new session. - os.setsid() os.close(master_fd) - - # Slave becomes stdin/stdout/stderr of child. - os.dup2(slave_fd, STDIN_FILENO) - os.dup2(slave_fd, STDOUT_FILENO) - os.dup2(slave_fd, STDERR_FILENO) - if slave_fd > STDERR_FILENO: - os.close(slave_fd) - - # Explicitly open the tty to make it become a controlling tty. - tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR) - os.close(tmp_fd) + os.login_tty(slave_fd) else: os.close(slave_fd) # Parent and child process. return pid, master_fd -def _writen(fd, data): - """Write all the data to a descriptor.""" - while data: - n = os.write(fd, data) - data = data[n:] - def _read(fd): """Default read function.""" return os.read(fd, 1024) @@ -136,9 +124,42 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): Copies pty master -> standard output (master_read) standard input -> pty master (stdin_read)""" - fds = [master_fd, STDIN_FILENO] - while fds: - rfds, _wfds, _xfds = select(fds, [], []) + if os.get_blocking(master_fd): + # If we write more than tty/ndisc is willing to buffer, we may block + # indefinitely. So we set master_fd to non-blocking temporarily during + # the copy operation. + os.set_blocking(master_fd, False) + try: + _copy(master_fd, master_read=master_read, stdin_read=stdin_read) + finally: + # restore blocking mode for backwards compatibility + os.set_blocking(master_fd, True) + return + high_waterlevel = 4096 + stdin_avail = master_fd != STDIN_FILENO + stdout_avail = master_fd != STDOUT_FILENO + i_buf = b'' + o_buf = b'' + while 1: + rfds = [] + wfds = [] + if stdin_avail and len(i_buf) < high_waterlevel: + rfds.append(STDIN_FILENO) + if stdout_avail and len(o_buf) < high_waterlevel: + rfds.append(master_fd) + if stdout_avail and len(o_buf) > 0: + wfds.append(STDOUT_FILENO) + if len(i_buf) > 0: + wfds.append(master_fd) + + rfds, wfds, _xfds = select(rfds, wfds, []) + + if STDOUT_FILENO in wfds: + try: + n = os.write(STDOUT_FILENO, o_buf) + o_buf = o_buf[n:] + except OSError: + stdout_avail = False if master_fd in rfds: # Some OSes signal EOF by returning an empty byte string, @@ -150,19 +171,22 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): if not data: # Reached EOF. return # Assume the child process has exited and is # unreachable, so we clean up. - else: - os.write(STDOUT_FILENO, data) + o_buf += data + + if master_fd in wfds: + n = os.write(master_fd, i_buf) + i_buf = i_buf[n:] - if STDIN_FILENO in rfds: + if stdin_avail and STDIN_FILENO in rfds: data = stdin_read(STDIN_FILENO) if not data: - fds.remove(STDIN_FILENO) + stdin_avail = False else: - _writen(master_fd, data) + i_buf += data def spawn(argv, master_read=_read, stdin_read=_read): """Create a spawned process.""" - if type(argv) == type(''): + if isinstance(argv, str): argv = (argv,) sys.audit('pty.spawn', argv) diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index 7d8c7a5e016..55585d9ef84 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -2241,6 +2241,7 @@ def child(wpipe): expected = terminal_input.decode(sys.stdin.encoding) # what else? self.assertEqual(input_result, expected) + @unittest.expectedFailure # TODO: RUSTPYTHON def test_input_tty(self): # Test input() functionality when wired to a tty (the code path # is different and invokes GNU readline if available). @@ -2257,17 +2258,20 @@ def skip_if_readline(self): self.skipTest("the readline module is loaded") @unittest.skipUnless(hasattr(sys.stdin, 'detach'), 'TODO: RustPython: requires detach function in TextIOWrapper') + @unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux def test_input_tty_non_ascii(self): self.skip_if_readline() # Check stdin/stdout encoding is used when invoking PyOS_Readline() self.check_input_tty("prompté", b"quux\xe9", "utf-8") @unittest.skipUnless(hasattr(sys.stdin, 'detach'), 'TODO: RustPython: requires detach function in TextIOWrapper') + @unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux def test_input_tty_non_ascii_unicode_errors(self): self.skip_if_readline() # Check stdin/stdout error handler is used when invoking PyOS_Readline() self.check_input_tty("prompté", b"quux\xe9", "ascii") + @unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux def test_input_no_stdout_fileno(self): # Issue #24402: If stdin is the original terminal but stdout.fileno() # fails, do not use the original stdout file descriptor diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index b1c1f6abffb..6d3c47195c6 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -1,10 +1,16 @@ -from test import support -from test.support import verbose, reap_children +import unittest +from test.support import ( + is_android, is_apple_mobile, is_emscripten, is_wasi, reap_children, verbose +) from test.support.import_helper import import_module +from test.support.os_helper import TESTFN, unlink # Skip these tests if termios is not available import_module('termios') +if is_android or is_apple_mobile or is_emscripten or is_wasi: + raise unittest.SkipTest("pty is not available on this platform") + import errno import os import pty @@ -14,21 +20,12 @@ import signal import socket import io # readline -import unittest - -import struct -import fcntl import warnings TEST_STRING_1 = b"I wish to buy a fish license.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n" -try: - _TIOCGWINSZ = tty.TIOCGWINSZ - _TIOCSWINSZ = tty.TIOCSWINSZ - _HAVE_WINSZ = True -except AttributeError: - _HAVE_WINSZ = False +_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ") if verbose: def debug(msg): @@ -82,90 +79,78 @@ def expectedFailureIfStdinIsTTY(fun): pass return fun -def _get_term_winsz(fd): - s = struct.pack("HHHH", 0, 0, 0, 0) - return fcntl.ioctl(fd, _TIOCGWINSZ, s) -def _set_term_winsz(fd, winsz): - fcntl.ioctl(fd, _TIOCSWINSZ, winsz) +def write_all(fd, data): + written = os.write(fd, data) + if written != len(data): + # gh-73256, gh-110673: It should never happen, but check just in case + raise Exception(f"short write: os.write({fd}, {len(data)} bytes) " + f"wrote {written} bytes") # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. class PtyTest(unittest.TestCase): def setUp(self): - old_alarm = signal.signal(signal.SIGALRM, self.handle_sig) - self.addCleanup(signal.signal, signal.SIGALRM, old_alarm) - old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup) self.addCleanup(signal.signal, signal.SIGHUP, old_sighup) - # isatty() and close() can hang on some platforms. Set an alarm - # before running the test to make sure we don't hang forever. - self.addCleanup(signal.alarm, 0) - signal.alarm(10) - - # Save original stdin window size - self.stdin_rows = None - self.stdin_cols = None + # Save original stdin window size. + self.stdin_dim = None if _HAVE_WINSZ: try: - stdin_dim = os.get_terminal_size(pty.STDIN_FILENO) - self.stdin_rows = stdin_dim.lines - self.stdin_cols = stdin_dim.columns - old_stdin_winsz = struct.pack("HHHH", self.stdin_rows, - self.stdin_cols, 0, 0) - self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz) - except OSError: + self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO, + self.stdin_dim) + except tty.error: pass - def handle_sig(self, sig, frame): - self.fail("isatty hung") - @staticmethod def handle_sighup(signum, frame): pass @expectedFailureIfStdinIsTTY + @unittest.skip('TODO: RUSTPYTHON; "Not runnable. tty.tcgetwinsize" is required to setUp') def test_openpty(self): try: mode = tty.tcgetattr(pty.STDIN_FILENO) except tty.error: - # not a tty or bad/closed fd + # Not a tty or bad/closed fd. debug("tty.tcgetattr(pty.STDIN_FILENO) failed") mode = None - new_stdin_winsz = None - if self.stdin_rows is not None and self.stdin_cols is not None: + new_dim = None + if self.stdin_dim: try: # Modify pty.STDIN_FILENO window size; we need to # check if pty.openpty() is able to set pty slave # window size accordingly. - debug("Setting pty.STDIN_FILENO window size") - debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})") - target_stdin_rows = self.stdin_rows + 1 - target_stdin_cols = self.stdin_cols + 1 - debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})") - target_stdin_winsz = struct.pack("HHHH", target_stdin_rows, - target_stdin_cols, 0, 0) - _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz) + debug("Setting pty.STDIN_FILENO window size.") + debug(f"original size: (row, col) = {self.stdin_dim}") + target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1) + debug(f"target size: (row, col) = {target_dim}") + tty.tcsetwinsize(pty.STDIN_FILENO, target_dim) # Were we able to set the window size # of pty.STDIN_FILENO successfully? - new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO) - self.assertEqual(new_stdin_winsz, target_stdin_winsz, + new_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.assertEqual(new_dim, target_dim, "pty.STDIN_FILENO window size unchanged") - except OSError: - warnings.warn("Failed to set pty.STDIN_FILENO window size") + except OSError as e: + logging.getLogger(__name__).warning( + "Failed to set pty.STDIN_FILENO window size.", exc_info=e, + ) pass try: debug("Calling pty.openpty()") try: - master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz) + master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim, + True) except TypeError: master_fd, slave_fd = pty.openpty() - debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") + slave_name = None + debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}") except OSError: # " An optional feature could not be imported " ... ? raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.") @@ -181,8 +166,8 @@ def test_openpty(self): if mode: self.assertEqual(tty.tcgetattr(slave_fd), mode, "openpty() failed to set slave termios") - if new_stdin_winsz: - self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz, + if new_dim: + self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim, "openpty() failed to set slave window size") # Ensure the fd is non-blocking in case there's nothing to read. @@ -200,18 +185,18 @@ def test_openpty(self): os.set_blocking(master_fd, blocking) debug("Writing to slave_fd") - os.write(slave_fd, TEST_STRING_1) + write_all(slave_fd, TEST_STRING_1) s1 = _readline(master_fd) self.assertEqual(b'I wish to buy a fish license.\n', normalize_output(s1)) debug("Writing chunked output") - os.write(slave_fd, TEST_STRING_2[:5]) - os.write(slave_fd, TEST_STRING_2[5:]) + write_all(slave_fd, TEST_STRING_2[:5]) + write_all(slave_fd, TEST_STRING_2[5:]) s2 = _readline(master_fd) self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) - @support.requires_fork() + @unittest.skip('TODO: RUSTPYTHON; "Not runnable. tty.tcgetwinsize" is required to setUp') def test_fork(self): debug("calling pty.fork()") pid, master_fd = pty.fork() @@ -294,6 +279,7 @@ def test_fork(self): ##else: ## raise TestFailed("Read from master_fd did not raise exception") + @unittest.skip('TODO: RUSTPYTHON; AttributeError: module "tty" has no attribute "tcgetwinsize"') def test_master_read(self): # XXX(nnorwitz): this test leaks fds when there is an error. debug("Calling pty.openpty()") @@ -313,8 +299,28 @@ def test_master_read(self): self.assertEqual(data, b"") + @unittest.skip('TODO: RUSTPYTHON; AttributeError: module "tty" has no attribute "tcgetwinsize"') def test_spawn_doesnt_hang(self): - pty.spawn([sys.executable, '-c', 'print("hi there")']) + self.addCleanup(unlink, TESTFN) + with open(TESTFN, 'wb') as f: + STDOUT_FILENO = 1 + dup_stdout = os.dup(STDOUT_FILENO) + os.dup2(f.fileno(), STDOUT_FILENO) + buf = b'' + def master_read(fd): + nonlocal buf + data = os.read(fd, 1024) + buf += data + return data + try: + pty.spawn([sys.executable, '-c', 'print("hi there")'], + master_read) + finally: + os.dup2(dup_stdout, STDOUT_FILENO) + os.close(dup_stdout) + self.assertEqual(buf, b'hi there\r\n') + with open(TESTFN, 'rb') as f: + self.assertEqual(f.read(), b'hi there\r\n') class SmallPtyTests(unittest.TestCase): """These tests don't spawn children or hang.""" @@ -332,8 +338,8 @@ def setUp(self): self.orig_pty_waitpid = pty.waitpid self.fds = [] # A list of file descriptors to close. self.files = [] - self.select_rfds_lengths = [] - self.select_rfds_results = [] + self.select_input = [] + self.select_output = [] self.tcsetattr_mode_setting = None def tearDown(self): @@ -368,11 +374,10 @@ def _socketpair(self): self.files.extend(socketpair) return socketpair - def _mock_select(self, rfds, wfds, xfds, timeout=0): + def _mock_select(self, rfds, wfds, xfds): # This will raise IndexError when no more expected calls exist. - # This ignores the timeout - self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + self.assertEqual((rfds, wfds, xfds), self.select_input.pop(0)) + return self.select_output.pop(0) def _make_mock_fork(self, pid): def mock_fork(): @@ -392,14 +397,16 @@ def test__copy_to_each(self): masters = [s.fileno() for s in socketpair] # Feed data. Smaller than PIPEBUF. These writes will not block. - os.write(masters[1], b'from master') - os.write(write_to_stdin_fd, b'from stdin') + write_all(masters[1], b'from master') + write_all(write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError + # Expect three select calls, the last one will cause IndexError pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - self.select_rfds_lengths.append(2) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_output.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [mock_stdout_fd, masters[0]], [])) + self.select_output.append(([], [mock_stdout_fd, masters[0]], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) with self.assertRaises(IndexError): pty._copy(masters[0]) @@ -410,28 +417,6 @@ def test__copy_to_each(self): self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" - read_from_stdout_fd, mock_stdout_fd = self._pipe() - pty.STDOUT_FILENO = mock_stdout_fd - mock_stdin_fd, write_to_stdin_fd = self._pipe() - pty.STDIN_FILENO = mock_stdin_fd - socketpair = self._socketpair() - masters = [s.fileno() for s in socketpair] - - socketpair[1].close() - os.close(write_to_stdin_fd) - - pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) - - # We expect the function to return without error. - self.assertEqual(pty._copy(masters[0]), None) - def test__restore_tty_mode_normal_return(self): """Test that spawn resets the tty mode no when _copy returns normally.""" diff --git a/Lib/test/test_tty.py b/Lib/test/test_tty.py new file mode 100644 index 00000000000..681772fb519 --- /dev/null +++ b/Lib/test/test_tty.py @@ -0,0 +1,96 @@ +import os +import unittest +from test.support.import_helper import import_module + +termios = import_module('termios') +tty = import_module('tty') + + +@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") +class TestTty(unittest.TestCase): + + def setUp(self): + master_fd, self.fd = os.openpty() + self.addCleanup(os.close, master_fd) + self.stream = self.enterContext(open(self.fd, 'wb', buffering=0)) + self.fd = self.stream.fileno() + self.mode = termios.tcgetattr(self.fd) + self.addCleanup(termios.tcsetattr, self.fd, termios.TCSANOW, self.mode) + self.addCleanup(termios.tcsetattr, self.fd, termios.TCSAFLUSH, self.mode) + + def check_cbreak(self, mode): + self.assertEqual(mode[3] & termios.ECHO, 0) + self.assertEqual(mode[3] & termios.ICANON, 0) + self.assertEqual(mode[6][termios.VMIN], 1) + self.assertEqual(mode[6][termios.VTIME], 0) + + def check_raw(self, mode): + self.check_cbreak(mode) + self.assertEqual(mode[0] & termios.ISTRIP, 0) + self.assertEqual(mode[0] & termios.ICRNL, 0) + self.assertEqual(mode[1] & termios.OPOST, 0) + self.assertEqual(mode[2] & termios.PARENB, termios.CS8 & termios.PARENB) + self.assertEqual(mode[2] & termios.CSIZE, termios.CS8 & termios.CSIZE) + self.assertEqual(mode[2] & termios.CS8, termios.CS8) + self.assertEqual(mode[3] & termios.ECHO, 0) + self.assertEqual(mode[3] & termios.ICANON, 0) + self.assertEqual(mode[3] & termios.ISIG, 0) + self.assertEqual(mode[6][termios.VMIN], 1) + self.assertEqual(mode[6][termios.VTIME], 0) + + def test_cfmakeraw(self): + mode = termios.tcgetattr(self.fd) + self.assertEqual(mode, self.mode) + tty.cfmakeraw(mode) + self.check_raw(mode) + self.assertEqual(mode[4], self.mode[4]) + self.assertEqual(mode[5], self.mode[5]) + + def test_cfmakecbreak(self): + mode = termios.tcgetattr(self.fd) + self.assertEqual(mode, self.mode) + tty.cfmakecbreak(mode) + self.check_cbreak(mode) + self.assertEqual(mode[1], self.mode[1]) + self.assertEqual(mode[2], self.mode[2]) + self.assertEqual(mode[4], self.mode[4]) + self.assertEqual(mode[5], self.mode[5]) + mode[tty.IFLAG] |= termios.ICRNL + tty.cfmakecbreak(mode) + self.assertEqual(mode[tty.IFLAG] & termios.ICRNL, termios.ICRNL, + msg="ICRNL should not be cleared by cbreak") + mode[tty.IFLAG] &= ~termios.ICRNL + tty.cfmakecbreak(mode) + self.assertEqual(mode[tty.IFLAG] & termios.ICRNL, 0, + msg="ICRNL should not be set by cbreak") + + @unittest.expectedFailure # TODO: RUSTPYTHON TypeError: Expected type "int" but "FileIO" found. + def test_setraw(self): + mode0 = termios.tcgetattr(self.fd) + mode1 = tty.setraw(self.fd) + self.assertEqual(mode1, mode0) + mode2 = termios.tcgetattr(self.fd) + self.check_raw(mode2) + mode3 = tty.setraw(self.fd, termios.TCSANOW) + self.assertEqual(mode3, mode2) + tty.setraw(self.stream) + tty.setraw(fd=self.fd, when=termios.TCSANOW) + + @unittest.expectedFailure # TODO: RUSTPYTHON TypeError: Expected type "int" but "FileIO" found. + def test_setcbreak(self): + mode0 = termios.tcgetattr(self.fd) + mode1 = tty.setcbreak(self.fd) + self.assertEqual(mode1, mode0) + mode2 = termios.tcgetattr(self.fd) + self.check_cbreak(mode2) + ICRNL = termios.ICRNL + self.assertEqual(mode2[tty.IFLAG] & ICRNL, mode0[tty.IFLAG] & ICRNL, + msg="ICRNL should not be altered by cbreak") + mode3 = tty.setcbreak(self.fd, termios.TCSANOW) + self.assertEqual(mode3, mode2) + tty.setcbreak(self.stream) + tty.setcbreak(fd=self.fd, when=termios.TCSANOW) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/tty.py b/Lib/tty.py index a72eb675545..5a49e040042 100644 --- a/Lib/tty.py +++ b/Lib/tty.py @@ -4,9 +4,9 @@ from termios import * -__all__ = ["setraw", "setcbreak"] +__all__ = ["cfmakeraw", "cfmakecbreak", "setraw", "setcbreak"] -# Indexes for termios list. +# Indices for termios list. IFLAG = 0 OFLAG = 1 CFLAG = 2 @@ -15,22 +15,59 @@ OSPEED = 5 CC = 6 -def setraw(fd, when=TCSAFLUSH): - """Put terminal into a raw mode.""" - mode = tcgetattr(fd) - mode[IFLAG] = mode[IFLAG] & ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON) - mode[OFLAG] = mode[OFLAG] & ~(OPOST) - mode[CFLAG] = mode[CFLAG] & ~(CSIZE | PARENB) - mode[CFLAG] = mode[CFLAG] | CS8 - mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON | IEXTEN | ISIG) +def cfmakeraw(mode): + """Make termios mode raw.""" + # Clear all POSIX.1-2017 input mode flags. + # See chapter 11 "General Terminal Interface" + # of POSIX.1-2017 Base Definitions. + mode[IFLAG] &= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP | + INLCR | IGNCR | ICRNL | IXON | IXANY | IXOFF) + + # Do not post-process output. + mode[OFLAG] &= ~OPOST + + # Disable parity generation and detection; clear character size mask; + # let character size be 8 bits. + mode[CFLAG] &= ~(PARENB | CSIZE) + mode[CFLAG] |= CS8 + + # Clear all POSIX.1-2017 local mode flags. + mode[LFLAG] &= ~(ECHO | ECHOE | ECHOK | ECHONL | ICANON | + IEXTEN | ISIG | NOFLSH | TOSTOP) + + # POSIX.1-2017, 11.1.7 Non-Canonical Mode Input Processing, + # Case B: MIN>0, TIME=0 + # A pending read shall block until MIN (here 1) bytes are received, + # or a signal is received. + mode[CC] = list(mode[CC]) mode[CC][VMIN] = 1 mode[CC][VTIME] = 0 - tcsetattr(fd, when, mode) -def setcbreak(fd, when=TCSAFLUSH): - """Put terminal into a cbreak mode.""" - mode = tcgetattr(fd) - mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON) +def cfmakecbreak(mode): + """Make termios mode cbreak.""" + # Do not echo characters; disable canonical input. + mode[LFLAG] &= ~(ECHO | ICANON) + + # POSIX.1-2017, 11.1.7 Non-Canonical Mode Input Processing, + # Case B: MIN>0, TIME=0 + # A pending read shall block until MIN (here 1) bytes are received, + # or a signal is received. + mode[CC] = list(mode[CC]) mode[CC][VMIN] = 1 mode[CC][VTIME] = 0 - tcsetattr(fd, when, mode) + +def setraw(fd, when=TCSAFLUSH): + """Put terminal into raw mode.""" + mode = tcgetattr(fd) + new = list(mode) + cfmakeraw(new) + tcsetattr(fd, when, new) + return mode + +def setcbreak(fd, when=TCSAFLUSH): + """Put terminal into cbreak mode.""" + mode = tcgetattr(fd) + new = list(mode) + cfmakecbreak(new) + tcsetattr(fd, when, new) + return mode