|
1 | 1 | import array
|
| 2 | +import os |
| 3 | +import struct |
| 4 | +import threading |
2 | 5 | import unittest
|
3 | 6 | from test.support import get_attribute
|
| 7 | +from test.support import threading_helper |
4 | 8 | from test.support.import_helper import import_module
|
5 |
| -import os, struct |
6 | 9 | fcntl = import_module('fcntl')
|
7 | 10 | termios = import_module('termios')
|
8 |
| -get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature |
9 |
| - |
10 |
| -try: |
11 |
| - tty = open("/dev/tty", "rb") |
12 |
| -except OSError: |
13 |
| - raise unittest.SkipTest("Unable to open /dev/tty") |
14 |
| -else: |
15 |
| - with tty: |
16 |
| - # Skip if another process is in foreground |
17 |
| - r = fcntl.ioctl(tty, termios.TIOCGPGRP, struct.pack("i", 0)) |
18 |
| - rpgrp = struct.unpack("i", r)[0] |
19 |
| - if rpgrp not in (os.getpgrp(), os.getsid(0)): |
20 |
| - raise unittest.SkipTest("Neither the process group nor the session " |
21 |
| - "are attached to /dev/tty") |
22 |
| - del tty, r, rpgrp |
23 | 11 |
|
24 | 12 | try:
|
25 | 13 | import pty
|
26 | 14 | except ImportError:
|
27 | 15 | pty = None
|
28 | 16 |
|
29 |
| -class IoctlTests(unittest.TestCase): |
| 17 | +class IoctlTestsTty(unittest.TestCase): |
| 18 | + @classmethod |
| 19 | + def setUpClass(cls): |
| 20 | + TIOCGPGRP = get_attribute(termios, 'TIOCGPGRP') |
| 21 | + try: |
| 22 | + tty = open("/dev/tty", "rb") |
| 23 | + except OSError: |
| 24 | + raise unittest.SkipTest("Unable to open /dev/tty") |
| 25 | + with tty: |
| 26 | + # Skip if another process is in foreground |
| 27 | + r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0)) |
| 28 | + rpgrp = struct.unpack("i", r)[0] |
| 29 | + if rpgrp not in (os.getpgrp(), os.getsid(0)): |
| 30 | + raise unittest.SkipTest("Neither the process group nor the session " |
| 31 | + "are attached to /dev/tty") |
| 32 | + |
30 | 33 | def test_ioctl_immutable_buf(self):
|
31 | 34 | # If this process has been put into the background, TIOCGPGRP returns
|
32 | 35 | # the session ID instead of the process group id.
|
@@ -132,23 +135,48 @@ def test_ioctl_mutate_2048(self):
|
132 | 135 | self._check_ioctl_mutate_len(2048)
|
133 | 136 | self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048)
|
134 | 137 |
|
135 |
| - def test_ioctl_tcflush(self): |
136 |
| - with open("/dev/tty", "rb") as tty: |
137 |
| - r = fcntl.ioctl(tty, termios.TCFLSH, termios.TCIFLUSH) |
138 |
| - self.assertEqual(r, 0) |
139 | 138 |
|
140 |
| - @unittest.skipIf(pty is None, 'pty module required') |
| 139 | +@unittest.skipIf(pty is None, 'pty module required') |
| 140 | +class IoctlTestsPty(unittest.TestCase): |
| 141 | + def setUp(self): |
| 142 | + self.master_fd, self.slave_fd = pty.openpty() |
| 143 | + self.addCleanup(os.close, self.slave_fd) |
| 144 | + self.addCleanup(os.close, self.master_fd) |
| 145 | + |
| 146 | + def test_ioctl_clear_input(self): |
| 147 | + os.write(self.slave_fd, b'abcdef') |
| 148 | + self.assertEqual(os.read(self.master_fd, 2), b'ab') |
| 149 | + fcntl.ioctl(self.master_fd, termios.TCFLSH, termios.TCOFLUSH) # don't flush input |
| 150 | + self.assertEqual(os.read(self.master_fd, 2), b'cd') |
| 151 | + fcntl.ioctl(self.master_fd, termios.TCFLSH, termios.TCIFLUSH) # flush input |
| 152 | + os.write(self.slave_fd, b'ABCDEF') |
| 153 | + self.assertEqual(os.read(self.master_fd, 1024), b'ABCDEF') |
| 154 | + |
| 155 | + def test_tcflow_suspend_and_resume_output(self): |
| 156 | + write_suspended = threading.Event() |
| 157 | + write_finished = threading.Event() |
| 158 | + |
| 159 | + def writer(): |
| 160 | + os.write(self.slave_fd, b'abc') |
| 161 | + write_suspended.wait() |
| 162 | + os.write(self.slave_fd, b'def') |
| 163 | + write_finished.set() |
| 164 | + |
| 165 | + with threading_helper.start_threads([threading.Thread(target=writer)]): |
| 166 | + self.assertEqual(os.read(self.master_fd, 1024), b'abc') |
| 167 | + termios.tcflow(self.slave_fd, termios.TCOOFF) |
| 168 | + write_suspended.set() |
| 169 | + self.assertFalse(write_finished.wait(0.5)) |
| 170 | + termios.tcflow(self.slave_fd, termios.TCOON) |
| 171 | + self.assertTrue(write_finished.wait(0.5)) |
| 172 | + self.assertEqual(os.read(self.master_fd, 1024), b'def') |
| 173 | + |
141 | 174 | def test_ioctl_set_window_size(self):
|
142 |
| - mfd, sfd = pty.openpty() |
143 |
| - try: |
144 |
| - # (rows, columns, xpixel, ypixel) |
145 |
| - our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
146 |
| - result = fcntl.ioctl(mfd, termios.TIOCSWINSZ, our_winsz) |
147 |
| - new_winsz = struct.unpack("HHHH", result) |
148 |
| - self.assertEqual(new_winsz[:2], (20, 40)) |
149 |
| - finally: |
150 |
| - os.close(mfd) |
151 |
| - os.close(sfd) |
| 175 | + # (rows, columns, xpixel, ypixel) |
| 176 | + our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
| 177 | + result = fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, our_winsz) |
| 178 | + new_winsz = struct.unpack("HHHH", result) |
| 179 | + self.assertEqual(new_winsz[:2], (20, 40)) |
152 | 180 |
|
153 | 181 |
|
154 | 182 | if __name__ == "__main__":
|
|
0 commit comments