forked from ssh-mitm/ssh-mitm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthentication.py
More file actions
433 lines (378 loc) · 16.6 KB
/
authentication.py
File metadata and controls
433 lines (378 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
import argparse
import logging
import os
import sys
import socket
from typing import (
TYPE_CHECKING,
Optional,
List,
Tuple,
Text
)
from colored.colored import stylize, attr, fg # type: ignore
from paramiko import PKey
from rich._emoji_codes import EMOJI
from enhancements.modules import BaseModule
import paramiko
from sshpubkeys import SSHKey # type: ignore
from typeguard import typechecked
import ssh_proxy_server
from ssh_proxy_server.clients.ssh import SSHClient, AuthenticationMethod
from ssh_proxy_server.exceptions import MissingHostException
if TYPE_CHECKING:
from ssh_proxy_server.session import Session
@typechecked
def probe_host(hostname_or_ip: Text, port: int, username: Text, public_key: paramiko.pkey.PublicBlob) -> bool:
@typechecked
def valid(self, msg: paramiko.message.Message) -> None: # type: ignore
self.auth_event.set()
self.authenticated = True
@typechecked
def parse_service_accept(self, m: paramiko.message.Message) -> None: # type: ignore
# https://tools.ietf.org/html/rfc4252#section-7
service = m.get_text()
if not (service == "ssh-userauth" and self.auth_method == "publickey"):
return self._parse_service_accept(m) # type: ignore
m = paramiko.message.Message()
m.add_byte(paramiko.common.cMSG_USERAUTH_REQUEST)
m.add_string(self.username)
m.add_string("ssh-connection")
m.add_string(self.auth_method)
m.add_boolean(False)
m.add_string(self.private_key.public_blob.key_type)
m.add_string(self.private_key.public_blob.key_blob)
self.transport._send_message(m)
valid_key = False
try:
client_handler_table = paramiko.auth_handler.AuthHandler._client_handler_table # type: ignore
client_handler_table[paramiko.common.MSG_USERAUTH_INFO_REQUEST] = valid
client_handler_table[paramiko.common.MSG_SERVICE_ACCEPT] = parse_service_accept
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((hostname_or_ip, port))
transport = paramiko.transport.Transport(sock)
transport.start_client()
# For compatibility with paramiko, we need to generate a random private key and replace
# the public key with our data.
key: PKey = paramiko.RSAKey.generate(2048)
key.public_blob = public_key
transport.auth_publickey(username, key)
valid_key = True
except paramiko.ssh_exception.AuthenticationException:
pass
finally:
client_handler_table[paramiko.common.MSG_USERAUTH_INFO_REQUEST] = paramiko.auth_handler.AuthHandler._parse_userauth_info_request # type: ignore
client_handler_table[paramiko.common.MSG_SERVICE_ACCEPT] = paramiko.auth_handler.AuthHandler._parse_service_accept # type: ignore
return valid_key
class RemoteCredentials():
@typechecked
def __init__(
self, *,
username: Text,
password: Optional[Text] = None,
key: Optional[PKey]=None,
host: Optional[Text] = None,
port: Optional[int] = None
) -> None:
self.username: Text = username
self.password: Optional[Text] = password
self.key: Optional[PKey] = key
self.host: Optional[Text] = host
self.port: Optional[int] = port
class Authenticator(BaseModule):
REQUEST_AGENT_BREAKIN = False
@classmethod
@typechecked
def parser_arguments(cls) -> None:
plugin_group = cls.parser().add_argument_group(
cls.__name__,
"options for remote authentication"
)
plugin_group.add_argument(
'--remote-host',
dest='remote_host',
help='remote host to connect to (default 127.0.0.1)'
)
plugin_group.add_argument(
'--remote-port',
type=int,
dest='remote_port',
help='remote port to connect to (default 22)'
)
plugin_group.add_argument(
'--auth-username',
dest='auth_username',
help='username for remote authentication'
)
plugin_group.add_argument(
'--auth-password',
dest='auth_password',
help='password for remote authentication'
)
plugin_group.add_argument(
'--hide-credentials',
dest='auth_hide_credentials',
action='store_true',
help='do not log credentials (usefull for presentations)'
)
honeypot_group = cls.parser().add_argument_group('Authentication Fallback')
honeypot_group.add_argument(
'--enable-auth-fallback',
action='store_true',
default=False,
help="use a honeypot if no agent was forwarded to login with publickey auth "
)
honeypot_group.add_argument(
'--fallback-host',
dest='fallback_host',
required='--enable-auth-fallback' in sys.argv,
help='fallback host for the honeypot'
)
honeypot_group.add_argument(
'--fallback-port',
dest='fallback_port',
type=int,
default=22,
help='fallback port for the honeypot'
)
honeypot_group.add_argument(
'--fallback-username',
dest='fallback_username',
required='--enable-auth-fallback' in sys.argv,
help='username for the honeypot'
)
honeypot_group.add_argument(
'--fallback-password',
dest='fallback_password',
required='--enable-auth-fallback' in sys.argv,
help='password for the honeypot'
)
@typechecked
def __init__(self, session: 'ssh_proxy_server.session.Session') -> None:
super().__init__()
self.session = session
@typechecked
def get_remote_host_credentials(
self,
username: Text,
password: Optional[Text] = None,
key: Optional[PKey] = None
) -> RemoteCredentials:
if self.session.proxyserver.transparent:
return RemoteCredentials(
username=self.args.auth_username or username,
password=self.args.auth_password or password,
key=key,
host=self.args.remote_host or self.session.socket_remote_address[0],
port=self.args.remote_port or self.session.socket_remote_address[1]
)
return RemoteCredentials(
username=self.args.auth_username or username,
password=self.args.auth_password or password,
key=key,
host=self.args.remote_host or '127.0.0.1',
port=self.args.remote_port or 22
)
@classmethod
@typechecked
def get_auth_methods(cls, host: Text, port: int) -> Optional[List[Text]]:
auth_methods = None
t = paramiko.Transport((host, port))
try:
t.connect()
except paramiko.ssh_exception.SSHException:
t.close()
return auth_methods
try:
t.auth_none('')
except paramiko.BadAuthenticationType as err:
auth_methods = err.allowed_types
finally:
t.close()
return auth_methods
@typechecked
def authenticate(
self,
username: Optional[Text] = None,
password: Optional[Text] = None,
key: Optional[PKey] = None,
store_credentials: bool = True
) -> int:
if store_credentials:
self.session.username_provided = username
self.session.password_provided = password
if username:
remote_credentials: RemoteCredentials = self.get_remote_host_credentials(username, password, key)
self.session.username = remote_credentials.username
self.session.password = remote_credentials.password
self.session.remote_key = remote_credentials.key
self.session.remote_address = (remote_credentials.host, remote_credentials.port)
if key and not self.session.remote_key:
self.session.remote_key = key
if self.session.remote_address[0] is None or self.session.remote_address[1] is None:
logging.error("no remote host")
return paramiko.common.AUTH_FAILED
try:
if self.session.agent:
return self.auth_agent(
self.session.username,
self.session.remote_address[0],
self.session.remote_address[1]
)
if self.session.password:
return self.auth_password(
self.session.username,
self.session.remote_address[0],
self.session.remote_address[1],
self.session.password
)
if self.session.remote_key:
return self.auth_publickey(
self.session.username,
self.session.remote_address[0],
self.session.remote_address[1],
self.session.remote_key
)
except MissingHostException:
logging.error("no remote host")
except Exception:
logging.exception("internal error, abort authentication!")
return paramiko.common.AUTH_FAILED
@typechecked
def auth_agent(self, username: Text, host: Text, port: int) -> int:
raise NotImplementedError("authentication must be implemented")
@typechecked
def auth_password(self, username: Text, host: Text, port: int, password: Text) -> int:
raise NotImplementedError("authentication must be implemented")
@typechecked
def auth_publickey(self, username: Text, host: Text, port: int, key: PKey) -> int:
raise NotImplementedError("authentication must be implemented")
@typechecked
def auth_fallback(self, username: Text) -> int:
if not self.args.fallback_host:
logging.error("\n".join([
stylize(EMOJI['exclamation'] + " ssh agent not forwarded. Login to remote host not possible with publickey authentication.", fg('red') + attr('bold')),
stylize(EMOJI['information'] + " To intercept clients without a forwarded agent, you can provide credentials for a honeypot.", fg('yellow') + attr('bold'))
]))
return paramiko.common.AUTH_FAILED
auth_status = self.connect(
user=self.args.fallback_username or username,
password=self.args.fallback_password,
host=self.args.fallback_host,
port=self.args.fallback_port,
method=AuthenticationMethod.password
)
if auth_status == paramiko.common.AUTH_SUCCESSFUL:
logging.warning(
stylize(EMOJI['warning'] + " publickey authentication failed - no agent forwarded - connecting to honeypot!", fg('yellow') + attr('bold')),
)
else:
logging.error(
stylize(EMOJI['exclamation'] + " Authentication against honeypot failed!", fg('red') + attr('bold')),
)
return auth_status
@typechecked
def connect(self, user: Text, host: Text, port: int, method: AuthenticationMethod, password: Optional[Text] = None, key: Optional[PKey] = None) -> int:
if not host:
raise MissingHostException()
auth_status = paramiko.common.AUTH_FAILED
self.session.ssh_client = SSHClient(
host,
port,
method,
password,
user,
key,
self.session
)
self.pre_auth_action()
try:
if self.session.ssh_client is not None and self.session.ssh_client.connect():
auth_status = paramiko.common.AUTH_SUCCESSFUL
except paramiko.SSHException:
logging.error(stylize("Connection to remote server refused", fg('red') + attr('bold')))
return paramiko.common.AUTH_FAILED
self.post_auth_action(auth_status == paramiko.common.AUTH_SUCCESSFUL)
return auth_status
@typechecked
def pre_auth_action(self) -> None:
pass
@typechecked
def post_auth_action(self, success: bool) -> None:
pass
class AuthenticatorPassThrough(Authenticator):
"""pass the authentication to the remote server (reuses the credentials)
"""
@typechecked
def auth_agent(self, username: Text, host: Text, port: int) -> int:
return self.connect(username, host, port, AuthenticationMethod.agent)
@typechecked
def auth_password(self, username: Text, host: Text, port: int, password: Text) -> int:
return self.connect(username, host, port, AuthenticationMethod.password, password=password)
@typechecked
def auth_publickey(self, username: Text, host: Text, port: int, key: PKey) -> int:
ssh_pub_key = SSHKey(f"{key.get_name()} {key.get_base64()}")
ssh_pub_key.parse()
if key.can_sign():
logging.debug("AuthenticatorPassThrough.auth_publickey: username=%s, key=%s %s %sbits", username, key.get_name(), ssh_pub_key.hash_sha256(), ssh_pub_key.bits)
return self.connect(username, host, port, AuthenticationMethod.publickey, key=key)
# Ein Publickey wird nur direkt von check_auth_publickey
# übergeben. In dem Fall müssen wir den Client authentifizieren,
# damit wir auf den Agent warten können!
publickey = paramiko.pkey.PublicBlob(key.get_name(), key.asbytes())
if probe_host(host, port, username, publickey):
logging.debug(f"Found valid key for host {host}:{port} username={username}, key={key.get_name()} {ssh_pub_key.hash_sha256()} {ssh_pub_key.bits}bits")
return paramiko.common.AUTH_SUCCESSFUL
return paramiko.common.AUTH_FAILED
@typechecked
def post_auth_action(self, success: bool) -> None:
@typechecked
def get_agent_pubkeys() -> List[Tuple[Text, SSHKey, bool, Text]]:
pubkeyfile_path = None
keys_parsed: List[Tuple[Text, SSHKey, bool, Text]] = []
if self.session.agent is None:
return keys_parsed
keys = self.session.agent.get_keys()
for k in keys:
ssh_pub_key = SSHKey(f"{k.get_name()} {k.get_base64()}")
ssh_pub_key.parse()
keys_parsed.append((k.get_name(), ssh_pub_key, k.can_sign(), k.get_base64()))
if self.session.session_log_dir:
os.makedirs(self.session.session_log_dir, exist_ok=True)
pubkeyfile_path = os.path.join(self.session.session_log_dir, 'publickeys')
with open(pubkeyfile_path, 'a+') as pubkeyfile:
pubkeyfile.write("".join([
f"{k[0]} {k[3]} saved-from-agent\n"
for k in keys_parsed
]))
return keys_parsed
logmessage = []
if success:
logmessage.append(stylize("Remote authentication succeeded", fg('green') + attr('bold')))
else:
logmessage.append(stylize("Remote authentication failed", fg('red')))
if self.session.ssh_client is not None:
logmessage.append(f"\tRemote Address: {self.session.ssh_client.host}:{self.session.ssh_client.port}")
logmessage.append(f"\tUsername: {self.session.username_provided}")
if self.session.password_provided:
display_password = None
if not self.args.auth_hide_credentials:
display_password = self.session.password_provided
logmessage.append(f"\tPassword: {display_password or stylize('*******', fg('dark_gray'))}")
if self.session.accepted_key is not None and self.session.remote_key != self.session.accepted_key:
ssh_pub_key = SSHKey(f"{self.session.accepted_key.get_name()} {self.session.accepted_key.get_base64()}")
ssh_pub_key.parse()
logmessage.append(f"\tAccepted-Publickey: {self.session.accepted_key.get_name()} {ssh_pub_key.hash_sha256()} {ssh_pub_key.bits}bits")
if self.session.remote_key is not None:
ssh_pub_key = SSHKey(f"{self.session.remote_key.get_name()} {self.session.remote_key.get_base64()}")
ssh_pub_key.parse()
logmessage.append(f"\tRemote-Publickey: {self.session.remote_key.get_name()} {ssh_pub_key.hash_sha256()} {ssh_pub_key.bits}bits")
ssh_keys = None
if self.session.agent:
ssh_keys = get_agent_pubkeys()
logmessage.append(f"\tAgent: {f'available keys: {len(ssh_keys or [])}' if ssh_keys else 'no agent'}")
if ssh_keys is not None:
logmessage.append("\n".join(
[f"\t\tAgent-Key: {k[0]} {k[1].hash_sha256()} {k[1].bits}bits, can sign: {k[2]}" for k in ssh_keys]
))
logging.info("\n".join(logmessage))