forked from localstack/localstack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproxy.py
More file actions
97 lines (72 loc) · 3.5 KB
/
proxy.py
File metadata and controls
97 lines (72 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Adapters and other utilities to use ASF together with the edge proxy.
"""
import logging
from typing import Any, Optional
from botocore.model import ServiceModel
from werkzeug.datastructures import Headers
from localstack import constants
from localstack.aws.api import RequestContext
from localstack.aws.skeleton import Skeleton
from localstack.aws.spec import load_service
from localstack.http import Request, Response
from localstack.http.adapters import ProxyListenerAdapter
from localstack.services.generic_proxy import ProxyListener
from localstack.services.messages import MessagePayload
from localstack.utils.aws.request_context import extract_region_from_headers
from localstack.utils.persistence import PersistingProxyListener
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
def get_region(request: Request) -> str:
    """Return whatever ``extract_region_from_headers`` derives from the headers of ``request``."""
    headers = request.headers
    return extract_region_from_headers(headers)
def get_account_id(_: Request) -> str:
    """
    Return the account id to associate with a request.

    The request argument is currently ignored: the constant ``constants.TEST_AWS_ACCOUNT_ID`` is always returned.
    """
    # TODO: at some point we may want to get the account id from credentials
    account_id = constants.TEST_AWS_ACCOUNT_ID
    return account_id
class AwsApiListener(ProxyListenerAdapter):
    """
    A ProxyListenerAdapter that answers requests for one API by invoking a ``Skeleton`` built from the API's
    service model and a delegate object.
    """

    service: ServiceModel

    def __init__(self, api: str, delegate: Any):
        """
        :param api: name passed to ``load_service`` to obtain the service model
        :param delegate: object handed to the ``Skeleton`` together with the service model
        """
        service_model = load_service(api)
        self.service = service_model
        self.skeleton = Skeleton(service_model, delegate)

    def request(self, request: Request) -> Response:
        """Create a request context for ``request`` and return the result of invoking the skeleton with it."""
        return self.skeleton.invoke(self.create_request_context(request))

    def create_request_context(self, request: Request) -> RequestContext:
        """
        Build a ``RequestContext`` holding this listener's service model, the request itself, and the region
        and account id resolved by ``get_region`` and ``get_account_id``.
        """
        ctx = RequestContext()
        ctx.request = request
        ctx.service = self.service
        ctx.region = get_region(request)
        ctx.account_id = get_account_id(request)
        return ctx
def _raise_not_implemented_error(*_args, **_kwargs):
    """Accept any positional or keyword arguments, ignore them, and raise ``NotImplementedError``."""
    raise NotImplementedError()
class AsfWithFallbackListener(AwsApiListener):
    """
    An AwsApiListener that does not return a default error response if a particular method has not been implemented,
    but instead calls a second ProxyListener. This is useful to migrate service providers to ASF providers.
    """

    # NOTE(review): ``api`` and ``delegate`` are only declared here; no constructor visible in this module
    # assigns them on the instance -- confirm before reading them.
    api: str
    delegate: Any
    fallback: ProxyListener

    def __init__(self, api: str, delegate: Any, fallback: ProxyListener):
        """
        :param api: name of the API, passed on to ``AwsApiListener``
        :param delegate: object passed on to ``AwsApiListener`` for use by the skeleton
        :param fallback: listener that receives every call this listener cannot serve
        """
        super().__init__(api, delegate)
        self.fallback = fallback
        # have the skeleton raise NotImplementedError so that forward_request can catch it and use the fallback
        self.skeleton.on_not_implemented_error = _raise_not_implemented_error

    def forward_request(self, method, path, data, headers):
        """
        Try the inherited ``forward_request`` first; on ``NotImplementedError`` or ``KeyError`` hand the same
        arguments to the fallback listener and return its result.
        """
        try:
            return super().forward_request(method, path, data, headers)
        except (NotImplementedError, KeyError) as error:
            # FIXME: KeyError may be an ASF parser error, that indicates that the request cannot be parsed
            # A KeyError is not necessarily a missing handler, so keep its traceback in the debug record
            # instead of discarding it; a plain NotImplementedError is logged without one.
            LOG.debug(
                "no ASF handler for %s %s, using fallback listener",
                method,
                path,
                exc_info=isinstance(error, KeyError),
            )
            return self.fallback.forward_request(method, path, data, headers)

    def return_response(
        self, method: str, path: str, data: MessagePayload, headers: Headers, response: Response
    ) -> Optional[Response]:
        """Delegate unconditionally to the fallback listener's ``return_response``."""
        return self.fallback.return_response(method, path, data, headers, response)

    def get_forward_url(self, method: str, path: str, data, headers):
        """Delegate unconditionally to the fallback listener's ``get_forward_url``."""
        return self.fallback.get_forward_url(method, path, data, headers)
class AsfWithPersistingFallbackListener(AsfWithFallbackListener, PersistingProxyListener):
    """
    An AsfWithFallbackListener that is also a PersistingProxyListener and whose fallback is a
    PersistingProxyListener; ``api_name`` is answered by that fallback.
    """

    fallback: PersistingProxyListener

    def __init__(self, api: str, delegate: Any, fallback: PersistingProxyListener):
        """Forward all three arguments unchanged to ``AsfWithFallbackListener.__init__``."""
        super().__init__(api=api, delegate=delegate, fallback=fallback)

    def api_name(self):
        """Return the API name reported by the fallback listener."""
        name = self.fallback.api_name()
        return name