diff --git a/src/main/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/engines/StructToXmlBytes.java b/src/main/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/engines/StructToXmlBytes.java index 2efc6b7..cb5853a 100644 --- a/src/main/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/engines/StructToXmlBytes.java +++ b/src/main/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/engines/StructToXmlBytes.java @@ -15,12 +15,14 @@ */ package com.ibm.eventstreams.kafkaconnect.plugins.xml.engines; +import java.math.BigDecimal; import java.util.Base64; import java.util.Collection; import java.util.Map; import javax.xml.XMLConstants; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -233,19 +235,21 @@ private void processStruct(Document doc, Element parentElement, Struct source) { case BYTES: final Element bytesElement = doc.createElement(field.name()); - if (source.getBytes(field.name()) != null) { - final byte[] bytes = source.getBytes(field.name()); - - String strRepresentation; - if (bytes.length == 1 && "xs:byte".equals(fieldSchema.doc())) { - strRepresentation = Byte.toString(bytes[0]); - } - else { - strRepresentation = new String(Base64.getEncoder().encode(bytes)); - } - - bytesElement.appendChild(doc.createTextNode(strRepresentation)); + final byte[] bytes ; + String strRepresentation; + + if (Decimal.LOGICAL_NAME.equals(fieldSchema.name())){ + strRepresentation = source.get(field.name()).toString(); + } else { + bytes = source.getBytes(field.name()); + if (bytes.length == 1 && "xs:byte".equals(fieldSchema.doc())) { + strRepresentation = Byte.toString(bytes[0]); + } else { + strRepresentation = Base64.getEncoder().encodeToString(bytes); + } } + bytesElement.appendChild(doc.createTextNode(strRepresentation)); + parentElement.appendChild(bytesElement); break; diff --git a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/XmlConverterSourceTest.java b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/XmlConverterSourceTest.java index a7bc0ec..a866ab7 100644 --- a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/XmlConverterSourceTest.java +++ b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/XmlConverterSourceTest.java @@ -92,7 +92,8 @@ public static Collection testCases() { // { "040", true, true } // { "043", true, true }, { "044", false, false }, - { "049", false, false } + { "049", false, false }, + { "055", false, false } }); } diff --git a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/MapGenerators.java b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/MapGenerators.java index 95838d0..efb20b9 100644 --- a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/MapGenerators.java +++ b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/MapGenerators.java @@ -15,6 +15,7 @@ */ package com.ibm.eventstreams.kafkaconnect.plugins.xml.testutils; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Base64; import java.util.LinkedHashMap; @@ -23,6 +24,8 @@ import java.util.Map; import java.util.Set; +import org.apache.kafka.connect.data.Decimal; + public class MapGenerators { public static Map get(String testCaseId) { @@ -1028,6 +1031,13 @@ public class MapGenerators { value.put("data", data); break; } + case "055":{ + value.put("ProductID", 1); + value.put("ProductName", "TestProductName"); + value.put("Price",new BigDecimal("10.000")); + break; + + } } return value; } diff --git a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/StructGenerators.java b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/StructGenerators.java index b849cc4..f7c2f28 100644 --- a/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/StructGenerators.java +++ b/src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/StructGenerators.java @@ -15,12 +15,14 @@ */ package com.ibm.eventstreams.kafkaconnect.plugins.xml.testutils; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; @@ -1809,6 +1811,25 @@ public static SchemaAndValue get(String testCaseId) { value.put("version", "7.0.1001.2"); value.put("data", data); + return new SchemaAndValue(schema, value); + } + case "055":{ + + final Schema decimalSchema = Decimal.builder(3).build(); + final Schema schema = SchemaBuilder.struct() + .name("root") + .field("ProductID", Schema.INT32_SCHEMA) + .field("ProductName", Schema.STRING_SCHEMA) + .field("Price", decimalSchema) + .build(); + final Struct value = new Struct(schema); + + BigDecimal bigDecimal = new BigDecimal(10).setScale(3); + + value.put("ProductID", 1); + value.put("ProductName", "TestProductName"); + value.put("Price", bigDecimal); + return new SchemaAndValue(schema, value); } } diff --git a/src/test/resources/055.xml b/src/test/resources/055.xml new file mode 100644 index 0000000..fe4ea94 --- /dev/null +++ b/src/test/resources/055.xml @@ -0,0 +1,6 @@ + + + 1 + TestProductName + 10.000 + \ No newline at end of file diff --git a/src/test/resources/055.xsd b/src/test/resources/055.xsd new file mode 100644 index 0000000..0316edb --- /dev/null +++ b/src/test/resources/055.xsd @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file