Skip to content

Commit 75634e0

Browse files
author
Daniel Bustamante Ospina
committed
Implement custom metrics reporter for message execution and errors
1 parent 9909a24 commit 75634e0

22 files changed

+107
-102
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/CommandListenersConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
9+
import org.reactivecommons.async.impl.ext.CustomReporter;
1010
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1111
import org.springframework.beans.factory.annotation.Value;
1212
import org.springframework.context.annotation.Bean;
@@ -27,7 +27,7 @@ public class CommandListenersConfig {
2727
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
2828
HandlerResolver resolver, MessageConverter converter,
2929
DiscardNotifier discardNotifier,
30-
CustomErrorReporter errorReporter) {
30+
CustomReporter errorReporter) {
3131
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
3232
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
3333
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/EventListenersConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
9+
import org.reactivecommons.async.impl.ext.CustomReporter;
1010
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
1111
import org.springframework.beans.factory.annotation.Value;
1212
import org.springframework.context.annotation.Bean;
@@ -25,7 +25,7 @@ public class EventListenersConfig {
2525

2626
@Bean
2727
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
28-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
28+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
2929

3030
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
3131
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/NotificacionListenersConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
77
import org.reactivecommons.async.impl.config.props.AsyncProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
9-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
9+
import org.reactivecommons.async.impl.ext.CustomReporter;
1010
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
1111
import org.springframework.beans.factory.annotation.Value;
1212
import org.springframework.context.annotation.Bean;
@@ -25,7 +25,7 @@ public class NotificacionListenersConfig {
2525

2626
@Bean
2727
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
28-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
28+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
2929
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
3030
receiver,
3131
asyncProps.getDomain().getEvents().getExchange(),

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/QueryListenerConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
88
import org.reactivecommons.async.impl.config.props.AsyncProps;
99
import org.reactivecommons.async.impl.converters.MessageConverter;
10-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
10+
import org.reactivecommons.async.impl.ext.CustomReporter;
1111
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
1212
import org.springframework.beans.factory.annotation.Value;
1313
import org.springframework.context.annotation.Bean;
@@ -28,7 +28,7 @@ public class QueryListenerConfig {
2828
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
2929
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
3030
DiscardNotifier discardNotifier,
31-
CustomErrorReporter errorReporter) {
31+
CustomReporter errorReporter) {
3232
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
3333
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
3434
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.reactivecommons.async.impl.converters.json.DefaultObjectMapperSupplier;
2323
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;
2424
import org.reactivecommons.async.impl.converters.json.ObjectMapperSupplier;
25-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
25+
import org.reactivecommons.async.impl.ext.CustomReporter;
2626
import org.springframework.beans.factory.annotation.Value;
2727
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2828
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -141,8 +141,8 @@ public DiscardNotifier rabbitDiscardNotifier(ObjectMapperSupplier objectMapperSu
141141

142142
@Bean
143143
@ConditionalOnMissingBean
144-
public CustomErrorReporter reactiveCommonsCustomErrorReporter() {
145-
return new CustomErrorReporter() {
144+
public CustomReporter reactiveCommonsCustomErrorReporter() {
145+
return new CustomReporter() {
146146
@Override
147147
public Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message, boolean redelivered) {
148148
return Mono.empty();

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/CommandListenersConfigTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.reactivecommons.async.impl.communications.TopologyCreator;
1313
import org.reactivecommons.async.impl.config.props.AsyncProps;
1414
import org.reactivecommons.async.impl.converters.MessageConverter;
15-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
15+
import org.reactivecommons.async.impl.ext.CustomReporter;
1616
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1717
import reactor.core.publisher.Flux;
1818
import reactor.core.publisher.Mono;
@@ -38,7 +38,7 @@ public class CommandListenersConfigTest {
3838
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3939
private final MessageConverter messageConverter = mock(MessageConverter.class);
4040
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
41-
private final CustomErrorReporter customErrorReporter = mock(CustomErrorReporter.class);
41+
private final CustomReporter customReporter = mock(CustomReporter.class);
4242
private final Receiver receiver = mock(Receiver.class);
4343

4444
@BeforeEach
@@ -62,7 +62,7 @@ public void applicationCommandListener() {
6262
handlerResolver,
6363
messageConverter,
6464
discardNotifier,
65-
customErrorReporter
65+
customReporter
6666
);
6767
Assertions.assertThat(commandListener).isNotNull();
6868
}

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/EventListenersConfigTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.reactivecommons.async.impl.communications.TopologyCreator;
1111
import org.reactivecommons.async.impl.config.props.AsyncProps;
1212
import org.reactivecommons.async.impl.converters.MessageConverter;
13-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
13+
import org.reactivecommons.async.impl.ext.CustomReporter;
1414
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
1515
import reactor.core.publisher.Flux;
1616
import reactor.core.publisher.Mono;
@@ -31,7 +31,7 @@ public class EventListenersConfigTest {
3131
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3232
private final MessageConverter messageConverter = mock(MessageConverter.class);
3333
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
34-
private final CustomErrorReporter customErrorReporter = mock(CustomErrorReporter.class);
34+
private final CustomReporter customReporter = mock(CustomReporter.class);
3535
private final Receiver receiver = mock(Receiver.class);
3636

3737
@BeforeEach
@@ -56,7 +56,7 @@ public void eventListener() {
5656
messageConverter,
5757
listener,
5858
discardNotifier,
59-
customErrorReporter
59+
customReporter
6060
);
6161

6262
Assertions.assertThat(eventListener).isNotNull();

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/NotificacionListenersConfigTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.reactivecommons.async.impl.communications.TopologyCreator;
1111
import org.reactivecommons.async.impl.config.props.AsyncProps;
1212
import org.reactivecommons.async.impl.converters.MessageConverter;
13-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
13+
import org.reactivecommons.async.impl.ext.CustomReporter;
1414
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
1515
import reactor.core.publisher.Flux;
1616
import reactor.core.publisher.Mono;
@@ -31,7 +31,7 @@ public class NotificacionListenersConfigTest {
3131
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3232
private final MessageConverter messageConverter = mock(MessageConverter.class);
3333
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
34-
private final CustomErrorReporter customErrorReporter = mock(CustomErrorReporter.class);
34+
private final CustomReporter customReporter = mock(CustomReporter.class);
3535
private final Receiver receiver = mock(Receiver.class);
3636

3737
@BeforeEach
@@ -52,7 +52,7 @@ public void init() {
5252
@Test
5353
public void eventNotificationListener() {
5454
final ApplicationNotificationListener applicationEventListener = config.
55-
eventNotificationListener(handlerResolver, messageConverter, listener, discardNotifier, customErrorReporter);
55+
eventNotificationListener(handlerResolver, messageConverter, listener, discardNotifier, customReporter);
5656
Assertions.assertThat(applicationEventListener).isNotNull();
5757
}
5858
}

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/QueryListenerConfigTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.reactivecommons.async.impl.communications.TopologyCreator;
1212
import org.reactivecommons.async.impl.config.props.AsyncProps;
1313
import org.reactivecommons.async.impl.converters.MessageConverter;
14-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
14+
import org.reactivecommons.async.impl.ext.CustomReporter;
1515
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
1616
import reactor.core.publisher.Flux;
1717
import reactor.core.publisher.Mono;
@@ -32,7 +32,7 @@ public class QueryListenerConfigTest {
3232
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3333
private final MessageConverter messageConverter = mock(MessageConverter.class);
3434
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
35-
private final CustomErrorReporter customErrorReporter = mock(CustomErrorReporter.class);
35+
private final CustomReporter customReporter = mock(CustomReporter.class);
3636
private final Receiver receiver = mock(Receiver.class);
3737
private final ReactiveMessageSender sender = mock(ReactiveMessageSender.class);
3838

@@ -53,7 +53,7 @@ public void init() {
5353

5454
@Test
5555
public void queryListener() {
56-
final ApplicationQueryListener queryListener = config.queryListener(messageConverter, handlerResolver, sender, listener, discardNotifier, customErrorReporter);
56+
final ApplicationQueryListener queryListener = config.queryListener(messageConverter, handlerResolver, sender, listener, discardNotifier, customReporter);
5757
Assertions.assertThat(queryListener).isNotNull();
5858
}
5959
}

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/RabbitMqConfigTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.reactivecommons.api.domain.DomainEvent;
88
import org.reactivecommons.async.api.AsyncQuery;
99
import org.reactivecommons.async.impl.communications.Message;
10-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
10+
import org.reactivecommons.async.impl.ext.CustomReporter;
1111
import reactor.core.publisher.Mono;
1212
import reactor.test.StepVerifier;
1313

@@ -46,18 +46,18 @@ public void retryInitialConnection() throws IOException, TimeoutException {
4646

4747
@Test
4848
public void shouldCreateDefaultErrorReporter() {
49-
final CustomErrorReporter errorReporter = config.reactiveCommonsCustomErrorReporter();
49+
final CustomReporter errorReporter = config.reactiveCommonsCustomErrorReporter();
5050
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true)).isNotNull();
5151
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true)).isNotNull();
5252
assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true)).isNotNull();
5353
}
5454

5555
@Test
5656
public void shouldGenerateDefaultReeporter() {
57-
final CustomErrorReporter customErrorReporter = config.reactiveCommonsCustomErrorReporter();
58-
final Mono<Void> r1 = customErrorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true);
59-
final Mono<Void> r2 = customErrorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true);
60-
final Mono<Void> r3 = customErrorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true);
57+
final CustomReporter customReporter = config.reactiveCommonsCustomErrorReporter();
58+
final Mono<Void> r1 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true);
59+
final Mono<Void> r2 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true);
60+
final Mono<Void> r3 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true);
6161

6262
assertThat(r1).isNotNull();
6363
assertThat(r2).isNotNull();

async/async-commons/src/main/java/org/reactivecommons/async/impl/ext/CustomErrorReporter.java renamed to async/async-commons/src/main/java/org/reactivecommons/async/impl/ext/CustomReporter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.reactivecommons.async.impl.communications.Message;
77
import reactor.core.publisher.Mono;
88

9-
public interface CustomErrorReporter {
9+
public interface CustomReporter {
1010

1111
String COMMAND_CLASS = "org.reactivecommons.api.domain.Command";
1212
String EVENT_CLASS = "org.reactivecommons.api.domain.DomainEvent";
@@ -25,6 +25,9 @@ default Mono<Void> reportError(Throwable ex, Message rawMessage, Object message,
2525
}
2626
}
2727

28+
default void reportMetric(String type, String handlerPath, Long duration, boolean success) {
29+
}
30+
2831
Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message, boolean redelivered);
2932
Mono<Void> reportError(Throwable ex, Message rawMessage, DomainEvent<?> message, boolean redelivered);
3033
Mono<Void> reportError(Throwable ex, Message rawMessage, AsyncQuery<?> message, boolean redelivered);

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationCommandListener.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
import org.reactivecommons.async.impl.RabbitMessage;
1313
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1414
import org.reactivecommons.async.impl.communications.TopologyCreator;
15-
import org.reactivecommons.async.impl.ext.CustomErrorReporter;
15+
import org.reactivecommons.async.impl.ext.CustomReporter;
1616
import reactor.core.publisher.Mono;
1717
import reactor.rabbitmq.AcknowledgableDelivery;
1818
import reactor.rabbitmq.BindingSpecification;
1919
import reactor.rabbitmq.ExchangeSpecification;
20-
import reactor.rabbitmq.QueueSpecification;
2120

2221
import java.util.Optional;
2322
import java.util.function.Function;
@@ -33,7 +32,7 @@ public class ApplicationCommandListener extends GenericMessageListener {
3332
private final Optional<Integer> maxLengthBytes;
3433

3534
//TODO: change large constructor parameters number
36-
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomErrorReporter errorReporter) {
35+
public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
3736
super(queueName, listener, withDLQRetry, maxRetries, discardNotifier, "command", errorReporter);
3837
this.retryDelay = retryDelay;
3938
this.withDLQRetry = withDLQRetry;
@@ -44,16 +43,15 @@ public ApplicationCommandListener(ReactiveMessageListener listener, String queue
4443
}
4544

4645
protected Mono<Void> setUpBindings(TopologyCreator creator) {
46+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
4747
if (withDLQRetry) {
48-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
4948
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(directExchange + ".DLQ").durable(true).type("direct"));
5049
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, directExchange + ".DLQ", maxLengthBytes);
5150
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, directExchange, retryDelay, maxLengthBytes);
5251
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
5352
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(directExchange + ".DLQ", queueName, queueName + ".DLQ"));
5453
return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then();
5554
} else {
56-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(directExchange).durable(true).type("direct"));
5755
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, maxLengthBytes);
5856
final Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding(directExchange, queueName, queueName));
5957
return declareExchange.then(declareQueue).then(binding).then();

0 commit comments

Comments
 (0)