Skip to content

Commit 5df5cb8

Browse files
prdoyle, nielsbauman, and elasticsearchmachine
authored
Propagate file settings health info to the health node (#127397)
* Initial testHealthIndicator that fails
* Refactor: FileSettingsHealthInfo record
* Propagate file settings health indicator to health node
* ensureStableCluster
* Try to induce a failure from returning node-local info
* Remove redundant node from client() call
* Use local node ID in UpdateHealthInfoCacheAction.Request
* Move logger to top
* Test node-local health on master and health nodes
* Fix calculate to use the given info
* mutateFileSettingsHealthInfo
* Test status from local current info
* FileSettingsHealthTracker
* Spruce up HealthInfoTests
* spotless
* randomNonNegativeLong
* Rename variable
  Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com>
* Address Niels' comments
* Test one- and two-node clusters
* [CI] Auto commit changes from spotless
* Ensure there's a master node
  Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com>
* setBootstrapMasterNodeIndex

---------

Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com>
Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
1 parent 23ab059 commit 5df5cb8

20 files changed

+564
-187
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.health.node.DslErrorInfo;
2222
import org.elasticsearch.health.node.HealthInfo;
2323
import org.elasticsearch.health.node.ProjectIndexName;
24+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
2425
import org.elasticsearch.test.ESTestCase;
2526
import org.junit.Before;
2627

@@ -167,6 +168,6 @@ public void testMultiProject() {
167168
}
168169

169170
private HealthInfo constructHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
170-
return new HealthInfo(Map.of(), dslHealthInfo, Map.of());
171+
return new HealthInfo(Map.of(), dslHealthInfo, Map.of(), FileSettingsHealthInfo.INDETERMINATE);
171172
}
172173
}

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceIT.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
2424
import org.elasticsearch.health.node.HealthInfo;
2525
import org.elasticsearch.indices.SystemIndices;
26+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
2627
import org.elasticsearch.test.ESIntegTestCase;
2728
import org.hamcrest.Matcher;
2829

@@ -144,7 +145,16 @@ public void clusterChanged(ClusterChangedEvent event) {
144145
states.add(
145146
new RoutingNodesAndHealth(
146147
event.state().getRoutingNodes(),
147-
service.calculate(false, 1, new HealthInfo(Map.of(), DataStreamLifecycleHealthInfo.NO_DSL_ERRORS, Map.of()))
148+
service.calculate(
149+
false,
150+
1,
151+
new HealthInfo(
152+
Map.of(),
153+
DataStreamLifecycleHealthInfo.NO_DSL_ERRORS,
154+
Map.of(),
155+
FileSettingsHealthInfo.INDETERMINATE
156+
)
157+
)
148158
)
149159
);
150160
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

+81
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.core.Strings;
2727
import org.elasticsearch.core.Tuple;
28+
import org.elasticsearch.health.GetHealthAction;
29+
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
2830
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
2931
import org.elasticsearch.test.ESIntegTestCase;
3032
import org.junit.Before;
@@ -37,9 +39,12 @@
3739
import java.util.concurrent.ExecutionException;
3840
import java.util.concurrent.TimeUnit;
3941
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.stream.Stream;
4043

44+
import static org.elasticsearch.health.HealthStatus.YELLOW;
4145
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
4246
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
47+
import static org.elasticsearch.test.NodeRoles.dataNode;
4348
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
4449
import static org.elasticsearch.test.NodeRoles.masterNode;
4550
import static org.hamcrest.Matchers.allOf;
@@ -498,6 +503,82 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
498503
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
499504
}
500505

506+
public void testHealthIndicatorWithSingleNode() throws Exception {
507+
internalCluster().setBootstrapMasterNodeIndex(0);
508+
logger.info("--> start the node");
509+
String nodeName = internalCluster().startNode();
510+
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, nodeName);
511+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
512+
513+
ensureStableCluster(1);
514+
515+
testHealthIndicatorOnError(nodeName, nodeName);
516+
}
517+
518+
public void testHealthIndicatorWithSeparateHealthNode() throws Exception {
519+
internalCluster().setBootstrapMasterNodeIndex(0);
520+
logger.info("--> start a data node to act as the health node");
521+
String healthNode = internalCluster().startNode(
522+
Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")
523+
);
524+
525+
logger.info("--> start master node");
526+
final String masterNode = internalCluster().startMasterOnlyNode(
527+
Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
528+
);
529+
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
530+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
531+
532+
ensureStableCluster(2);
533+
534+
testHealthIndicatorOnError(masterNode, healthNode);
535+
}
536+
537+
/**
538+
* {@code masterNode} and {@code healthNode} can be the same node.
539+
*/
540+
private void testHealthIndicatorOnError(String masterNode, String healthNode) throws Exception {
541+
logger.info("--> ensure all is well before the error");
542+
assertBusy(() -> {
543+
FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
544+
FetchHealthInfoCacheAction.INSTANCE,
545+
new FetchHealthInfoCacheAction.Request()
546+
).get();
547+
assertEquals(0, healthNodeResponse.getHealthInfo().fileSettingsHealthInfo().failureStreak());
548+
});
549+
550+
logger.info("--> induce an error and wait for it to be processed");
551+
var savedClusterState = setupClusterStateListenerForError(masterNode);
552+
writeJSONFile(masterNode, testErrorJSON, logger, versionCounter.incrementAndGet());
553+
boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
554+
assertTrue(awaitSuccessful);
555+
556+
logger.info("--> ensure the health node also reports it");
557+
assertBusy(() -> {
558+
FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
559+
FetchHealthInfoCacheAction.INSTANCE,
560+
new FetchHealthInfoCacheAction.Request()
561+
).get();
562+
assertEquals(
563+
"Cached info on health node should report one failure",
564+
1,
565+
healthNodeResponse.getHealthInfo().fileSettingsHealthInfo().failureStreak()
566+
);
567+
568+
for (var node : Stream.of(masterNode, healthNode).distinct().toList()) {
569+
GetHealthAction.Response getHealthResponse = client(node).execute(
570+
GetHealthAction.INSTANCE,
571+
new GetHealthAction.Request(false, 123)
572+
).get();
573+
assertEquals(
574+
"Health should be yellow on node " + node,
575+
YELLOW,
576+
getHealthResponse.findIndicator(FileSettingsService.FileSettingsHealthIndicatorService.NAME).status()
577+
);
578+
}
579+
});
580+
}
581+
501582
private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
502583
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
503584
assertThat(errorMetadata, is(notNullValue()));

server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ static TransportVersion def(int id) {
241241
public static final TransportVersion ML_INFERENCE_SAGEMAKER = def(9_069_0_00);
242242
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
243243
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
244+
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
244245

245246
/*
246247
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/health/node/HealthInfo.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,46 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
1616
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
1718

1819
import java.io.IOException;
1920
import java.util.Map;
2021

22+
import static java.util.Objects.requireNonNull;
2123
import static org.elasticsearch.health.node.DataStreamLifecycleHealthInfo.NO_DSL_ERRORS;
24+
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;
2225

2326
/**
2427
* This class wraps all the data returned by the health node.
25-
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
26-
* @param dslHealthInfo The data stream lifecycle health information
28+
*
29+
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
30+
* @param dslHealthInfo The data stream lifecycle health information
2731
* @param repositoriesInfoByNode A Map of node id to RepositoriesHealthInfo for that node
32+
* @param fileSettingsHealthInfo The file-based settings health information
2833
*/
2934
public record HealthInfo(
3035
Map<String, DiskHealthInfo> diskInfoByNode,
3136
@Nullable DataStreamLifecycleHealthInfo dslHealthInfo,
32-
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode
37+
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode,
38+
FileSettingsHealthInfo fileSettingsHealthInfo
3339
) implements Writeable {
3440

35-
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of());
41+
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of(), INDETERMINATE);
42+
43+
public HealthInfo {
44+
requireNonNull(fileSettingsHealthInfo);
45+
}
3646

3747
public HealthInfo(StreamInput input) throws IOException {
3848
this(
3949
input.readMap(DiskHealthInfo::new),
4050
input.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
4151
? input.readOptionalWriteable(DataStreamLifecycleHealthInfo::new)
4252
: null,
43-
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of()
53+
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of(),
54+
input.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
55+
? input.readOptionalWriteable(FileSettingsHealthInfo::new)
56+
: INDETERMINATE
4457
);
4558
}
4659

@@ -53,5 +66,8 @@ public void writeTo(StreamOutput output) throws IOException {
5366
if (output.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
5467
output.writeMap(repositoriesInfoByNode, StreamOutput::writeWriteable);
5568
}
69+
if (output.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
70+
output.writeOptionalWriteable(fileSettingsHealthInfo);
71+
}
5672
}
5773
}

server/src/main/java/org/elasticsearch/health/node/HealthInfoCache.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
import org.elasticsearch.cluster.service.ClusterService;
1818
import org.elasticsearch.core.Nullable;
1919
import org.elasticsearch.health.node.selection.HealthNode;
20+
import org.elasticsearch.reservedstate.service.FileSettingsService;
2021

2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324

25+
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;
26+
2427
/**
2528
* Keeps track of several health statuses per node that can be used in health.
2629
*/
@@ -31,6 +34,7 @@ public class HealthInfoCache implements ClusterStateListener {
3134
@Nullable
3235
private volatile DataStreamLifecycleHealthInfo dslHealthInfo = null;
3336
private volatile ConcurrentHashMap<String, RepositoriesHealthInfo> repositoriesInfoByNode = new ConcurrentHashMap<>();
37+
private volatile FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo = INDETERMINATE;
3438

3539
private HealthInfoCache() {}
3640

@@ -44,7 +48,8 @@ public void updateNodeHealth(
4448
String nodeId,
4549
@Nullable DiskHealthInfo diskHealthInfo,
4650
@Nullable DataStreamLifecycleHealthInfo latestDslHealthInfo,
47-
@Nullable RepositoriesHealthInfo repositoriesHealthInfo
51+
@Nullable RepositoriesHealthInfo repositoriesHealthInfo,
52+
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
4853
) {
4954
if (diskHealthInfo != null) {
5055
diskInfoByNode.put(nodeId, diskHealthInfo);
@@ -55,6 +60,9 @@ public void updateNodeHealth(
5560
if (repositoriesHealthInfo != null) {
5661
repositoriesInfoByNode.put(nodeId, repositoriesHealthInfo);
5762
}
63+
if (fileSettingsHealthInfo != null) {
64+
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
65+
}
5866
}
5967

6068
@Override
@@ -77,6 +85,7 @@ public void clusterChanged(ClusterChangedEvent event) {
7785
diskInfoByNode = new ConcurrentHashMap<>();
7886
dslHealthInfo = null;
7987
repositoriesInfoByNode = new ConcurrentHashMap<>();
88+
fileSettingsHealthInfo = INDETERMINATE;
8089
}
8190
}
8291

@@ -86,6 +95,6 @@ public void clusterChanged(ClusterChangedEvent event) {
8695
*/
8796
public HealthInfo getHealthInfo() {
8897
// A shallow copy is enough because the inner data is immutable.
89-
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode));
98+
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode), fileSettingsHealthInfo);
9099
}
91100
}

0 commit comments

Comments
 (0)