Skip to content

Propagate file settings health info to the health node #127397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b0d718e
Initial testHealthIndicator that fails
prdoyle Apr 23, 2025
32a7f7d
Refactor: FileSettingsHealthInfo record
prdoyle Apr 23, 2025
458a9b3
Propagate file settings health indicator to health node
prdoyle Apr 25, 2025
9339c3a
ensureStableCluster
prdoyle Apr 25, 2025
936813b
Try to induce a failure from returning node-local info
prdoyle Apr 28, 2025
4bee8d8
Remove redundant node from client() call
prdoyle Apr 29, 2025
e8c2a42
Use local node ID in UpdateHealthInfoCacheAction.Request
prdoyle Apr 29, 2025
411489f
Move logger to top
prdoyle Apr 29, 2025
434933d
Test node-local health on master and health nodes
prdoyle Apr 29, 2025
209902d
Fix calculate to use the given info
prdoyle Apr 29, 2025
e2c05f7
mutateFileSettingsHealthInfo
prdoyle Apr 29, 2025
bd5fed8
Test status from local current info
prdoyle Apr 29, 2025
4f6102b
FileSettingsHealthTracker
prdoyle Apr 30, 2025
12d7244
Spruce up HealthInfoTests
prdoyle Apr 30, 2025
095f377
spotless
prdoyle Apr 30, 2025
484d85f
randomNonNegativeLong
prdoyle Apr 30, 2025
203aad0
Merge remote-tracking branch 'upstream/main' into health
prdoyle Apr 30, 2025
a91b807
Rename variable
prdoyle Apr 30, 2025
696235b
Address Niels' comments
prdoyle Apr 30, 2025
6598e1f
Test one- and two-node clusters
prdoyle Apr 30, 2025
23fc675
Merge remote-tracking branch 'origin/health' into health
prdoyle Apr 30, 2025
07b2d8b
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 30, 2025
ce015e7
Ensure there's a master node
prdoyle Apr 30, 2025
4e0978e
Merge branch 'main' into health
prdoyle Apr 30, 2025
debbf90
setBootstrapMasterNodeIndex
prdoyle Apr 30, 2025
98b7506
Merge branch 'main' into health
prdoyle May 5, 2025
e2cfbd7
Merge branch 'main' into health
prdoyle May 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.health.node.ProjectIndexName;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -167,6 +168,6 @@ public void testMultiProject() {
}

private HealthInfo constructHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
return new HealthInfo(Map.of(), dslHealthInfo, Map.of());
return new HealthInfo(Map.of(), dslHealthInfo, Map.of(), FileSettingsHealthInfo.INDETERMINATE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -144,7 +145,16 @@ public void clusterChanged(ClusterChangedEvent event) {
states.add(
new RoutingNodesAndHealth(
event.state().getRoutingNodes(),
service.calculate(false, 1, new HealthInfo(Map.of(), DataStreamLifecycleHealthInfo.NO_DSL_ERRORS, Map.of()))
service.calculate(
false,
1,
new HealthInfo(
Map.of(),
DataStreamLifecycleHealthInfo.NO_DSL_ERRORS,
Map.of(),
FileSettingsHealthInfo.INDETERMINATE
)
)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.health.GetHealthAction;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;
Expand All @@ -37,9 +39,12 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static org.elasticsearch.health.HealthStatus.YELLOW;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.NodeRoles.dataNode;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterNode;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -498,6 +503,82 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
}

/**
 * Runs the file settings health indicator scenario on a one-node cluster, passing the
 * same node to {@code testHealthIndicatorOnError} as both the master and the health node.
 */
public void testHealthIndicatorWithSingleNode() throws Exception {
    internalCluster().setBootstrapMasterNodeIndex(0);

    logger.info("--> start the node");
    final String soleNode = internalCluster().startNode();

    // Do not proceed until the file settings service on that node reports it is watching.
    final FileSettingsService fileSettings = internalCluster().getInstance(FileSettingsService.class, soleNode);
    assertBusy(() -> assertTrue(fileSettings.watching()));
    ensureStableCluster(1);

    testHealthIndicatorOnError(soleNode, soleNode);
}

/**
 * Runs the file settings health indicator scenario on a two-node cluster: a data-only
 * node started first (intended to act as the health node) and a master-only node
 * started second. The scenario itself lives in {@code testHealthIndicatorOnError}.
 */
public void testHealthIndicatorWithSeparateHealthNode() throws Exception {
    internalCluster().setBootstrapMasterNodeIndex(0);
    logger.info("--> start a data node to act as the health node");
    // Refer to the timeout setting through its constant (registered under
    // "discovery.initial_state_timeout") so both nodes in this test spell the key the same way.
    String healthNode = internalCluster().startNode(
        Settings.builder().put(dataOnlyNode()).put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "1s")
    );

    logger.info("--> start master node");
    final String masterNode = internalCluster().startMasterOnlyNode(
        Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
    );
    // Wait for the master's file settings service to start watching before checking health.
    FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
    assertBusy(() -> assertTrue(masterFileSettingsService.watching()));

    ensureStableCluster(2);

    testHealthIndicatorOnError(masterNode, healthNode);
}

/**
 * Shared scenario: confirms the cached file settings health info starts with a failure
 * streak of zero, writes an erroneous settings file on {@code masterNode}, then waits
 * until the cached info reports one failure and the file settings indicator is
 * {@code YELLOW} when queried through each of the given nodes.
 * <p>
 * {@code masterNode} and {@code healthNode} can be the same node.
 */
private void testHealthIndicatorOnError(String masterNode, String healthNode) throws Exception {
    // When both names refer to the same node, query it only once.
    var nodesToCheck = Stream.of(masterNode, healthNode).distinct().toList();

    logger.info("--> ensure all is well before the error");
    assertBusy(() -> {
        var cachedInfo = client().execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request())
            .get()
            .getHealthInfo();
        assertEquals(0, cachedInfo.fileSettingsHealthInfo().failureStreak());
    });

    logger.info("--> induce an error and wait for it to be processed");
    var errorListener = setupClusterStateListenerForError(masterNode);
    writeJSONFile(masterNode, testErrorJSON, logger, versionCounter.incrementAndGet());
    assertTrue(errorListener.v1().await(20, TimeUnit.SECONDS));

    logger.info("--> ensure the health node also reports it");
    assertBusy(() -> {
        var cachedInfo = client().execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request())
            .get()
            .getHealthInfo();
        assertEquals(
            "Cached info on health node should report one failure",
            1,
            cachedInfo.fileSettingsHealthInfo().failureStreak()
        );

        for (var node : nodesToCheck) {
            var healthRequest = new GetHealthAction.Request(false, 123);
            GetHealthAction.Response healthResponse = client(node).execute(GetHealthAction.INSTANCE, healthRequest).get();
            var indicator = healthResponse.findIndicator(FileSettingsService.FileSettingsHealthIndicatorService.NAME);
            assertEquals("Health should be yellow on node " + node, YELLOW, indicator.status());
        }
    });
}

private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
assertThat(errorMetadata, is(notNullValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_SAGEMAKER = def(9_069_0_00);
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
26 changes: 21 additions & 5 deletions server/src/main/java/org/elasticsearch/health/node/HealthInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,46 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;

import java.io.IOException;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static org.elasticsearch.health.node.DataStreamLifecycleHealthInfo.NO_DSL_ERRORS;
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;

/**
* This class wraps all the data returned by the health node.
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
* @param dslHealthInfo The data stream lifecycle health information
*
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
* @param dslHealthInfo The data stream lifecycle health information
* @param repositoriesInfoByNode A Map of node id to RepositoriesHealthInfo for that node
* @param fileSettingsHealthInfo The file-based settings health information
*/
public record HealthInfo(
Map<String, DiskHealthInfo> diskInfoByNode,
@Nullable DataStreamLifecycleHealthInfo dslHealthInfo,
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode,
FileSettingsHealthInfo fileSettingsHealthInfo
) implements Writeable {

public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of());
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of(), INDETERMINATE);

public HealthInfo {
requireNonNull(fileSettingsHealthInfo);
}

public HealthInfo(StreamInput input) throws IOException {
this(
input.readMap(DiskHealthInfo::new),
input.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
? input.readOptionalWriteable(DataStreamLifecycleHealthInfo::new)
: null,
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of()
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of(),
input.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
? input.readOptionalWriteable(FileSettingsHealthInfo::new)
: INDETERMINATE
);
}

Expand All @@ -53,5 +66,8 @@ public void writeTo(StreamOutput output) throws IOException {
if (output.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
output.writeMap(repositoriesInfoByNode, StreamOutput::writeWriteable);
}
if (output.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
output.writeOptionalWriteable(fileSettingsHealthInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.reservedstate.service.FileSettingsService;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;

/**
* Keeps track of several health statuses per node that can be used in health.
*/
Expand All @@ -31,6 +34,7 @@ public class HealthInfoCache implements ClusterStateListener {
@Nullable
private volatile DataStreamLifecycleHealthInfo dslHealthInfo = null;
private volatile ConcurrentHashMap<String, RepositoriesHealthInfo> repositoriesInfoByNode = new ConcurrentHashMap<>();
private volatile FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo = INDETERMINATE;

private HealthInfoCache() {}

Expand All @@ -44,7 +48,8 @@ public void updateNodeHealth(
String nodeId,
@Nullable DiskHealthInfo diskHealthInfo,
@Nullable DataStreamLifecycleHealthInfo latestDslHealthInfo,
@Nullable RepositoriesHealthInfo repositoriesHealthInfo
@Nullable RepositoriesHealthInfo repositoriesHealthInfo,
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
) {
if (diskHealthInfo != null) {
diskInfoByNode.put(nodeId, diskHealthInfo);
Expand All @@ -55,6 +60,9 @@ public void updateNodeHealth(
if (repositoriesHealthInfo != null) {
repositoriesInfoByNode.put(nodeId, repositoriesHealthInfo);
}
if (fileSettingsHealthInfo != null) {
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
}
}

@Override
Expand All @@ -77,6 +85,7 @@ public void clusterChanged(ClusterChangedEvent event) {
diskInfoByNode = new ConcurrentHashMap<>();
dslHealthInfo = null;
repositoriesInfoByNode = new ConcurrentHashMap<>();
fileSettingsHealthInfo = INDETERMINATE;
}
}

Expand All @@ -86,6 +95,6 @@ public void clusterChanged(ClusterChangedEvent event) {
*/
public HealthInfo getHealthInfo() {
// A shallow copy is enough because the inner data is immutable.
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode));
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode), fileSettingsHealthInfo);
}
}
Loading