|
5 | 5 | from io import BytesIO |
6 | 6 | import os |
7 | 7 | import pickle |
| 8 | +import glob |
8 | 9 | import random |
9 | 10 | import subprocess |
10 | 11 | import sys |
@@ -51,6 +52,19 @@ class BaseTest(unittest.TestCase): |
51 | 52 | EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' |
52 | 53 | BAD_DATA = b'this is not a valid bzip2 file' |
53 | 54 |
|
| 55 | + # Some tests need more than one block of uncompressed data. Since one block |
| 56 | + # is at least 100 kB, we gather some data dynamically and compress it. |
| 57 | + # Note that this assumes that compression works correctly, so we cannot |
| 58 | + # simply use the bigger test data for all tests. |
| 59 | + test_size = 0 |
| 60 | + BIG_TEXT = bytearray(128*1024) |
| 61 | + for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')): |
| 62 | + with open(fname, 'rb') as fh: |
| 63 | + test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) |
| 64 | + if test_size > 128*1024: |
| 65 | + break |
| 66 | + BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) |
| 67 | + |
54 | 68 | def setUp(self): |
55 | 69 | self.filename = support.TESTFN |
56 | 70 |
|
@@ -707,6 +721,95 @@ def testPickle(self): |
707 | 721 | with self.assertRaises(TypeError): |
708 | 722 | pickle.dumps(BZ2Decompressor(), proto) |
709 | 723 |
|
| 724 | + def testDecompressorChunksMaxsize(self): |
| 725 | + bzd = BZ2Decompressor() |
| 726 | + max_length = 100 |
| 727 | + out = [] |
| 728 | + |
| 729 | + # Feed some input |
| 730 | + len_ = len(self.BIG_DATA) - 64 |
| 731 | + out.append(bzd.decompress(self.BIG_DATA[:len_], |
| 732 | + max_length=max_length)) |
| 733 | + self.assertFalse(bzd.needs_input) |
| 734 | + self.assertEqual(len(out[-1]), max_length) |
| 735 | + |
| 736 | + # Retrieve more data without providing more input |
| 737 | + out.append(bzd.decompress(b'', max_length=max_length)) |
| 738 | + self.assertFalse(bzd.needs_input) |
| 739 | + self.assertEqual(len(out[-1]), max_length) |
| 740 | + |
| 741 | + # Retrieve more data while providing more input |
| 742 | + out.append(bzd.decompress(self.BIG_DATA[len_:], |
| 743 | + max_length=max_length)) |
| 744 | + self.assertLessEqual(len(out[-1]), max_length) |
| 745 | + |
| 746 | + # Retrieve remaining uncompressed data |
| 747 | + while not bzd.eof: |
| 748 | + out.append(bzd.decompress(b'', max_length=max_length)) |
| 749 | + self.assertLessEqual(len(out[-1]), max_length) |
| 750 | + |
| 751 | + out = b"".join(out) |
| 752 | + self.assertEqual(out, self.BIG_TEXT) |
| 753 | + self.assertEqual(bzd.unused_data, b"") |
| 754 | + |
| 755 | + def test_decompressor_inputbuf_1(self): |
| 756 | + # Test reusing input buffer after moving existing |
| 757 | + # contents to beginning |
| 758 | + bzd = BZ2Decompressor() |
| 759 | + out = [] |
| 760 | + |
| 761 | + # Create input buffer and fill it |
| 762 | + self.assertEqual(bzd.decompress(self.DATA[:100], |
| 763 | + max_length=0), b'') |
| 764 | + |
| 765 | + # Retrieve some results, freeing capacity at beginning |
| 766 | + # of input buffer |
| 767 | + out.append(bzd.decompress(b'', 2)) |
| 768 | + |
| 769 | + # Add more data that fits into input buffer after |
| 770 | + # moving existing data to beginning |
| 771 | + out.append(bzd.decompress(self.DATA[100:105], 15)) |
| 772 | + |
| 773 | + # Decompress rest of data |
| 774 | + out.append(bzd.decompress(self.DATA[105:])) |
| 775 | + self.assertEqual(b''.join(out), self.TEXT) |
| 776 | + |
| 777 | + def test_decompressor_inputbuf_2(self): |
| 778 | + # Test reusing input buffer by appending data at the |
| 779 | + # end right away |
| 780 | + bzd = BZ2Decompressor() |
| 781 | + out = [] |
| 782 | + |
| 783 | + # Create input buffer and empty it |
| 784 | + self.assertEqual(bzd.decompress(self.DATA[:200], |
| 785 | + max_length=0), b'') |
| 786 | + out.append(bzd.decompress(b'')) |
| 787 | + |
| 788 | + # Fill buffer with new data |
| 789 | + out.append(bzd.decompress(self.DATA[200:280], 2)) |
| 790 | + |
| 791 | + # Append some more data, not enough to require resize |
| 792 | + out.append(bzd.decompress(self.DATA[280:300], 2)) |
| 793 | + |
| 794 | + # Decompress rest of data |
| 795 | + out.append(bzd.decompress(self.DATA[300:])) |
| 796 | + self.assertEqual(b''.join(out), self.TEXT) |
| 797 | + |
| 798 | + def test_decompressor_inputbuf_3(self): |
| 799 | + # Test reusing input buffer after extending it |
| 800 | + |
| 801 | + bzd = BZ2Decompressor() |
| 802 | + out = [] |
| 803 | + |
| 804 | + # Create almost full input buffer |
| 805 | + out.append(bzd.decompress(self.DATA[:200], 5)) |
| 806 | + |
| 807 | + # Add even more data to it, requiring resize |
| 808 | + out.append(bzd.decompress(self.DATA[200:300], 5)) |
| 809 | + |
| 810 | + # Decompress rest of data |
| 811 | + out.append(bzd.decompress(self.DATA[300:])) |
| 812 | + self.assertEqual(b''.join(out), self.TEXT) |
710 | 813 |
|
711 | 814 | class CompressDecompressTest(BaseTest): |
712 | 815 | def testCompress(self): |
|
0 commit comments