|
22 | 22 |
|
23 | 23 | try: |
24 | 24 | import tgcrypto |
| 25 | + |
| 26 | + log.info("Using TgCrypto") |
| 27 | + |
| 28 | + |
| 29 | + class AES: |
| 30 | + @classmethod |
| 31 | + def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
| 32 | + return tgcrypto.ige256_encrypt(data, key, iv) |
| 33 | + |
| 34 | + @classmethod |
| 35 | + def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
| 36 | + return tgcrypto.ige256_decrypt(data, key, iv) |
| 37 | + |
| 38 | + @staticmethod |
| 39 | + def ctr256_encrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: |
| 40 | + return tgcrypto.ctr256_encrypt(data, key, iv, state or bytearray(1)) |
| 41 | + |
| 42 | + @staticmethod |
| 43 | + def ctr256_decrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: |
| 44 | + return tgcrypto.ctr256_decrypt(data, key, iv, state or bytearray(1)) |
| 45 | + |
| 46 | + @staticmethod |
| 47 | + def xor(a: bytes, b: bytes) -> bytes: |
| 48 | + return int.to_bytes( |
| 49 | + int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), |
| 50 | + len(a), |
| 51 | + "big", |
| 52 | + ) |
25 | 53 | except ImportError: |
| 54 | + import pyaes |
| 55 | + |
26 | 56 | log.warning( |
27 | 57 | "TgCrypto is missing! " |
28 | 58 | "Pyrogram will work the same, but at a much slower speed. " |
29 | 59 | "More info: https://docs.pyrogram.ml/resources/TgCrypto" |
30 | 60 | ) |
31 | | - is_fast = False |
32 | | - import pyaes |
33 | | -else: |
34 | | - log.info("Using TgCrypto") |
35 | | - is_fast = True |
36 | 61 |
|
37 | 62 |
|
38 | | -# TODO: Ugly IFs |
39 | | -class AES: |
40 | | - @classmethod |
41 | | - def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
42 | | - if is_fast: |
43 | | - return tgcrypto.ige256_encrypt(data, key, iv) |
44 | | - else: |
| 63 | + class AES: |
| 64 | + @classmethod |
| 65 | + def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
45 | 66 | return cls.ige(data, key, iv, True) |
46 | 67 |
|
47 | | - @classmethod |
48 | | - def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
49 | | - if is_fast: |
50 | | - return tgcrypto.ige256_decrypt(data, key, iv) |
51 | | - else: |
| 68 | + @classmethod |
| 69 | + def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: |
52 | 70 | return cls.ige(data, key, iv, False) |
53 | 71 |
|
54 | | - @staticmethod |
55 | | - def ctr256_encrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes: |
56 | | - if is_fast: |
57 | | - return tgcrypto.ctr256_decrypt(data, key, iv, state) |
58 | | - else: |
59 | | - ctr = pyaes.AESModeOfOperationCTR(key) |
60 | | - ctr._counter._counter = list(iv) |
61 | | - return ctr.decrypt(data) |
62 | | - |
63 | | - @staticmethod |
64 | | - def ctr256_decrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes: |
65 | | - return AES.ctr256_encrypt(data, key, iv, state) |
66 | | - |
67 | | - @staticmethod |
68 | | - def xor(a: bytes, b: bytes) -> bytes: |
69 | | - return int.to_bytes( |
70 | | - int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), |
71 | | - len(a), |
72 | | - "big", |
73 | | - ) |
74 | | - |
75 | | - @classmethod |
76 | | - def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes: |
77 | | - cipher = pyaes.AES(key) |
78 | | - |
79 | | - iv_1 = iv[:16] |
80 | | - iv_2 = iv[16:] |
81 | | - |
82 | | - data = [data[i: i + 16] for i in range(0, len(data), 16)] |
83 | | - |
84 | | - if encrypt: |
85 | | - for i, chunk in enumerate(data): |
86 | | - iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2) |
87 | | - iv_2 = chunk |
88 | | - else: |
89 | | - for i, chunk in enumerate(data): |
90 | | - iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1) |
91 | | - iv_1 = chunk |
92 | | - |
93 | | - return b"".join(data) |
| 72 | + @classmethod |
| 73 | + def ctr256_encrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: |
| 74 | + return cls.ctr(data, key, iv, state or bytearray(1)) |
| 75 | + |
| 76 | + @classmethod |
| 77 | + def ctr256_decrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: |
| 78 | + return cls.ctr(data, key, iv, state or bytearray(1)) |
| 79 | + |
| 80 | + @staticmethod |
| 81 | + def xor(a: bytes, b: bytes) -> bytes: |
| 82 | + return int.to_bytes( |
| 83 | + int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), |
| 84 | + len(a), |
| 85 | + "big", |
| 86 | + ) |
| 87 | + |
| 88 | + @classmethod |
| 89 | + def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes: |
| 90 | + cipher = pyaes.AES(key) |
| 91 | + |
| 92 | + iv_1 = iv[:16] |
| 93 | + iv_2 = iv[16:] |
| 94 | + |
| 95 | + data = [data[i: i + 16] for i in range(0, len(data), 16)] |
| 96 | + |
| 97 | + if encrypt: |
| 98 | + for i, chunk in enumerate(data): |
| 99 | + iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2) |
| 100 | + iv_2 = chunk |
| 101 | + else: |
| 102 | + for i, chunk in enumerate(data): |
| 103 | + iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1) |
| 104 | + iv_1 = chunk |
| 105 | + |
| 106 | + return b"".join(data) |
| 107 | + |
| 108 | + @classmethod |
| 109 | + def ctr(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray) -> bytes: |
| 110 | + cipher = pyaes.AES(key) |
| 111 | + |
| 112 | + out = bytearray(data) |
| 113 | + chunk = cipher.encrypt(iv) |
| 114 | + |
| 115 | + for i in range(0, len(data), 16): |
| 116 | + for j in range(0, min(len(data) - i, 16)): |
| 117 | + out[i + j] ^= chunk[state[0]] |
| 118 | + |
| 119 | + state[0] += 1 |
| 120 | + |
| 121 | + if state[0] >= 16: |
| 122 | + state[0] = 0 |
| 123 | + |
| 124 | + if state[0] == 0: |
| 125 | + for k in range(15, -1, -1): |
| 126 | + try: |
| 127 | + iv[k] += 1 |
| 128 | + break |
| 129 | + except ValueError: |
| 130 | + iv[k] = 0 |
| 131 | + |
| 132 | + chunk = cipher.encrypt(iv) |
| 133 | + |
| 134 | + return out |
0 commit comments