Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1722][Projection Transform]Changes done for converting double to float. #1837

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void init(Schema inputSchema, FailureCollector collector) {
}
if (containAllFields) {
collector.addFailure(
"'Fields to drop' cannot contain all the fields of the input schema.", null)
"'Fields to drop' cannot contain all the fields of the input schema.", null)
.withConfigProperty(ProjectionTransformConfig.DROP);
}
}
Expand All @@ -230,8 +230,8 @@ private void init(Schema inputSchema, FailureCollector collector) {
for (String field : fieldsToKeep) {
if (inputSchema.getField(field) == null) {
collector.addFailure(
String.format("Field '%s' provided in 'Fields to keep' must be present in the input schema.",
field), null)
String.format("Field '%s' provided in 'Fields to keep' must be present in the input schema.",
field), null)
.withConfigElement(ProjectionTransformConfig.KEEP, field);
}
}
Expand All @@ -244,22 +244,22 @@ private void init(Schema inputSchema, FailureCollector collector) {
String val = keyVal.getValue();
if (inputSchema != null && inputSchema.getField(key) == null) {
collector.addFailure(
String.format("Field '%s' provided in 'Fields to rename' must be present in the input schema.", key),
null)
String.format("Field '%s' provided in 'Fields to rename' must be present in the input schema.", key),
null)
.withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, val));
}
try {
String oldVal = fieldsToRename.put(key, val);
if (oldVal != null) {
collector.addFailure(
String.format("Cannot rename '%s' to both '%s' and '%s'.", key, oldVal, val), null)
String.format("Cannot rename '%s' to both '%s' and '%s'.", key, oldVal, val), null)
.withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, oldVal))
.withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, val));
}
} catch (IllegalArgumentException e) {
// purely so that we can give a more descriptive error message
collector.addFailure(
String.format("Cannot rename more than one field to '%s'.", val), null)
String.format("Cannot rename more than one field to '%s'.", val), null)
.withConfigProperty(ProjectionTransformConfig.RENAME);
}
}
Expand All @@ -271,21 +271,21 @@ private void init(Schema inputSchema, FailureCollector collector) {
String typeStr = keyVal.getValue();
if (inputSchema != null && inputSchema.getField(name) == null) {
collector.addFailure(
String.format(
"Field '%s' provided in 'Convert' is not present in the input schema.", name), null)
String.format(
"Field '%s' provided in 'Convert' is not present in the input schema.", name), null)
.withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", name, typeStr));
}
Schema.Type type = Schema.Type.valueOf(typeStr.toUpperCase());
if (!type.isSimpleType() || type == Schema.Type.NULL) {
collector.addFailure(
String.format(
"Cannot convert field '%s' to a '%s'.", name, typeStr),
"Only simple types are supported.")
String.format(
"Cannot convert field '%s' to a '%s'.", name, typeStr),
"Only simple types are supported.")
.withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", name, typeStr));
}
if (fieldsToConvert.containsKey(name)) {
collector.addFailure(
String.format("Cannot convert '%s' to multiple types.", name), null)
String.format("Cannot convert '%s' to multiple types.", name), null)
.withConfigProperty(ProjectionTransformConfig.CONVERT);
}
fieldsToConvert.put(name, type);
Expand Down Expand Up @@ -411,14 +411,16 @@ private Object convertPrimitive(String fieldName, Object val, Schema.Type inputT
return (int) Math.round(doubleVal);
case LONG:
return Math.round(doubleVal);
case FLOAT:
return getFloatFromDouble(doubleVal);
}
break;
}

String typeStr = outputType.toString().toLowerCase();
collector.addFailure(
String.format("Cannot convert field '%s' from type '%s' to type '%s'.", fieldName, inputType, outputType),
null)
String.format("Cannot convert field '%s' from type '%s' to type '%s'.", fieldName, inputType, outputType),
null)
.withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", fieldName, typeStr));
throw collector.getOrThrowException();
}
Expand Down Expand Up @@ -454,8 +456,8 @@ private Schema getOutputSchema(Schema inputSchema, FailureCollector collector) {
if (!inputFieldType.isSimpleType() || inputFieldType == Schema.Type.NULL) {
String typeStr = outputFieldSchema.getType().toString().toLowerCase();
collector.addFailure(
String.format(
"Field '%s' is of unconvertible type '%s'.", inputFieldName, inputFieldType), null)
String.format(
"Field '%s' is of unconvertible type '%s'.", inputFieldName, inputFieldType), null)
.withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", inputFieldName, typeStr));
collector.getOrThrowException();
}
Expand All @@ -473,4 +475,16 @@ private Schema getOutputSchema(Schema inputSchema, FailureCollector collector) {
schemaCache.put(inputSchema, output);
return output;
}

/**
* @param doubleValue
* @return It will return float value if it is within float range otherwise will return null.
*/
private Float getFloatFromDouble(Double doubleValue) {
boolean isNegative = doubleValue.doubleValue() < 0.0;
if ((isNegative && doubleValue >= -Float.MAX_VALUE) || (!isNegative && doubleValue <= Float.MAX_VALUE)) {
return doubleValue.floatValue();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ public void testConvertToFloat() throws Exception {
Assert.assertEquals("bar", output.get("stringField"));
}


@Test
public void testConvertToDouble() throws Exception {
ProjectionTransform.ProjectionTransformConfig config = new ProjectionTransform
Expand Down Expand Up @@ -715,4 +716,50 @@ public void testRenameFieldsValidations() {
Assert.assertEquals(expectedCause, e.getFailures().get(0).getCauses().get(0));
}
}

@Test
public void testConvertFloatToDouble() throws Exception {
ProjectionTransform.ProjectionTransformConfig config = new ProjectionTransform
.ProjectionTransformConfig(null, null,
"doubleField1:float,doubleField2:float,doubleField3:float,doubleField4:float," +
"doubleField5:float",
null);
Transform<StructuredRecord, StructuredRecord> transform = new ProjectionTransform(config);
TransformContext transformContext = new MockTransformContext();
transform.initialize(transformContext);

MockEmitter<StructuredRecord> emitter = new MockEmitter<>();
final Schema SCHEMA =
Schema.recordOf("record",
Schema.Field.of("doubleField1", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("doubleField2", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("doubleField3", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("doubleField4", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("doubleField5", Schema.of(Schema.Type.DOUBLE)));
final StructuredRecord SINGLE = StructuredRecord.builder(SCHEMA)
.set("doubleField1", 3.14)
.set("doubleField2", Double.MAX_VALUE)
.set("doubleField3", Double.MIN_VALUE)
.set("doubleField4", -Double.MAX_VALUE)
.set("doubleField5", -12.12d)
.build();
transform.transform(SINGLE, emitter);
StructuredRecord output = emitter.getEmitted().get(0);

Schema expectedSchema = Schema.recordOf("record.projected",
Schema.Field.of("doubleField1", Schema.of(Schema.Type.FLOAT)),
Schema.Field.of("doubleField2",
Schema.nullableOf(Schema.of(Schema.Type.FLOAT))),
Schema.Field.of("doubleField3", Schema.of(Schema.Type.FLOAT)),
Schema.Field.of("doubleField4",
Schema.nullableOf(Schema.of(Schema.Type.FLOAT))),
Schema.Field.of("doubleField5", Schema.of(Schema.Type.FLOAT)));

Assert.assertEquals(expectedSchema, output.getSchema());
Assert.assertTrue(Math.abs(3.14 - (Float) output.get("doubleField1")) < 0.000001);
Assert.assertNull(output.get("doubleField2"));
Assert.assertTrue(Math.abs(Double.MIN_VALUE - (Float) output.get("doubleField3")) < 0.000001);
Assert.assertNull(output.get("doubleField4"));
Assert.assertTrue(Math.abs(-12.12 - (Float) output.get("doubleField5")) < 0.000001);
}
}