-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathVertxMethodHandler.java
More file actions
304 lines (278 loc) · 10.2 KB
/
VertxMethodHandler.java
File metadata and controls
304 lines (278 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package feign;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.codec.DecodeException;
import feign.codec.Decoder;
import feign.codec.ErrorDecoder;
import feign.vertx.VertxHttpClient;
import io.vertx.core.Future;
import io.vertx.core.VertxException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import static feign.FeignException.errorExecuting;
import static feign.FeignException.errorReading;
import static feign.Util.ensureClosed;
/**
* Method handler for asynchronous HTTP requests via {@link VertxHttpClient}.
* Inspired by {@link SynchronousMethodHandler}.
*
* @author Alexei KLENIN
* @author Gordon McKinney
*/
final class VertxMethodHandler implements MethodHandler {
// Body-length threshold used by executeAndDecode when the method returns a raw Response:
// a body whose reported length is unknown or above this value is returned unclosed,
// anything smaller is re-wrapped and the original body is closed.
private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L;
// Supplies configKey() for logging/error decoding and returnType() for decoding.
private final MethodMetadata metadata;
// Applied to the request template to produce the final Request.
private final Target<?> target;
// Executes the request and yields a future of the response.
private final VertxHttpClient client;
// Prototype retryer; invoke() works on a clone of it.
private final Retryer retryer;
// Applied to the request template, in list order, before the target is applied.
private final List<RequestInterceptor> requestInterceptors;
// Logging is skipped entirely when logLevel is Logger.Level.NONE.
private final Logger logger;
private final Logger.Level logLevel;
// Creates the request template from the invocation arguments.
private final RequestTemplate.Factory buildTemplateFromArgs;
// Decodes successful responses (and 404 responses when decode404 is set).
private final Decoder decoder;
// Produces the failure for responses that are neither successful nor a decoded 404.
private final ErrorDecoder errorDecoder;
// When true, a 404 response is passed to the decoder instead of the error decoder.
private final boolean decode404;
/**
 * Creates a handler for one method of {@code target}. Only stores the collaborators;
 * instances are created through {@link Factory#create}.
 *
 * @param target target the request template is applied to
 * @param client HTTP client executing the requests
 * @param retryer retryer prototype, cloned on every invocation
 * @param requestInterceptors interceptors applied to the template before targeting
 * @param logger logger for requests, responses, IO failures and retries
 * @param logLevel logging level; {@code NONE} disables logging
 * @param metadata metadata of the handled method
 * @param buildTemplateFromArgs factory of request templates from invocation arguments
 * @param decoder decoder of response bodies
 * @param errorDecoder decoder of error responses
 * @param decode404 whether 404 responses are decoded instead of treated as errors
 */
private VertxMethodHandler(
    final Target<?> target,
    final VertxHttpClient client,
    final Retryer retryer,
    final List<RequestInterceptor> requestInterceptors,
    final Logger logger,
    final Logger.Level logLevel,
    final MethodMetadata metadata,
    final RequestTemplate.Factory buildTemplateFromArgs,
    final Decoder decoder,
    final ErrorDecoder errorDecoder,
    final boolean decode404) {
  // Request construction and execution
  this.target = target;
  this.client = client;
  this.retryer = retryer;
  this.requestInterceptors = requestInterceptors;
  this.buildTemplateFromArgs = buildTemplateFromArgs;

  // Logging
  this.logger = logger;
  this.logLevel = logLevel;

  // Response handling
  this.metadata = metadata;
  this.decoder = decoder;
  this.errorDecoder = errorDecoder;
  this.decode404 = decode404;
}
/**
 * Builds the request template from the invocation arguments, executes it and returns the
 * future of the decoded result. Failures are routed through a {@link RetryRecoverer} backed
 * by a clone of the configured retryer, so every invocation retries with its own retryer
 * instance.
 *
 * @param argv arguments of the invoked method
 *
 * @return future with decoded result or occurred error
 */
@Override
public Future invoke(final Object[] argv) {
  final RequestTemplate template = buildTemplateFromArgs.create(argv);
  // Parameterised (not raw) recoverer: executeAndDecode yields Future<Object>, so no unchecked
  // conversion is needed here.
  final RetryRecoverer<Object> recoverer = new RetryRecoverer<>(template, this.retryer.clone());
  return executeAndDecode(template).recover(recoverer);
}
/**
 * Executes request from {@code template} with {@code this.client} and decodes the response.
 * Result or occurred error wrapped in returned Future.
 *
 * <p>Response handling:
 * <ul>
 *   <li>return type {@link Response}: the response itself is the result. A body whose length
 *       is unknown or above {@code MAX_RESPONSE_BUFFER_SIZE} is handed over unclosed; a smaller
 *       body is copied into memory and the original body is closed;</li>
 *   <li>2xx status: the body is decoded, or ignored when the return type is {@code Void};</li>
 *   <li>404 status with {@code decode404} enabled: the body is decoded;</li>
 *   <li>any other status: the future fails with the exception built by the error decoder.</li>
 * </ul>
 *
 * <p>Client failures: {@link VertxException} and {@link TimeoutException} are passed through
 * unchanged; a failure caused by an {@link IOException} is logged and converted with
 * {@code errorExecuting}; any other failure is replaced by its cause, or kept as is when it has
 * no cause.
 *
 * @param template request template
 *
 * @return future with decoded result or occurred error
 */
private Future<Object> executeAndDecode(final RequestTemplate template) {
  final Request request = targetRequest(template);
  logRequest(request);
  final Instant start = Instant.now();

  return client
      .execute(request)
      .compose(
          response -> {
            final long elapsedTime = Duration.between(start, Instant.now()).toMillis();
            boolean shouldClose = true;

            try {
              // TODO: check why this buffering is needed
              if (logLevel != Logger.Level.NONE) {
                response = logger.logAndRebufferResponse(
                    metadata.configKey(),
                    logLevel,
                    response,
                    elapsedTime);
              }

              if (Response.class == metadata.returnType()) {
                if (response.body() == null) {
                  return Future.succeededFuture(response);
                } else if (response.body().length() == null
                    || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                  // Unknown or large body: hand the response over as is and leave the body open.
                  shouldClose = false;
                  return Future.succeededFuture(response);
                } else {
                  // Small body: copy the bytes so that the returned response does not share the
                  // body that the finally block below is about to close.
                  final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
                  return Future.succeededFuture(Response.builder()
                      .status(response.status())
                      .reason(response.reason())
                      .headers(response.headers())
                      .request(response.request())
                      .body(bodyData)
                      .build());
                }
              } else if (response.status() >= 200 && response.status() < 300) {
                if (Void.class == metadata.returnType()) {
                  return Future.succeededFuture();
                } else {
                  return Future.succeededFuture(decode(response, request));
                }
              } else if (decode404 && response.status() == 404) {
                return Future.succeededFuture(decoder.decode(response, metadata.returnType()));
              } else {
                return Future.failedFuture(errorDecoder.decode(metadata.configKey(), response));
              }
            } catch (final IOException ioException) {
              logIoException(ioException, elapsedTime);
              return Future.failedFuture(errorReading(request, response, ioException));
            } catch (final FeignException exception) {
              return Future.failedFuture(exception);
            } finally {
              if (shouldClose) {
                ensureClosed(response.body());
              }
            }
          },
          failure -> {
            if (failure instanceof VertxException || failure instanceof TimeoutException) {
              return Future.failedFuture(failure);
            }

            final Throwable cause = failure.getCause();
            if (cause instanceof IOException) {
              final long elapsedTime = Duration.between(start, Instant.now()).toMillis();
              logIoException((IOException) cause, elapsedTime);
              return Future.failedFuture(errorExecuting(request, (IOException) cause));
            }

            // A failure without a cause is propagated itself: failing the future with a null
            // throwable would discard the only information about what went wrong.
            return Future.failedFuture(cause != null ? cause : failure);
          });
}
/**
 * Runs every request interceptor over {@code template}, in list order, and then binds the
 * template to the target.
 *
 * @param template request template
 *
 * @return fully formed request
 */
private Request targetRequest(final RequestTemplate template) {
  requestInterceptors.forEach(interceptor -> interceptor.apply(template));
  final Request targeted = target.apply(template);
  return targeted;
}
/**
 * Decodes the body of {@code response} into the return type of the handled method.
 * Feign exceptions raised by the decoder are rethrown untouched; any other runtime exception
 * is wrapped into a {@link DecodeException} carrying the request and the original exception.
 *
 * @param response HTTP response
 * @param request HTTP request
 *
 * @return decoded result
 *
 * @throws IOException IO exception during the reading of InputStream of response
 * @throws DecodeException when decoding failed due to a checked or unchecked exception besides
 *     IOException
 * @throws FeignException when decoding succeeds, but conveys the operation failed
 */
private Object decode(final Response response, final Request request) throws IOException, FeignException {
  final Object decoded;
  try {
    decoded = decoder.decode(response, metadata.returnType());
  } catch (final RuntimeException failure) {
    if (failure instanceof FeignException) {
      /* All feign exceptions, decode exceptions included, pass through as they are */
      throw failure;
    }
    /* Anything else is unexpected and gets wrapped */
    throw new DecodeException(-1, failure.getMessage(), request, failure);
  }
  return decoded;
}
/**
 * Passes the request to the logger, unless logging is switched off.
 *
 * @param request HTTP request
 */
private void logRequest(final Request request) {
  if (logLevel == Logger.Level.NONE) {
    return;
  }
  logger.logRequest(metadata.configKey(), logLevel, request);
}
/**
 * Passes an IO exception to the logger, unless logging is switched off.
 *
 * @param exception IO exception
 * @param elapsedTime time spent to execute request
 */
private void logIoException(final IOException exception, final long elapsedTime) {
  if (logLevel == Logger.Level.NONE) {
    return;
  }
  logger.logIOException(metadata.configKey(), logLevel, exception, elapsedTime);
}
/**
 * Tells the logger that the request is about to be retried, unless logging is switched off.
 */
private void logRetry() {
  if (logLevel == Logger.Level.NONE) {
    return;
  }
  logger.logRetry(metadata.configKey(), logLevel);
}
/**
 * Factory of {@link VertxMethodHandler}s. Holds the collaborators shared by all handlers and
 * combines them with the per-method ones passed to {@link #create}.
 */
static final class Factory {
  private final VertxHttpClient client;
  private final Retryer retryer;
  private final List<RequestInterceptor> requestInterceptors;
  private final Logger logger;
  private final Logger.Level logLevel;
  private final boolean decode404;

  /**
   * @param client HTTP client executing the requests
   * @param retryer retryer prototype handed to every created handler
   * @param requestInterceptors interceptors applied to every request template
   * @param logger logger used by the created handlers
   * @param logLevel logging level used by the created handlers
   * @param decode404 whether 404 responses are decoded instead of treated as errors
   */
  Factory(
      final VertxHttpClient client,
      final Retryer retryer,
      final List<RequestInterceptor> requestInterceptors,
      final Logger logger,
      final Logger.Level logLevel,
      final boolean decode404) {
    this.client = client;
    this.retryer = retryer;
    this.requestInterceptors = requestInterceptors;
    this.logger = logger;
    this.logLevel = logLevel;
    this.decode404 = decode404;
  }

  /**
   * Creates the handler of a single method.
   *
   * @param target target the requests are bound to
   * @param metadata metadata of the handled method
   * @param buildTemplateFromArgs factory of request templates from invocation arguments
   * @param decoder decoder of response bodies
   * @param errorDecoder decoder of error responses
   *
   * @return new method handler
   */
  MethodHandler create(
      final Target<?> target,
      final MethodMetadata metadata,
      final RequestTemplate.Factory buildTemplateFromArgs,
      final Decoder decoder,
      final ErrorDecoder errorDecoder) {
    final MethodHandler handler = new VertxMethodHandler(
        target,
        client,
        retryer,
        requestInterceptors,
        logger,
        logLevel,
        metadata,
        buildTemplateFromArgs,
        decoder,
        errorDecoder,
        decode404);
    return handler;
  }
}
/**
 * Failure handler that re-executes the request when the failure is retryable. The recoverer
 * registers itself on every new attempt, so consecutive failures keep going through the same
 * retryer instance.
 *
 * @param <T> type of response
 */
private final class RetryRecoverer<T> implements Function<Throwable, Future<T>> {
  private final RequestTemplate template;
  private final Retryer retryer;

  private RetryRecoverer(final RequestTemplate template, final Retryer retryer) {
    this.template = template;
    this.retryer = retryer;
  }

  /**
   * Retries the request for a {@link RetryableException}; any other failure is returned as a
   * failed future without a retry.
   *
   * @param throwable failure of the previous attempt
   *
   * @return future of the next attempt, or a failed future holding {@code throwable}
   */
  @Override
  @SuppressWarnings("unchecked")
  public Future<T> apply(final Throwable throwable) {
    if (!(throwable instanceof RetryableException)) {
      return Future.failedFuture(throwable);
    }

    // The retryer decides whether another attempt is allowed; an exception it throws is not
    // caught here and leaves this method.
    retryer.continueOrPropagate((RetryableException) throwable);
    logRetry();

    final Future<T> nextAttempt = (Future<T>) executeAndDecode(template);
    return nextAttempt.recover(this);
  }
}
}