diff --git a/core-plugins/src/main/java/io/cdap/plugin/transform/ProjectionTransform.java b/core-plugins/src/main/java/io/cdap/plugin/transform/ProjectionTransform.java index 386105e82..844799130 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/transform/ProjectionTransform.java +++ b/core-plugins/src/main/java/io/cdap/plugin/transform/ProjectionTransform.java @@ -220,7 +220,7 @@ private void init(Schema inputSchema, FailureCollector collector) { } if (containAllFields) { collector.addFailure( - "'Fields to drop' cannot contain all the fields of the input schema.", null) + "'Fields to drop' cannot contain all the fields of the input schema.", null) .withConfigProperty(ProjectionTransformConfig.DROP); } } @@ -230,8 +230,8 @@ private void init(Schema inputSchema, FailureCollector collector) { for (String field : fieldsToKeep) { if (inputSchema.getField(field) == null) { collector.addFailure( - String.format("Field '%s' provided in 'Fields to keep' must be present in the input schema.", - field), null) + String.format("Field '%s' provided in 'Fields to keep' must be present in the input schema.", + field), null) .withConfigElement(ProjectionTransformConfig.KEEP, field); } } @@ -244,22 +244,22 @@ private void init(Schema inputSchema, FailureCollector collector) { String val = keyVal.getValue(); if (inputSchema != null && inputSchema.getField(key) == null) { collector.addFailure( - String.format("Field '%s' provided in 'Fields to rename' must be present in the input schema.", key), - null) + String.format("Field '%s' provided in 'Fields to rename' must be present in the input schema.", key), + null) .withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, val)); } try { String oldVal = fieldsToRename.put(key, val); if (oldVal != null) { collector.addFailure( - String.format("Cannot rename '%s' to both '%s' and '%s'.", key, oldVal, val), null) + String.format("Cannot rename '%s' to both '%s' and '%s'.", key, oldVal, val), null) .withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, oldVal)) .withConfigElement(ProjectionTransformConfig.RENAME, String.format("%s:%s", key, val)); } } catch (IllegalArgumentException e) { // purely so that we can give a more descriptive error message collector.addFailure( - String.format("Cannot rename more than one field to '%s'.", val), null) + String.format("Cannot rename more than one field to '%s'.", val), null) .withConfigProperty(ProjectionTransformConfig.RENAME); } } @@ -271,21 +271,21 @@ private void init(Schema inputSchema, FailureCollector collector) { String typeStr = keyVal.getValue(); if (inputSchema != null && inputSchema.getField(name) == null) { collector.addFailure( - String.format( - "Field '%s' provided in 'Convert' is not present in the input schema.", name), null) + String.format( + "Field '%s' provided in 'Convert' is not present in the input schema.", name), null) .withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", name, typeStr)); } Schema.Type type = Schema.Type.valueOf(typeStr.toUpperCase()); if (!type.isSimpleType() || type == Schema.Type.NULL) { collector.addFailure( - String.format( - "Cannot convert field '%s' to a '%s'.", name, typeStr), - "Only simple types are supported.") + String.format( + "Cannot convert field '%s' to a '%s'.", name, typeStr), + "Only simple types are supported.") .withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", name, typeStr)); } if (fieldsToConvert.containsKey(name)) { collector.addFailure( - String.format("Cannot convert '%s' to multiple types.", name), null) + String.format("Cannot convert '%s' to multiple types.", name), null) .withConfigProperty(ProjectionTransformConfig.CONVERT); } fieldsToConvert.put(name, type); @@ -411,14 +411,16 @@ private Object convertPrimitive(String fieldName, Object val, Schema.Type inputT return (int) Math.round(doubleVal); case LONG: return Math.round(doubleVal); + case FLOAT: + return getFloatFromDouble(doubleVal); } break; } String typeStr = outputType.toString().toLowerCase(); collector.addFailure( - String.format("Cannot convert field '%s' from type '%s' to type '%s'.", fieldName, inputType, outputType), - null) + String.format("Cannot convert field '%s' from type '%s' to type '%s'.", fieldName, inputType, outputType), + null) .withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", fieldName, typeStr)); throw collector.getOrThrowException(); } @@ -454,8 +456,8 @@ private Schema getOutputSchema(Schema inputSchema, FailureCollector collector) { if (!inputFieldType.isSimpleType() || inputFieldType == Schema.Type.NULL) { String typeStr = outputFieldSchema.getType().toString().toLowerCase(); collector.addFailure( - String.format( - "Field '%s' is of unconvertible type '%s'.", inputFieldName, inputFieldType), null) + String.format( + "Field '%s' is of unconvertible type '%s'.", inputFieldName, inputFieldType), null) .withConfigElement(ProjectionTransformConfig.CONVERT, String.format("%s:%s", inputFieldName, typeStr)); collector.getOrThrowException(); } @@ -473,4 +475,16 @@ private Schema getOutputSchema(Schema inputSchema, FailureCollector collector) { schemaCache.put(inputSchema, output); return output; } + + /** + * @param doubleValue + * @return It will return float value if it is within float range otherwise will return null. + */ + private Float getFloatFromDouble(Double doubleValue) { + boolean isNegative = doubleValue.doubleValue() < 0.0; + if ((isNegative && doubleValue >= -Float.MAX_VALUE) || (!isNegative && doubleValue <= Float.MAX_VALUE)) { + return doubleValue.floatValue(); + } + return null; + } } diff --git a/core-plugins/src/test/java/io/cdap/plugin/transform/ProjectionTransformTest.java b/core-plugins/src/test/java/io/cdap/plugin/transform/ProjectionTransformTest.java index 02fd3b722..657acc63c 100644 --- a/core-plugins/src/test/java/io/cdap/plugin/transform/ProjectionTransformTest.java +++ b/core-plugins/src/test/java/io/cdap/plugin/transform/ProjectionTransformTest.java @@ -568,6 +568,7 @@ public void testConvertToFloat() throws Exception { Assert.assertEquals("bar", output.get("stringField")); } + @Test public void testConvertToDouble() throws Exception { ProjectionTransform.ProjectionTransformConfig config = new ProjectionTransform @@ -715,4 +716,50 @@ public void testRenameFieldsValidations() { Assert.assertEquals(expectedCause, e.getFailures().get(0).getCauses().get(0)); } } + + @Test + public void testConvertFloatToDouble() throws Exception { + ProjectionTransform.ProjectionTransformConfig config = new ProjectionTransform + .ProjectionTransformConfig(null, null, + "doubleField1:float,doubleField2:float,doubleField3:float,doubleField4:float," + + "doubleField5:float", + null); + Transform transform = new ProjectionTransform(config); + TransformContext transformContext = new MockTransformContext(); + transform.initialize(transformContext); + + MockEmitter emitter = new MockEmitter<>(); + final Schema SCHEMA = + Schema.recordOf("record", + Schema.Field.of("doubleField1", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("doubleField2", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))), + Schema.Field.of("doubleField3", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("doubleField4", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))), + Schema.Field.of("doubleField5", Schema.of(Schema.Type.DOUBLE))); + final StructuredRecord SINGLE = StructuredRecord.builder(SCHEMA) + .set("doubleField1", 3.14) + .set("doubleField2", Double.MAX_VALUE) + .set("doubleField3", Double.MIN_VALUE) + .set("doubleField4", -Double.MAX_VALUE) + .set("doubleField5", -12.12d) + .build(); + transform.transform(SINGLE, emitter); + StructuredRecord output = emitter.getEmitted().get(0); + + Schema expectedSchema = Schema.recordOf("record.projected", + Schema.Field.of("doubleField1", Schema.of(Schema.Type.FLOAT)), + Schema.Field.of("doubleField2", + Schema.nullableOf(Schema.of(Schema.Type.FLOAT))), + Schema.Field.of("doubleField3", Schema.of(Schema.Type.FLOAT)), + Schema.Field.of("doubleField4", + Schema.nullableOf(Schema.of(Schema.Type.FLOAT))), + Schema.Field.of("doubleField5", Schema.of(Schema.Type.FLOAT))); + + Assert.assertEquals(expectedSchema, output.getSchema()); + Assert.assertTrue(Math.abs(3.14 - (Float) output.get("doubleField1")) < 0.000001); + Assert.assertNull(output.get("doubleField2")); + Assert.assertTrue(Math.abs(Double.MIN_VALUE - (Float) output.get("doubleField3")) < 0.000001); + Assert.assertNull(output.get("doubleField4")); + Assert.assertTrue(Math.abs(-12.12 - (Float) output.get("doubleField5")) < 0.000001); + } }