Skip to content

Commit c5911a6

Browse files
committed
Reorganize AES module
1 parent f0c801b commit c5911a6

2 files changed

Lines changed: 103 additions & 60 deletions

File tree

pyrogram/client/client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,11 +1213,13 @@ def get_file(self,
12131213
chunk = r2.bytes
12141214

12151215
# https://core.telegram.org/cdn#decrypting-files
1216-
decrypted_chunk = AES.ctr_decrypt(
1216+
decrypted_chunk = AES.ctr256_decrypt(
12171217
chunk,
12181218
r.encryption_key,
1219-
r.encryption_iv,
1220-
offset
1219+
bytearray(
1220+
r.encryption_iv[:-4]
1221+
+ (offset // 16).to_bytes(4, "big")
1222+
)
12211223
)
12221224

12231225
hashes = session.send(

pyrogram/crypto/aes.py

Lines changed: 98 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,72 +22,113 @@
2222

2323
try:
2424
import tgcrypto
25+
26+
log.info("Using TgCrypto")
27+
28+
29+
class AES:
30+
@classmethod
31+
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
32+
return tgcrypto.ige256_encrypt(data, key, iv)
33+
34+
@classmethod
35+
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
36+
return tgcrypto.ige256_decrypt(data, key, iv)
37+
38+
@staticmethod
39+
def ctr256_encrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
40+
return tgcrypto.ctr256_encrypt(data, key, iv, state or bytearray(1))
41+
42+
@staticmethod
43+
def ctr256_decrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
44+
return tgcrypto.ctr256_decrypt(data, key, iv, state or bytearray(1))
45+
46+
@staticmethod
47+
def xor(a: bytes, b: bytes) -> bytes:
48+
return int.to_bytes(
49+
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
50+
len(a),
51+
"big",
52+
)
2553
except ImportError:
54+
import pyaes
55+
2656
log.warning(
2757
"TgCrypto is missing! "
2858
"Pyrogram will work the same, but at a much slower speed. "
2959
"More info: https://docs.pyrogram.ml/resources/TgCrypto"
3060
)
31-
is_fast = False
32-
import pyaes
33-
else:
34-
log.info("Using TgCrypto")
35-
is_fast = True
3661

3762

38-
# TODO: Ugly IFs
39-
class AES:
40-
@classmethod
41-
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
42-
if is_fast:
43-
return tgcrypto.ige256_encrypt(data, key, iv)
44-
else:
63+
class AES:
64+
@classmethod
65+
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
4566
return cls.ige(data, key, iv, True)
4667

47-
@classmethod
48-
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
49-
if is_fast:
50-
return tgcrypto.ige256_decrypt(data, key, iv)
51-
else:
68+
@classmethod
69+
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
5270
return cls.ige(data, key, iv, False)
5371

54-
@staticmethod
55-
def ctr256_encrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
56-
if is_fast:
57-
return tgcrypto.ctr256_decrypt(data, key, iv, state)
58-
else:
59-
ctr = pyaes.AESModeOfOperationCTR(key)
60-
ctr._counter._counter = list(iv)
61-
return ctr.decrypt(data)
62-
63-
@staticmethod
64-
def ctr256_decrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
65-
return AES.ctr256_encrypt(data, key, iv, state)
66-
67-
@staticmethod
68-
def xor(a: bytes, b: bytes) -> bytes:
69-
return int.to_bytes(
70-
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
71-
len(a),
72-
"big",
73-
)
74-
75-
@classmethod
76-
def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
77-
cipher = pyaes.AES(key)
78-
79-
iv_1 = iv[:16]
80-
iv_2 = iv[16:]
81-
82-
data = [data[i: i + 16] for i in range(0, len(data), 16)]
83-
84-
if encrypt:
85-
for i, chunk in enumerate(data):
86-
iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
87-
iv_2 = chunk
88-
else:
89-
for i, chunk in enumerate(data):
90-
iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
91-
iv_1 = chunk
92-
93-
return b"".join(data)
72+
@classmethod
73+
def ctr256_encrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
74+
return cls.ctr(data, key, iv, state or bytearray(1))
75+
76+
@classmethod
77+
def ctr256_decrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
78+
return cls.ctr(data, key, iv, state or bytearray(1))
79+
80+
@staticmethod
81+
def xor(a: bytes, b: bytes) -> bytes:
82+
return int.to_bytes(
83+
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
84+
len(a),
85+
"big",
86+
)
87+
88+
@classmethod
89+
def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
90+
cipher = pyaes.AES(key)
91+
92+
iv_1 = iv[:16]
93+
iv_2 = iv[16:]
94+
95+
data = [data[i: i + 16] for i in range(0, len(data), 16)]
96+
97+
if encrypt:
98+
for i, chunk in enumerate(data):
99+
iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
100+
iv_2 = chunk
101+
else:
102+
for i, chunk in enumerate(data):
103+
iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
104+
iv_1 = chunk
105+
106+
return b"".join(data)
107+
108+
@classmethod
109+
def ctr(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray) -> bytes:
110+
cipher = pyaes.AES(key)
111+
112+
out = bytearray(data)
113+
chunk = cipher.encrypt(iv)
114+
115+
for i in range(0, len(data), 16):
116+
for j in range(0, min(len(data) - i, 16)):
117+
out[i + j] ^= chunk[state[0]]
118+
119+
state[0] += 1
120+
121+
if state[0] >= 16:
122+
state[0] = 0
123+
124+
if state[0] == 0:
125+
for k in range(15, -1, -1):
126+
try:
127+
iv[k] += 1
128+
break
129+
except ValueError:
130+
iv[k] = 0
131+
132+
chunk = cipher.encrypt(iv)
133+
134+
return out

0 commit comments

Comments
 (0)