From ca7aaff04e208838cfc4ce2b244cb1a83c1387f0 Mon Sep 17 00:00:00 2001 From: Alexander von Trostorff Date: Tue, 4 Mar 2025 13:19:37 +0100 Subject: [PATCH 1/2] SchemaRegistrySerde: Avro deserialization via topic name Avro deserialization without magic byte using lookup by topic name as fallback like in serialization Closes #4520 --- .../builtin/sr/SchemaRegistrySerde.java | 24 +++++++++----- .../builtin/sr/SchemaRegistrySerdeTest.java | 33 +++++++++++++++++++ 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 4ef0bbe5dd4..69841636161 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -279,7 +279,7 @@ public Serializer serializer(String topic, Target type) { @Override public Deserializer deserializer(String topic, Target type) { return (headers, data) -> { - var schemaId = extractSchemaIdFromMsg(data); + int schemaId = getSchemaIdFromMessageOrTopic(data, topic, type); SchemaType format = getMessageFormatBySchemaId(schemaId); MessageFormatter formatter = schemaRegistryFormatters.get(format); return new DeserializeResult( @@ -293,6 +293,18 @@ public Deserializer deserializer(String topic, Target type) { }; } + private int getSchemaIdFromMessageOrTopic(byte[] data, String topic, Target type) { + return extractSchemaIdFromMsg(data).orElseGet( + () -> { + String subject = schemaSubject(topic, type); + return getSchemaBySubject(subject) + .map(SchemaMetadata::getId) + .orElseThrow(() -> new ValidationException( + String.format("No schema for subject '%s' found and no magic byte in avro data", subject))); + } + ); + } + private SchemaType getMessageFormatBySchemaId(int schemaId) { return getSchemaById(schemaId) .map(ParsedSchema::schemaType) @@ -300,15 +312,11 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) { .orElseThrow(() -> new ValidationException(String.format("Schema for id '%d' not found ", schemaId))); } - private int extractSchemaIdFromMsg(byte[] data) { + private Optional extractSchemaIdFromMsg(byte[] data) { ByteBuffer buffer = ByteBuffer.wrap(data); if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) { - return buffer.getInt(); + return Optional.of(buffer.getInt()); } - throw new ValidationException( - String.format( - "Data doesn't contain magic byte and schema id prefix, so it can't be deserialized with %s serde", - name()) - ); + return Optional.empty(); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index b70450cea5c..5a9c304db7b 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -130,6 +130,39 @@ void deserializeReturnsJsonAvroMsgJsonRepresentation() throws RestClientExceptio .contains(Map.entry("schemaId", schemaId)); } + @Test + void deserializeReturnsJsonAvroMsgJsonRepresentationViaTopicNameOnly() throws RestClientException, IOException { + AvroSchema schema = new AvroSchema( + "{" + + " \"type\": \"record\"," + + " \"name\": \"TestAvroRecord1\"," + + " \"fields\": [" + + " {" + + " \"name\": \"field1\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"name\": \"field2\"," + + " \"type\": \"int\"" + + " }" + + " ]" + + "}" + ); + String jsonValue = "{ \"field1\":\"testStr\", \"field2\": 123 }"; + + String topic = "test"; + int schemaId = registryClient.register(topic + "-value", schema); + + byte[] data = jsonToAvro(jsonValue, schema); // No magic byte no schema id registered + var result = serde.deserializer(topic, Serde.Target.VALUE).deserialize(null, data); + + assertJsonsEqual(jsonValue, result.getResult()); + assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON); + assertThat(result.getAdditionalProperties()) + .contains(Map.entry("type", "AVRO")) + .contains(Map.entry("schemaId", schemaId)); + } + @Nested class SerdeWithDisabledSubjectExistenceCheck { From 9f209cbb646fa3e4f35215c9a92aedb0c61a03cf Mon Sep 17 00:00:00 2001 From: Alexander von Trostorff Date: Tue, 4 Mar 2025 14:01:26 +0100 Subject: [PATCH 2/2] Fix pipeline --- .github/workflows/e2e-checks.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e-checks.yaml b/.github/workflows/e2e-checks.yaml index e62cd724a8f..24903e804d0 100644 --- a/.github/workflows/e2e-checks.yaml +++ b/.github/workflows/e2e-checks.yaml @@ -46,8 +46,8 @@ jobs: id: compose_app # use the following command until #819 will be fixed run: | - docker-compose -f kafka-ui-e2e-checks/docker/selenoid-git.yaml up -d - docker-compose -f ./documentation/compose/e2e-tests.yaml up -d && until [ "$(docker exec kafka-ui wget --spider --server-response http://localhost:8080/actuator/health 2>&1 | grep -c 'HTTP/1.1 200 OK')" == "1" ]; do echo "Waiting for kafka-ui ..." && sleep 1; done + docker compose -f kafka-ui-e2e-checks/docker/selenoid-git.yaml up -d + docker compose -f ./documentation/compose/e2e-tests.yaml up -d && until [ "$(docker exec kafka-ui wget --spider --server-response http://localhost:8080/actuator/health 2>&1 | grep -c 'HTTP/1.1 200 OK')" == "1" ]; do echo "Waiting for kafka-ui ..." && sleep 1; done - name: Run test suite run: | ./mvnw -B -ntp versions:set -DnewVersion=${{ github.event.pull_request.head.sha }}