|
| 1 | +package com.baeldung.apache.avro; |
| 2 | + |
| 3 | +import org.apache.avro.Conversion; |
| 4 | +import org.apache.avro.LogicalTypes; |
| 5 | +import org.apache.avro.Schema; |
| 6 | +import org.apache.avro.data.TimeConversions; |
| 7 | +import org.apache.avro.generic.GenericData; |
| 8 | +import org.apache.avro.generic.GenericDatumReader; |
| 9 | +import org.apache.avro.generic.GenericDatumWriter; |
| 10 | +import org.apache.avro.generic.GenericRecord; |
| 11 | +import org.apache.avro.io.DatumReader; |
| 12 | +import org.apache.avro.io.DatumWriter; |
| 13 | +import org.apache.avro.io.Decoder; |
| 14 | +import org.apache.avro.io.DecoderFactory; |
| 15 | +import org.apache.avro.io.Encoder; |
| 16 | +import org.apache.avro.io.EncoderFactory; |
| 17 | +import org.apache.commons.lang3.tuple.Pair; |
| 18 | +import org.junit.jupiter.api.Test; |
| 19 | + |
| 20 | +import java.io.ByteArrayOutputStream; |
| 21 | +import java.io.IOException; |
| 22 | +import java.time.Instant; |
| 23 | +import java.time.LocalDate; |
| 24 | +import java.time.ZoneId; |
| 25 | +import java.util.Date; |
| 26 | + |
| 27 | +import static org.junit.jupiter.api.Assertions.assertEquals; |
| 28 | + |
| 29 | +public class SerializeAndDeserializeDateUnitTest { |
| 30 | + |
| 31 | + @Test |
| 32 | + void whenSerializingDateWithLogicalType_thenDeserializesCorrectly() throws IOException { |
| 33 | + |
| 34 | + LocalDate expectedDate = LocalDate.now(); |
| 35 | + Instant expectedTimestamp = Instant.now(); |
| 36 | + |
| 37 | + byte[] serialized = serializeDateWithLogicalType(expectedDate, expectedTimestamp); |
| 38 | + Pair<LocalDate, Instant> deserialized = deserializeDateWithLogicalType(serialized); |
| 39 | + |
| 40 | + assertEquals(expectedDate, deserialized.getLeft()); |
| 41 | + |
| 42 | + // This is perfectly valid when using logical types |
| 43 | + assertEquals(expectedTimestamp.toEpochMilli(), deserialized.getRight().toEpochMilli(), |
| 44 | + "Timestamps should match exactly at millisecond precision"); |
| 45 | + } |
| 46 | + |
| 47 | + @Test |
| 48 | + void whenSerializingWithConversionApi_thenDeserializesCorrectly() throws IOException { |
| 49 | + |
| 50 | + LocalDate expectedDate = LocalDate.now(); |
| 51 | + Instant expectedTimestamp = Instant.now(); |
| 52 | + |
| 53 | + byte[] serialized = serializeWithConversionApi(expectedDate, expectedTimestamp); |
| 54 | + Pair<LocalDate, Instant> deserialized = deserializeWithConversionApi(serialized); |
| 55 | + |
| 56 | + assertEquals(expectedDate, deserialized.getLeft()); |
| 57 | + assertEquals(expectedTimestamp.toEpochMilli(), deserialized.getRight().toEpochMilli(), |
| 58 | + "Timestamps should match at millisecond precision"); |
| 59 | + } |
| 60 | + |
| 61 | + @Test |
| 62 | + void whenSerializingLegacyDate_thenConvertsCorrectly() throws IOException { |
| 63 | + |
| 64 | + Date legacyDate = new Date(); |
| 65 | + LocalDate expectedLocalDate = legacyDate.toInstant() |
| 66 | + .atZone(ZoneId.systemDefault()) |
| 67 | + .toLocalDate(); |
| 68 | + |
| 69 | + byte[] serialized = serializeLegacyDateAsModern(legacyDate); |
| 70 | + LocalDate deserialized = deserializeDateWithLogicalType(serialized).getKey(); |
| 71 | + |
| 72 | + assertEquals(expectedLocalDate, deserialized); |
| 73 | + } |
| 74 | + |
| 75 | + public static Schema createDateSchema() { |
| 76 | + String schemaJson = |
| 77 | + "{" |
| 78 | + + "\"type\": \"record\"," |
| 79 | + + "\"name\": \"DateRecord\"," |
| 80 | + + "\"fields\": [" |
| 81 | + + " {\"name\": \"date\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}}," |
| 82 | + + " {\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}" |
| 83 | + + "]" |
| 84 | + + "}"; |
| 85 | + return new Schema.Parser().parse(schemaJson); |
| 86 | + } |
| 87 | + |
| 88 | + public static byte[] serializeDateWithLogicalType(LocalDate date, Instant timestamp) throws IOException { |
| 89 | + Schema schema = createDateSchema(); |
| 90 | + GenericRecord record = new GenericData.Record(schema); |
| 91 | + |
| 92 | + // Convert LocalDate to days since epoch |
| 93 | + record.put("date", (int) date.toEpochDay()); |
| 94 | + |
| 95 | + // Convert Instant to milliseconds since epoch |
| 96 | + record.put("timestamp", timestamp.toEpochMilli()); |
| 97 | + |
| 98 | + ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| 99 | + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); |
| 100 | + Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); |
| 101 | + |
| 102 | + datumWriter.write(record, encoder); |
| 103 | + encoder.flush(); |
| 104 | + |
| 105 | + return baos.toByteArray(); |
| 106 | + } |
| 107 | + |
| 108 | + public static Pair<LocalDate, Instant> deserializeDateWithLogicalType(byte[] bytes) throws IOException { |
| 109 | + Schema schema = createDateSchema(); |
| 110 | + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); |
| 111 | + Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); |
| 112 | + |
| 113 | + GenericRecord record = datumReader.read(null, decoder); |
| 114 | + |
| 115 | + // Convert days since epoch back to LocalDate |
| 116 | + LocalDate date = LocalDate.ofEpochDay((int) record.get("date")); |
| 117 | + |
| 118 | + // Convert milliseconds since epoch back to Instant |
| 119 | + Instant timestamp = Instant.ofEpochMilli((long) record.get("timestamp")); |
| 120 | + |
| 121 | + return Pair.of(date, timestamp); |
| 122 | + } |
| 123 | + |
| 124 | + public static byte[] serializeWithConversionApi(LocalDate date, Instant timestamp) throws IOException { |
| 125 | + Schema schema = createDateSchema(); |
| 126 | + GenericRecord record = new GenericData.Record(schema); |
| 127 | + |
| 128 | + // Use LogicalTypes.date() for conversion |
| 129 | + Conversion<LocalDate> dateConversion = new org.apache.avro.data.TimeConversions.DateConversion(); |
| 130 | + LogicalTypes.date().addToSchema(schema.getField("date").schema()); |
| 131 | + |
| 132 | + // Use LogicalTypes.timestampMillis() for conversion |
| 133 | + Conversion<Instant> timestampConversion = new org.apache.avro.data.TimeConversions.TimestampMillisConversion(); |
| 134 | + LogicalTypes.timestampMillis().addToSchema(schema.getField("timestamp").schema()); |
| 135 | + |
| 136 | + record.put("date", dateConversion.toInt(date, schema.getField("date").schema(), LogicalTypes.date())); |
| 137 | + record.put("timestamp", timestampConversion.toLong(timestamp, schema.getField("timestamp").schema(), LogicalTypes.timestampMillis())); |
| 138 | + |
| 139 | + // Serialize as before |
| 140 | + ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| 141 | + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); |
| 142 | + Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); |
| 143 | + |
| 144 | + datumWriter.write(record, encoder); |
| 145 | + encoder.flush(); |
| 146 | + |
| 147 | + return baos.toByteArray(); |
| 148 | + } |
| 149 | + |
| 150 | + public static Pair<LocalDate, Instant> deserializeWithConversionApi(byte[] bytes) throws IOException { |
| 151 | + Schema schema = createDateSchema(); |
| 152 | + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); |
| 153 | + Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); |
| 154 | + |
| 155 | + GenericRecord record = datumReader.read(null, decoder); |
| 156 | + |
| 157 | + // Use LogicalTypes.date() for conversion |
| 158 | + Conversion<LocalDate> dateConversion = new TimeConversions.DateConversion(); |
| 159 | + LogicalTypes.date().addToSchema(schema.getField("date").schema()); |
| 160 | + |
| 161 | + // Use LogicalTypes.timestampMillis() for conversion |
| 162 | + Conversion<Instant> timestampConversion = new TimeConversions.TimestampMillisConversion(); |
| 163 | + LogicalTypes.timestampMillis().addToSchema(schema.getField("timestamp").schema()); |
| 164 | + |
| 165 | + // Get the primitive values from the record |
| 166 | + int daysSinceEpoch = (int) record.get("date"); |
| 167 | + long millisSinceEpoch = (long) record.get("timestamp"); |
| 168 | + |
| 169 | + // Convert back to Java types using the conversion API |
| 170 | + LocalDate date = dateConversion.fromInt( |
| 171 | + daysSinceEpoch, |
| 172 | + schema.getField("date").schema(), |
| 173 | + LogicalTypes.date() |
| 174 | + ); |
| 175 | + |
| 176 | + Instant timestamp = timestampConversion.fromLong( |
| 177 | + millisSinceEpoch, |
| 178 | + schema.getField("timestamp").schema(), |
| 179 | + LogicalTypes.timestampMillis() |
| 180 | + ); |
| 181 | + |
| 182 | + return Pair.of(date, timestamp); |
| 183 | + } |
| 184 | + |
| 185 | + public static byte[] serializeLegacyDateAsModern(Date legacyDate) throws IOException { |
| 186 | + // Convert java.util.Date to java.time.Instant |
| 187 | + Instant instant = legacyDate.toInstant(); |
| 188 | + |
| 189 | + // Convert to LocalDate if you need date-only information |
| 190 | + LocalDate localDate = instant.atZone(ZoneId.systemDefault()).toLocalDate(); |
| 191 | + |
| 192 | + // Then use one of our modern date serialization methods |
| 193 | + return serializeDateWithLogicalType(localDate, instant); |
| 194 | + } |
| 195 | +} |
0 commit comments