Skip to content

Commit 2589b95

Browse files
author
Grzegorz Kołakowski
committed
Refactoring
1 parent 1ad0fa9 commit 2589b95

File tree

3 files changed

+35
-37
lines changed

3 files changed

+35
-37
lines changed

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ
4646
private final Counter numRecordsSendErrorsCounter;
4747

4848
public HttpSinkWriter(
49-
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
50-
Sink.InitContext context,
51-
int maxBatchSize,
52-
int maxInFlightRequests,
53-
int maxBufferedRequests,
54-
long maxBatchSizeInBytes,
55-
long maxTimeInBufferMS,
56-
long maxRecordSizeInBytes,
57-
String endpointUrl,
58-
SinkHttpClient sinkHttpClient,
59-
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
60-
Properties properties) {
49+
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
50+
Sink.InitContext context,
51+
int maxBatchSize,
52+
int maxInFlightRequests,
53+
int maxBufferedRequests,
54+
long maxBatchSizeInBytes,
55+
long maxTimeInBufferMS,
56+
long maxRecordSizeInBytes,
57+
String endpointUrl,
58+
SinkHttpClient sinkHttpClient,
59+
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
60+
Properties properties) {
6161

6262
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
6363
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
@@ -82,8 +82,8 @@ public HttpSinkWriter(
8282
// TODO: Reintroduce retries by adding backoff policy
8383
@Override
8484
protected void submitRequestEntries(
85-
List<HttpSinkRequestEntry> requestEntries,
86-
Consumer<List<HttpSinkRequestEntry>> requestResult) {
85+
List<HttpSinkRequestEntry> requestEntries,
86+
Consumer<List<HttpSinkRequestEntry>> requestResult) {
8787
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
8888
future.whenCompleteAsync((response, err) -> {
8989
if (err != null) {

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
110110
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
111111

112112
// TODO Add response processor here and orchestrate it with statusCodeChecker.
113-
if (optResponse.isEmpty() ||
114-
statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals(
115-
HttpResponseStatus.FAILURE_RETRYABLE) ||
116-
statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals(
117-
HttpResponseStatus.FAILURE_NOT_RETRYABLE)) {
118-
failedResponses.add(sinkRequestEntry);
119-
} else {
113+
if (optResponse.isPresent() &&
114+
statusCodeChecker.checkStatus(optResponse.get().statusCode())
115+
.equals(HttpResponseStatus.SUCCESS)) {
120116
successfulResponses.add(sinkRequestEntry);
117+
} else {
118+
failedResponses.add(sinkRequestEntry);
121119
}
122120
}
123121

src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java

+16-16
Original file line numberDiff line numberDiff line change
@@ -88,25 +88,25 @@ private Optional<Predicate<Integer>> prepareErrorCodes(String statusCodesStr) {
8888
return Arrays.stream(statusCodesStr.split(HttpConnectorConfigConstants.PROP_DELIM))
8989
.filter(code -> !isNullOrWhitespaceOnly(code))
9090
.map(code -> code.toUpperCase().trim())
91-
.map(codeStr -> {
92-
Preconditions.checkArgument(
93-
codeStr.length() == 3,
94-
"Status code should contain three characters. Provided [%s]",
95-
codeStr);
96-
97-
// at this point we have trim, upper case 3 character status code.
98-
if (isTypeCode(codeStr)) {
99-
int code = Integer.parseInt(codeStr.replace("X", ""));
100-
return new TypeStatusCodeCheckerPredicate(
101-
HttpResponseCodeType.getByCode(code));
102-
} else {
103-
return new SingleValueHttpStatusCodeCheckerPredicate(
104-
Integer.parseInt(codeStr));
105-
}
106-
})
91+
.map(this::prepareErrorCode)
10792
.reduce(Predicate::or);
10893
}
10994

95+
private Predicate<Integer> prepareErrorCode(String codeString) {
96+
Preconditions.checkArgument(
97+
codeString.length() == 3,
98+
"Status code should contain three characters. Provided [%s]",
99+
codeString);
100+
101+
// at this point we have trim, upper case 3 character status code.
102+
if (isTypeCode(codeString)) {
103+
int code = Integer.parseInt(codeString.replace("X", ""));
104+
return new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.getByCode(code));
105+
} else {
106+
return new SingleValueHttpStatusCodeCheckerPredicate(Integer.parseInt(codeString));
107+
}
108+
}
109+
110110
/**
111111
* This method checks if "code" param matches "digit + XX" mask. This method expects that
112112
* provided string will be 3 elements long, trim and upper case.

0 commit comments

Comments
 (0)