Skip to content

Commit c102f5a

Browse files
Priyanka K UJoel-hanson
authored andcommitted
fix: Handle BigDecimal data type
Contributes to: event-integration/eventstreams-planning#12841 Signed-off-by: Priyanka.K.U [email protected] Signed-off-by: Joel Hanson <[email protected]>
1 parent 54fd315 commit c102f5a

File tree

6 files changed

+73
-13
lines changed

6 files changed

+73
-13
lines changed

src/main/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/engines/StructToXmlBytes.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package com.ibm.eventstreams.kafkaconnect.plugins.xml.engines;
1717

18+
import java.math.BigDecimal;
1819
import java.util.Base64;
1920
import java.util.Collection;
2021
import java.util.Map;
2122

2223
import javax.xml.XMLConstants;
2324

25+
import org.apache.kafka.connect.data.Decimal;
2426
import org.apache.kafka.connect.data.Field;
2527
import org.apache.kafka.connect.data.Schema;
2628
import org.apache.kafka.connect.data.Schema.Type;
@@ -233,19 +235,21 @@ private void processStruct(Document doc, Element parentElement, Struct source) {
233235

234236
case BYTES:
235237
final Element bytesElement = doc.createElement(field.name());
236-
if (source.getBytes(field.name()) != null) {
237-
final byte[] bytes = source.getBytes(field.name());
238-
239-
String strRepresentation;
240-
if (bytes.length == 1 && "xs:byte".equals(fieldSchema.doc())) {
241-
strRepresentation = Byte.toString(bytes[0]);
242-
}
243-
else {
244-
strRepresentation = new String(Base64.getEncoder().encode(bytes));
245-
}
246-
247-
bytesElement.appendChild(doc.createTextNode(strRepresentation));
238+
final byte[] bytes ;
239+
String strRepresentation;
240+
241+
if (Decimal.LOGICAL_NAME.equals(fieldSchema.name())){
242+
strRepresentation = source.get(field.name()).toString();
243+
} else {
244+
bytes = source.getBytes(field.name());
245+
if (bytes.length == 1 && "xs:byte".equals(fieldSchema.doc())) {
246+
strRepresentation = Byte.toString(bytes[0]);
247+
} else {
248+
strRepresentation = Base64.getEncoder().encodeToString(bytes);
249+
}
248250
}
251+
bytesElement.appendChild(doc.createTextNode(strRepresentation));
252+
249253
parentElement.appendChild(bytesElement);
250254
break;
251255

src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/XmlConverterSourceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public static Collection<Object[]> testCases() {
9292
// { "040", true, true }
9393
// { "043", true, true },
9494
{ "044", false, false },
95-
{ "049", false, false }
95+
{ "049", false, false },
96+
{ "055", false, false }
9697
});
9798
}
9899

src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/MapGenerators.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.ibm.eventstreams.kafkaconnect.plugins.xml.testutils;
1717

18+
import java.math.BigDecimal;
1819
import java.util.ArrayList;
1920
import java.util.Base64;
2021
import java.util.LinkedHashMap;
@@ -23,6 +24,8 @@
2324
import java.util.Map;
2425
import java.util.Set;
2526

27+
import org.apache.kafka.connect.data.Decimal;
28+
2629
public class MapGenerators {
2730

2831
public static Map<?, ?> get(String testCaseId) {
@@ -1028,6 +1031,13 @@ public class MapGenerators {
10281031
value.put("data", data);
10291032
break;
10301033
}
1034+
case "055":{
1035+
value.put("ProductID", 1);
1036+
value.put("ProductName", "TestProductName");
1037+
value.put("Price",new BigDecimal("10.000"));
1038+
break;
1039+
1040+
}
10311041
}
10321042
return value;
10331043
}

src/test/java/com/ibm/eventstreams/kafkaconnect/plugins/xml/testutils/StructGenerators.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package com.ibm.eventstreams.kafkaconnect.plugins.xml.testutils;
1717

18+
import java.math.BigDecimal;
1819
import java.util.ArrayList;
1920
import java.util.Collections;
2021
import java.util.LinkedHashMap;
2122
import java.util.List;
2223
import java.util.Map;
2324

25+
import org.apache.kafka.connect.data.Decimal;
2426
import org.apache.kafka.connect.data.Schema;
2527
import org.apache.kafka.connect.data.SchemaAndValue;
2628
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -1809,6 +1811,25 @@ public static SchemaAndValue get(String testCaseId) {
18091811
value.put("version", "7.0.1001.2");
18101812
value.put("data", data);
18111813

1814+
return new SchemaAndValue(schema, value);
1815+
}
1816+
case "055":{
1817+
1818+
final Schema decimalSchema = Decimal.builder(3).build();
1819+
final Schema schema = SchemaBuilder.struct()
1820+
.name("root")
1821+
.field("ProductID", Schema.INT32_SCHEMA)
1822+
.field("ProductName", Schema.STRING_SCHEMA)
1823+
.field("Price", decimalSchema)
1824+
.build();
1825+
final Struct value = new Struct(schema);
1826+
1827+
BigDecimal bigDecimal = new BigDecimal(10).setScale(3);
1828+
1829+
value.put("ProductID", 1);
1830+
value.put("ProductName", "TestProductName");
1831+
value.put("Price", bigDecimal);
1832+
18121833
return new SchemaAndValue(schema, value);
18131834
}
18141835
}

src/test/resources/055.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
2+
<root>
3+
<ProductID>1</ProductID>
4+
<ProductName>TestProductName</ProductName>
5+
<Price>10.000</Price>
6+
</root>

src/test/resources/055.xsd

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version = "1.0"?>
2+
<xs:schema xmlns:xs = "http://www.w3.org/2001/XMLSchema">
3+
<xs:element name="root">
4+
<xs:complexType>
5+
<xs:sequence>
6+
<xs:element type="xs:integer" name="ProductID"/>
7+
<xs:element type="xs:string" name="ProductName"/>
8+
<xs:element name="Price">
9+
<xs:simpleType>
10+
<xs:restriction base="xs:decimal">
11+
<xs:fractionDigits value="3"/>
12+
</xs:restriction>
13+
</xs:simpleType>
14+
</xs:element>
15+
</xs:sequence>
16+
</xs:complexType>
17+
</xs:element>
18+
</xs:schema>

0 commit comments

Comments
 (0)