-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathtest_scripts.py
More file actions
executable file
·171 lines (142 loc) · 6.58 KB
/
test_scripts.py
File metadata and controls
executable file
·171 lines (142 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import unittest
import ethtool
import sys
import os
from io import TextIOWrapper, BytesIO
try:
    from imp import load_source
except ImportError:
    import importlib.machinery
    import importlib.util

    # Replacement recipe taken from
    # https://docs.python.org/3.12/whatsnew/3.12.html#imp
    def load_source(modname, filename):
        """Execute the Python source at *filename* and return it as a module.

        Fallback used only when ``imp.load_source`` cannot be imported.

        :param modname: name given to the created module object.
        :param filename: path of the source file to execute; an explicit
            source loader is supplied for it.
        :returns: the freshly executed module object.
        """
        source_loader = importlib.machinery.SourceFileLoader(
            modname, filename)
        module_spec = importlib.util.spec_from_file_location(
            modname, filename, loader=source_loader)
        loaded = importlib.util.module_from_spec(module_spec)
        # The module is executed on every call and is intentionally not
        # registered in sys.modules, so nothing is cached between calls.
        source_loader.exec_module(loaded)
        return loaded
# Load the two scripts as module objects so the tests can call their
# functions directly.  The paths are relative, so they are resolved against
# the current working directory at import time.
pifc = load_source('pifc', 'scripts/pifconfig')
peth = load_source('peth', 'scripts/pethtool')
def find_suitable_device():
    """Return the first active device usable for the non-loopback tests.

    A device qualifies when its flag string (as rendered by
    ``pifc.flags2str``) contains ``UP``, ``RUNNING`` and ``BROADCAST`` and
    no wireless protocol is reported for it.

    :returns: the name of the first qualifying device, or ``None`` when no
        active device qualifies.
    """
    required_flags = ('UP', 'RUNNING', 'BROADCAST')
    for candidate in ethtool.get_active_devices():
        flag_names = pifc.flags2str(ethtool.get_flags(candidate))
        try:
            wireless = ethtool.get_wireless_protocol(candidate)
        except Exception:
            # Any failure of the wireless query is treated as "no wireless
            # protocol" rather than aborting the search.
            wireless = None
        if wireless:
            continue
        if all(flag in flag_names for flag in required_flags):
            return candidate
    return None
# Interface name passed to the script functions by the "_lo" tests.
loopback = 'lo'
# Device chosen by find_suitable_device() at import time; None when no
# device qualified.
device = find_suitable_device()
class ScriptsTests(unittest.TestCase):
    """Tests for the ``pifc`` and ``peth`` script modules.

    Every test runs with ``sys.stdout`` replaced by an in-memory buffer
    (see ``setUp``/``tearDown``), calls one script function, and then
    checks what that function printed.

    The ``*_lo`` tests use the module-level ``loopback`` interface name.
    The ``*_eth`` tests are only defined when the module-level ``device``
    is truthy.
    """

    def setUp(self):
        """Redirect ``sys.stdout`` into an in-memory text buffer."""
        self._old_stdout = sys.stdout
        self._stdout = sys.stdout = TextIOWrapper(
            BytesIO(), sys.stdout.encoding)

    def _output(self):
        """Return everything written to the captured stdout so far."""
        self._stdout.seek(0)
        return self._stdout.read()

    def _assert_lines_start_with(self, expected_prefixes):
        """Assert that captured output lines start with the given prefixes.

        The captured output is split on newlines and compared pairwise with
        *expected_prefixes*: line 1 against the first prefix, line 2 against
        the second, and so on.

        :param expected_prefixes: sequence of strings, one per output line,
            in the order the lines are expected.
        :raises AssertionError: if a compared line lacks its prefix; the
            message names the line number, the line and the prefix.
        """
        lines = self._output().split('\n')
        # zip() stops at the shorter sequence, so output lines beyond the
        # listed prefixes (and prefixes beyond the output) are not checked.
        pairs = zip(expected_prefixes, lines)
        for number, (prefix, line) in enumerate(pairs, 1):
            self.assertTrue(
                line.startswith(prefix),
                'output line {0}: {1!r} does not start with {2!r}'.format(
                    number, line, prefix))

    def tearDown(self):
        """Restore the real ``sys.stdout`` and discard the capture buffer."""
        sys.stdout = self._old_stdout
        self._stdout.close()

    def test_flags2str(self):
        """flags2str() renders numeric flag values as space-separated names."""
        self.assertEqual(pifc.flags2str(0), '')
        self.assertEqual(pifc.flags2str(73), 'UP LOOPBACK RUNNING')
        self.assertEqual(pifc.flags2str(4305),
                         'UP POINTOPOINT RUNNING NOARP MULTICAST')
        self.assertEqual(pifc.flags2str(4163),
                         'UP BROADCAST RUNNING MULTICAST')

    # Tests for loopback

    def test_driver_lo(self):
        """show_driver() output for the loopback interface."""
        self.assertIsNone(peth.show_driver(loopback))
        self.assertEqual(
            self._output(),
            'driver: not implemented\nbus-info: not available\n')

    def test_show_ring_lo(self):
        """show_ring() reports ring parameters as unsupported on loopback."""
        self.assertIsNone(peth.show_ring(loopback))
        self.assertEqual(
            self._output(),
            'Ring parameters for {}:\n NOT supported!\n'.format(loopback))

    def test_show_coalesce_lo(self):
        """show_coalesce() reports coalescing as unsupported on loopback."""
        self.assertIsNone(peth.show_coalesce(loopback))
        self.assertEqual(
            self._output(),
            'Coalesce parameters for {}:\n NOT supported!\n'.format(loopback))

    def test_show_offload_lo(self):
        """show_offload() output for the loopback interface."""
        self.assertIsNone(peth.show_offload(loopback))

        # Check if we have rights to obtain ufo and set proper expected output
        try:
            ethtool.get_ufo(loopback)
            expected_ufo = 'on'
        except (OSError, IOError):
            expected_ufo = 'not supported'

        expected_output = (
            'scatter-gather: on\n'
            'tcp segmentation offload: on\n'
            'udp fragmentation offload: {expected_ufo}\n'
            'generic segmentation offload: on\n'
            'generic receive offload: on\n'
        ).format(expected_ufo=expected_ufo)
        self.assertEqual(self._output(), expected_output)

    # Tests for another device
    if device:
        def test_driver_eth(self):
            """show_driver() prints a driver line and a bus-info line."""
            self.assertIsNone(peth.show_driver(device))
            self._assert_lines_start_with(['driver: ', 'bus-info: '])

        def test_show_ring_eth(self):
            """show_ring() prints maximums and current ring settings."""
            self.assertIsNone(peth.show_ring(device))
            # NOTE(review): the first 'TX' has no trailing colon while the
            # second one does -- confirm whether that is intentional.
            self._assert_lines_start_with([
                'Ring parameters for ',
                'Pre-set maximums:', 'RX:', 'RX Mini:',
                'RX Jumbo:', 'TX', 'Current hardware settings:',
                'RX:', 'RX Mini:', 'RX Jumbo:', 'TX:',
            ])

        @unittest.skipIf(os.environ.get('TRAVIS') == 'true',
                         'Skipping this test on Travis CI because show '
                         'coalesce is not supported on ethernet device in VM.')
        def test_show_coalesce_eth(self):
            """show_coalesce() prints every coalesce parameter in order."""
            self.assertIsNone(peth.show_coalesce(device))
            self._assert_lines_start_with([
                'Coalesce parameters for',
                'Adaptive RX:',
                'stats-block-usecs:',
                'sample-interval:',
                'pkt-rate-low:',
                'pkt-rate-high:',
                'rx-usecs:',
                'rx-frames:',
                'rx-usecs-irq:',
                'rx-frames-irq:',
                'tx-usecs:',
                'tx-frames:',
                'tx-usecs-irq:',
                'tx-frames-irq:',
                'rx-usecs-low:',
                'rx-frame-low:',
                'tx-usecs-low:',
                'tx-frame-low:',
                'rx-usecs-high:',
                'rx-frame-high:',
                'tx-usecs-high:',
                'tx-frame-high:',
            ])

        def test_show_offload_eth(self):
            """show_offload() prints the offload settings in order."""
            self.assertIsNone(peth.show_offload(device))
            self._assert_lines_start_with([
                'scatter-gather:',
                'tcp segmentation offload:',
                'udp fragmentation offload:',
                'generic segmentation offload:',
            ])
# Run the tests in this file when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()