diff --git a/build.properties b/build.properties index 08749ba0577a..961a7490a9d5 100644 --- a/build.properties +++ b/build.properties @@ -16,7 +16,7 @@ Name=Hive name=hive -version=0.9.0-amplab-4 +version=0.9.0-amplab-5 year=2011 javac.debug=on diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java index db037b127e76..77035fd3d04f 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java @@ -200,7 +200,7 @@ public IntWritable evaluate(TimestampWritable i) { if (i == null) { return null; } else { - intWritable.set(i.getSeconds()); + intWritable.set((int) i.getSeconds()); return intWritable; } } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java index b7f1efd2b68f..b6461e969abb 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java @@ -376,7 +376,7 @@ static Object deserialize(InputByteBuffer buffer, TypeInfo type, case TIMESTAMP: TimestampWritable t = (reuse == null ? new TimestampWritable() : (TimestampWritable) reuse); - byte[] bytes = new byte[8]; + byte[] bytes = new byte[TimestampWritable.BINARY_SORTABLE_LENGTH]; for (int i = 0; i < bytes.length; i++) { bytes[i] = buffer.read(invert); diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java index a295ecd47495..aa7c7c419b04 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java @@ -25,7 +25,6 @@ import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,8 +57,17 @@ public class TimestampWritable implements WritableComparable static final public byte[] nullBytes = {0x0, 0x0, 0x0, 0x0}; - private static final int NO_DECIMAL_MASK = 0x7FFFFFFF; - private static final int HAS_DECIMAL_MASK = 0x80000000; + private static final int DECIMAL_OR_SECOND_VINT_FLAG = 0x80000000; + private static final int LOWEST_31_BITS_OF_SEC_MASK = 0x7fffffff; + + private static final long SEVEN_BYTE_LONG_SIGN_FLIP = 0xff80L << 48; + + private static final BigDecimal BILLION_BIG_DECIMAL = BigDecimal.valueOf(1000000000); + + /** The maximum number of bytes required for a TimestampWritable */ + public static final int MAX_BYTES = 13; + + public static final int BINARY_SORTABLE_LENGTH = 11; private static final ThreadLocal threadLocalDateFormat = new ThreadLocal() { @@ -81,16 +89,12 @@ protected synchronized DateFormat initialValue() { /* Allow use of external byte[] for efficiency */ private byte[] currentBytes; - private final byte[] internalBytes = new byte[9]; + private final byte[] internalBytes = new byte[MAX_BYTES]; private byte[] externalBytes; private int offset; - /* Reused to read VInts */ - static private final VInt vInt = new VInt(); - /* Constructors */ public TimestampWritable() { - Arrays.fill(internalBytes, (byte) 0x0); bytesEmpty = false; currentBytes = internalBytes; offset = 0; @@ -155,11 +159,14 @@ public void writeToByteStream(Output byteStream) { * * @return seconds corresponding to this TimestampWritable */ - public int getSeconds() { - if (bytesEmpty) { - return (int) (timestamp.getTime() / 1000); + public long getSeconds() { + if (!timestampEmpty) { + return millisToSeconds(timestamp.getTime()); + } else if (!bytesEmpty) { + return TimestampWritable.getSeconds(currentBytes, offset); + } else { + throw new IllegalStateException("Both timestamp and bytes are empty"); } - return TimestampWritable.getSeconds(currentBytes, offset); } /** @@ -169,26 +176,33 @@ public int getSeconds() { public int getNanos() { if (!timestampEmpty) { return timestamp.getNanos(); + } else if (!bytesEmpty) { + return hasDecimalOrSecondVInt() ? + TimestampWritable.getNanos(currentBytes, offset + 4) : 0; + } else { + throw new IllegalStateException("Both timestamp and bytes are empty"); } - - return hasDecimal() ? TimestampWritable.getNanos(currentBytes, offset+4) : 0; } /** - * - * @return length of serialized TimestampWritable data + * @return length of serialized TimestampWritable data. As a side effect, populates the internal + * byte array if empty. */ - private int getTotalLength() { - return 4 + getDecimalLength(); + int getTotalLength() { + checkBytes(); + return getTotalLength(currentBytes, offset); } - /** - * - * @return number of bytes the variable length decimal takes up - */ - private int getDecimalLength() { - checkBytes(); - return hasDecimal() ? WritableUtils.decodeVIntSize(currentBytes[offset+4]) : 0; + public static int getTotalLength(byte[] bytes, int offset) { + int len = 4; + if (hasDecimalOrSecondVInt(bytes[offset])) { + int firstVIntLen = WritableUtils.decodeVIntSize(bytes[offset + 4]); + len += firstVIntLen; + if (hasSecondVInt(bytes[offset + 4])) { + len += WritableUtils.decodeVIntSize(bytes[offset + 4 + firstVIntLen]); + } + } + return len; } public Timestamp getTimestamp() { @@ -214,33 +228,45 @@ public byte[] getBytes() { /** * @return byte[] representation of TimestampWritable that is binary - * sortable (4 byte seconds, 4 bytes for nanoseconds) + * sortable (7 bytes for seconds, 4 bytes for nanoseconds) */ public byte[] getBinarySortable() { - byte[] b = new byte[8]; + byte[] b = new byte[BINARY_SORTABLE_LENGTH]; int nanos = getNanos(); - int seconds = HAS_DECIMAL_MASK | getSeconds(); - intToBytes(seconds, b, 0); - intToBytes(nanos, b, 4); + // We flip the highest-order bit of the seven-byte representation of seconds to make negative + // values come before positive ones. + long seconds = getSeconds() ^ SEVEN_BYTE_LONG_SIGN_FLIP; + sevenByteLongToBytes(seconds, b, 0); + intToBytes(nanos, b, 7); return b; } /** * Given a byte[] that has binary sortable data, initialize the internal * structures to hold that data - * @param bytes - * @param offset + * @param bytes the byte array that holds the binary sortable representation + * @param binSortOffset offset of the binary-sortable representation within the buffer. */ - public void setBinarySortable(byte[] bytes, int offset) { - int seconds = bytesToInt(bytes, offset); - int nanos = bytesToInt(bytes, offset+4); - if (nanos == 0) { - seconds &= NO_DECIMAL_MASK; + public void setBinarySortable(byte[] bytes, int binSortOffset) { + // Flip the sign bit (and unused bits of the high-order byte) of the seven-byte long back. + long seconds = readSevenByteLong(bytes, binSortOffset) ^ SEVEN_BYTE_LONG_SIGN_FLIP; + int nanos = bytesToInt(bytes, binSortOffset + 7); + int firstInt = (int) seconds; + boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE; + if (nanos != 0 || hasSecondVInt) { + firstInt |= DECIMAL_OR_SECOND_VINT_FLAG; } else { - seconds |= HAS_DECIMAL_MASK; + firstInt &= LOWEST_31_BITS_OF_SEC_MASK; } - intToBytes(seconds, internalBytes, 0); - setNanosBytes(nanos, internalBytes, 4); + + intToBytes(firstInt, internalBytes, 0); + setNanosBytes(nanos, internalBytes, 4, hasSecondVInt); + if (hasSecondVInt) { + LazyBinaryUtils.writeVLongToByteArray(internalBytes, + 4 + WritableUtils.decodeVIntSize(internalBytes[4]), + seconds >> 31); + } + currentBytes = internalBytes; this.offset = 0; } @@ -267,7 +293,7 @@ private void checkBytes() { public double getDouble() { double seconds, nanos; if (bytesEmpty) { - seconds = timestamp.getTime() / 1000; + seconds = millisToSeconds(timestamp.getTime()); nanos = timestamp.getNanos(); } else { seconds = getSeconds(); @@ -280,10 +306,31 @@ public double getDouble() { public void readFields(DataInput in) throws IOException { in.readFully(internalBytes, 0, 4); - if (TimestampWritable.hasDecimal(internalBytes[0])) { + if (TimestampWritable.hasDecimalOrSecondVInt(internalBytes[0])) { in.readFully(internalBytes, 4, 1); int len = (byte) WritableUtils.decodeVIntSize(internalBytes[4]); - in.readFully(internalBytes, 5, len-1); + if (len > 1) { + in.readFully(internalBytes, 5, len-1); + } + + long vlong = LazyBinaryUtils.readVLongFromByteArray(internalBytes, 4); + if (vlong < -1000000000 || vlong > 999999999) { + throw new IOException( + "Invalid first vint value (encoded nanoseconds) of a TimestampWritable: " + vlong + + ", expected to be between -1000000000 and 999999999."); + // Note that -1000000000 is a valid value corresponding to a nanosecond timestamp + // of 999999999, because if the second VInt is present, we use the value + // (-reversedNanoseconds - 1) as the second VInt. + } + if (vlong < 0) { + // This indicates there is a second VInt containing the additional bits of the seconds + // field. + in.readFully(internalBytes, 4 + len, 1); + int secondVIntLen = (byte) WritableUtils.decodeVIntSize(internalBytes[4 + len]); + if (secondVIntLen > 1) { + in.readFully(internalBytes, 5 + len, secondVIntLen - 1); + } + } } currentBytes = internalBytes; this.offset = 0; @@ -300,8 +347,8 @@ public void write(DataOutput out) throws IOException { public int compareTo(TimestampWritable t) { checkBytes(); - int s1 = this.getSeconds(); - int s2 = t.getSeconds(); + long s1 = this.getSeconds(); + long s2 = t.getSeconds(); if (s1 == s2) { int n1 = this.getNanos(); int n2 = t.getNanos(); @@ -310,7 +357,7 @@ public int compareTo(TimestampWritable t) { } return n1 - n2; } else { - return s1 - s2; + return s1 < s2 ? -1 : 1; } } @@ -341,7 +388,7 @@ public String toString() { @Override public int hashCode() { long seconds = getSeconds(); - seconds <<= 32; + seconds <<= 30; // the nanosecond part fits in 30 bits seconds |= getNanos(); return (int) ((seconds >>> 32) ^ seconds); } @@ -361,13 +408,30 @@ private void populateTimestamp() { * @param offset * @return the number of seconds */ - public static int getSeconds(byte[] bytes, int offset) { - return NO_DECIMAL_MASK & bytesToInt(bytes, offset); + public static long getSeconds(byte[] bytes, int offset) { + int lowest31BitsOfSecondsAndFlag = bytesToInt(bytes, offset); + if (lowest31BitsOfSecondsAndFlag >= 0 || // the "has decimal or second VInt" flag is not set + !hasSecondVInt(bytes[offset + 4])) { + // The entire seconds field is stored in the first 4 bytes. + return lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK; + } + + // We compose the seconds field from two parts. The lowest 31 bits come from the first four + // bytes. The higher-order bits come from the second VInt that follows the nanos field. + return ((long) (lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK)) | + (LazyBinaryUtils.readVLongFromByteArray(bytes, + offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4])) << 31); } public static int getNanos(byte[] bytes, int offset) { + VInt vInt = LazyBinaryUtils.threadLocalVInt.get(); LazyBinaryUtils.readVInt(bytes, offset, vInt); int val = vInt.value; + if (val < 0) { + // This means there is a second VInt present that specifies additional bits of the timestamp. + // The reversed nanoseconds value is still encoded in this VInt. + val = -val - 1; + } int len = (int) Math.floor(Math.log10(val)) + 1; // Reverse the value @@ -386,40 +450,33 @@ public static int getNanos(byte[] bytes, int offset) { } /** - * Writes a Timestamp's serialized value to byte array b at - * @param t - * @param b + * Writes a Timestamp's serialized value to byte array b at the given offset + * @param timestamp to convert to bytes + * @param b destination byte array + * @param offset destination offset in the byte array */ public static void convertTimestampToBytes(Timestamp t, byte[] b, int offset) { - if (b.length < 9) { - LOG.error("byte array too short"); - } long millis = t.getTime(); int nanos = t.getNanos(); - boolean hasDecimal = nanos != 0 && setNanosBytes(nanos, b, offset+4); - setSecondsBytes(millis, b, offset, hasDecimal); - } - - /** - * Given an integer representing seconds, write its serialized - * value to the byte array b at offset - * @param millis - * @param b - * @param offset - * @param hasDecimal - */ - private static void setSecondsBytes(long millis, byte[] b, int offset, boolean hasDecimal) { - int seconds = (int) (millis / 1000); + long seconds = millisToSeconds(millis); + boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE; + boolean hasDecimal = setNanosBytes(nanos, b, offset+4, hasSecondVInt); - if (!hasDecimal) { - seconds &= NO_DECIMAL_MASK; + int firstInt = (int) seconds; + if (hasDecimal || hasSecondVInt) { + firstInt |= DECIMAL_OR_SECOND_VINT_FLAG; } else { - seconds |= HAS_DECIMAL_MASK; + firstInt &= LOWEST_31_BITS_OF_SEC_MASK; } + intToBytes(firstInt, b, offset); - intToBytes(seconds, b, offset); + if (hasSecondVInt) { + LazyBinaryUtils.writeVLongToByteArray(b, + offset + 4 + WritableUtils.decodeVIntSize(b[offset + 4]), + seconds >> 31); + } } /** @@ -431,7 +488,7 @@ private static void setSecondsBytes(long millis, byte[] b, int offset, boolean h * @param offset * @return */ - private static boolean setNanosBytes(int nanos, byte[] b, int offset) { + private static boolean setNanosBytes(int nanos, byte[] b, int offset, boolean hasSecondVInt) { int decimal = 0; if (nanos != 0) { int counter = 0; @@ -443,7 +500,11 @@ private static boolean setNanosBytes(int nanos, byte[] b, int offset) { } } - LazyBinaryUtils.writeVLongToByteArray(b, offset, decimal); + if (hasSecondVInt || decimal != 0) { + // We use the sign of the reversed-nanoseconds field to indicate that there is a second VInt + // present. + LazyBinaryUtils.writeVLongToByteArray(b, offset, hasSecondVInt ? (-decimal - 1) : decimal); + } return decimal != 0; } @@ -457,11 +518,14 @@ public static Timestamp floatToTimestamp(float f) { } public static Timestamp decimalToTimestamp(BigDecimal d) { - BigDecimal seconds = new BigDecimal(d.longValue()); - long millis = d.multiply(new BigDecimal(1000)).longValue(); - int nanos = d.subtract(seconds).multiply(new BigDecimal(1000000000)).intValue(); - - Timestamp t = new Timestamp(millis); + BigDecimal nanoInstant = d.multiply(BILLION_BIG_DECIMAL); + int nanos = nanoInstant.remainder(BILLION_BIG_DECIMAL).intValue(); + if (nanos < 0) { + nanos += 1000000000; + } + long seconds = + nanoInstant.subtract(new BigDecimal(nanos)).divide(BILLION_BIG_DECIMAL).longValue(); + Timestamp t = new Timestamp(seconds * 1000); t.setNanos(nanos); return t; @@ -479,6 +543,10 @@ public static Timestamp doubleToTimestamp(double f) { // Convert to millis long millis = seconds * 1000; + if (nanos < 0) { + millis -= 1000; + nanos += 1000000000; + } Timestamp t = new Timestamp(millis); // Set remaining fractional portion to nanos @@ -487,10 +555,19 @@ public static Timestamp doubleToTimestamp(double f) { } public static void setTimestamp(Timestamp t, byte[] bytes, int offset) { - boolean hasDecimal = hasDecimal(bytes[offset]); - t.setTime(((long) TimestampWritable.getSeconds(bytes, offset)) * 1000); - if (hasDecimal) { - t.setNanos(TimestampWritable.getNanos(bytes, offset+4)); + boolean hasDecimalOrSecondVInt = hasDecimalOrSecondVInt(bytes[offset]); + long seconds = (long) TimestampWritable.getSeconds(bytes, offset); + int nanos = 0; + if (hasDecimalOrSecondVInt) { + nanos = TimestampWritable.getNanos(bytes, offset + 4); + if (hasSecondVInt(bytes[offset + 4])) { + seconds += LazyBinaryUtils.readVLongFromByteArray(bytes, + offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4])); + } + } + t.setTime(seconds * 1000); + if (nanos != 0) { + t.setNanos(nanos); } } @@ -500,17 +577,22 @@ public static Timestamp createTimestamp(byte[] bytes, int offset) { return t; } - public boolean hasDecimal() { - return hasDecimal(currentBytes[offset]); + private static boolean hasDecimalOrSecondVInt(byte b) { + return (b >> 7) != 0; } - /** - * - * @param b first byte in an encoded TimestampWritable - * @return true if it has a decimal portion, false otherwise - */ - public static boolean hasDecimal(byte b) { - return (b >> 7) != 0; + private static boolean hasSecondVInt(byte b) { + return WritableUtils.isNegativeVInt(b); + } + + private final boolean hasDecimalOrSecondVInt() { + return hasDecimalOrSecondVInt(currentBytes[offset]); + } + + public final boolean hasDecimal() { + return hasDecimalOrSecondVInt() || currentBytes[offset + 4] != -1; + // If the first byte of the VInt is -1, the VInt itself is -1, indicating that there is a + // second VInt but the nanoseconds field is actually 0. } /** @@ -526,6 +608,20 @@ private static void intToBytes(int value, byte[] dest, int offset) { dest[offset+3] = (byte) (value & 0xFF); } + /** + * Writes value into dest at offset as a seven-byte + * serialized long number. + */ + static void sevenByteLongToBytes(long value, byte[] dest, int offset) { + dest[offset] = (byte) ((value >> 48) & 0xFF); + dest[offset+1] = (byte) ((value >> 40) & 0xFF); + dest[offset+2] = (byte) ((value >> 32) & 0xFF); + dest[offset+3] = (byte) ((value >> 24) & 0xFF); + dest[offset+4] = (byte) ((value >> 16) & 0xFF); + dest[offset+5] = (byte) ((value >> 8) & 0xFF); + dest[offset+6] = (byte) (value & 0xFF); + } + /** * * @param bytes @@ -539,4 +635,27 @@ private static int bytesToInt(byte[] bytes, int offset) { | ((0xFF & bytes[offset+2]) << 8) | (0xFF & bytes[offset+3]); } + + static long readSevenByteLong(byte[] bytes, int offset) { + // We need to shift everything 8 bits left and then shift back to populate the sign field. + return (((0xFFL & bytes[offset]) << 56) + | ((0xFFL & bytes[offset+1]) << 48) + | ((0xFFL & bytes[offset+2]) << 40) + | ((0xFFL & bytes[offset+3]) << 32) + | ((0xFFL & bytes[offset+4]) << 24) + | ((0xFFL & bytes[offset+5]) << 16) + | ((0xFFL & bytes[offset+6]) << 8)) >> 8; + } + + /** + * Rounds the number of milliseconds relative to the epoch down to the nearest whole number of + * seconds. 500 would round to 0, -500 would round to -1. + */ + static long millisToSeconds(long millis) { + if (millis >= 0) { + return millis / 1000; + } else { + return (millis - 999) / 1000; + } + } } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java index 1f4b92749d23..05dc4b29d2aa 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java @@ -202,10 +202,7 @@ public static void checkObjectByteInfo(ObjectInspector objectInspector, break; case TIMESTAMP: recordInfo.elementOffset = 0; - recordInfo.elementSize = 4; - if(TimestampWritable.hasDecimal(bytes[offset])) { - recordInfo.elementSize += (byte) WritableUtils.decodeVIntSize(bytes[offset+4]); - } + recordInfo.elementSize = TimestampWritable.getTotalLength(bytes, offset); break; default: { throw new RuntimeException("Unrecognized primitive type: " @@ -277,6 +274,13 @@ public VInt() { public byte length; }; + public static final ThreadLocal threadLocalVInt = new ThreadLocal() { + @Override + protected VInt initialValue() { + return new VInt(); + } + }; + /** * Reads a zero-compressed encoded int from a byte array and returns it. * @@ -315,6 +319,28 @@ public static void writeVInt(Output byteStream, int i) { writeVLong(byteStream, i); } + /** + * Read a zero-compressed encoded long from a byte array. + * + * @param bytes the byte array + * @param offset the offset in the byte array where the VLong is stored + * @return the long + */ + public static long readVLongFromByteArray(final byte[] bytes, int offset) { + byte firstByte = bytes[offset++]; + int len = WritableUtils.decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len-1; idx++) { + byte b = bytes[offset++]; + i = i << 8; + i = i | (b & 0xFF); + } + return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i); + } + /** * Write a zero-compressed encoded long to a byte array. * diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java b/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java new file mode 100644 index 000000000000..86ce6976ad01 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.io; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.TimeZone; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class TestTimestampWritable extends TestCase { + + private static DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private static final int HAS_DECIMAL_MASK = 0x80000000; + + private static final long MAX_ADDITIONAL_SECONDS_BITS = 0x418937; + + private static long MIN_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("0001-01-01 00:00:00"); + private static long MAX_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("9999-01-01 00:00:00"); + + private static int BILLION = 1000 * 1000 * 1000; + + private static long getSeconds(Timestamp ts) { + // To compute seconds, we first subtract the milliseconds stored in the nanos field of the + // Timestamp from the result of getTime(). + long seconds = (ts.getTime() - ts.getNanos() / 1000000) / 1000; + + // It should also be possible to calculate this based on ts.getTime() only. + assertEquals(seconds, TimestampWritable.millisToSeconds(ts.getTime())); + + return seconds; + } + + private static long parseToMillis(String s) { + try { + return DATE_FORMAT.parse(s).getTime(); + } catch (ParseException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void setUp() { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + } + + private static String normalizeTimestampStr(String timestampStr) { + if (timestampStr.endsWith(".0")) { + return timestampStr.substring(0, timestampStr.length() - 2); + } + return timestampStr; + } + + private static void assertTSWEquals(TimestampWritable expected, TimestampWritable actual) { + assertEquals(normalizeTimestampStr(expected.toString()), + normalizeTimestampStr(actual.toString())); + assertEquals(expected, actual); + assertEquals(expected.getTimestamp(), actual.getTimestamp()); + } + + private static TimestampWritable deserializeFromBytes(byte[] tsBytes) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(tsBytes); + DataInputStream dis = new DataInputStream(bais); + TimestampWritable deserTSW = new TimestampWritable(); + deserTSW.readFields(dis); + return deserTSW; + } + + private static int reverseNanos(int nanos) { + if (nanos == 0) { + return 0; + } + if (nanos < 0 || nanos >= 1000 * 1000 * 1000) { + throw new IllegalArgumentException("Invalid nanosecond value: " + nanos); + } + + int x = nanos; + StringBuilder reversed = new StringBuilder(); + while (x != 0) { + reversed.append((char)('0' + x % 10)); + x /= 10; + } + + int result = Integer.parseInt(reversed.toString()); + while (nanos < 100 * 1000 * 1000) { + result *= 10; + nanos *= 10; + } + return result; + } + + private static byte[] serializeToBytes(Writable w) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + w.write(dos); + return baos.toByteArray(); + } + + private static List toList(byte[] a) { + List list = new ArrayList(a.length); + for (byte b : a) { + list.add(b); + } + return list; + } + + /** + * Pad the given byte array with the given number of bytes in the beginning. The padding bytes + * deterministically depend on the passed data. + */ + private static byte[] padBytes(byte[] bytes, int count) { + byte[] result = new byte[bytes.length + count]; + for (int i = 0; i < count; ++i) { + // Fill the prefix bytes with deterministic data based on the actual meaningful data. + result[i] = (byte) (bytes[i % bytes.length] * 37 + 19); + } + System.arraycopy(bytes, 0, result, count, bytes.length); + return result; + } + + private static TimestampWritable serializeDeserializeAndCheckTimestamp(Timestamp ts) + throws IOException { + TimestampWritable tsw = new TimestampWritable(ts); + assertEquals(ts, tsw.getTimestamp()); + + byte[] tsBytes = serializeToBytes(tsw); + TimestampWritable deserTSW = deserializeFromBytes(tsBytes); + assertTSWEquals(tsw, deserTSW); + assertEquals(ts, deserTSW.getTimestamp()); + assertEquals(tsBytes.length, tsw.getTotalLength()); + + // Also convert to/from binary-sortable representation. + int binarySortableOffset = Math.abs(tsw.hashCode()) % 10; + byte[] binarySortableBytes = padBytes(tsw.getBinarySortable(), binarySortableOffset); + TimestampWritable fromBinSort = new TimestampWritable(); + fromBinSort.setBinarySortable(binarySortableBytes, binarySortableOffset); + assertTSWEquals(tsw, fromBinSort); + + long timeSeconds = ts.getTime() / 1000; + if (0 <= timeSeconds && timeSeconds <= Integer.MAX_VALUE) { + assertEquals(new Timestamp(timeSeconds * 1000), + fromIntAndVInts((int) timeSeconds, 0).getTimestamp()); + + int nanos = reverseNanos(ts.getNanos()); + assertEquals(ts, + fromIntAndVInts((int) timeSeconds | (nanos != 0 ? HAS_DECIMAL_MASK : 0), + nanos).getTimestamp()); + } + + assertEquals(ts.getNanos(), tsw.getNanos()); + assertEquals(getSeconds(ts), tsw.getSeconds()); + + // Test various set methods and copy constructors. + { + TimestampWritable tsSet1 = new TimestampWritable(); + // make the offset non-zero to keep things interesting. + int offset = Math.abs(ts.hashCode() % 32); + byte[] shiftedBytes = padBytes(tsBytes, offset); + tsSet1.set(shiftedBytes, offset); + assertTSWEquals(tsw, tsSet1); + + TimestampWritable tswShiftedBytes = new TimestampWritable(shiftedBytes, offset); + assertTSWEquals(tsw, tswShiftedBytes); + assertTSWEquals(tsw, deserializeFromBytes(serializeToBytes(tswShiftedBytes))); + } + + { + TimestampWritable tsSet2 = new TimestampWritable(); + tsSet2.set(ts); + assertTSWEquals(tsw, tsSet2); + } + + { + TimestampWritable tsSet3 = new TimestampWritable(); + tsSet3.set(tsw); + assertTSWEquals(tsw, tsSet3); + } + + { + TimestampWritable tsSet4 = new TimestampWritable(); + tsSet4.set(deserTSW); + assertTSWEquals(tsw, tsSet4); + } + + double expectedDbl = getSeconds(ts) + 1e-9d * ts.getNanos(); + assertTrue(Math.abs(tsw.getDouble() - expectedDbl) < 1e-10d); + + return deserTSW; + } + + private static int randomNanos(Random rand, int decimalDigits) { + // Only keep the most significant decimalDigits digits. + int nanos = rand.nextInt(BILLION); + return nanos - nanos % (int) Math.pow(10, 9 - decimalDigits); + } + + private static int randomNanos(Random rand) { + return randomNanos(rand, rand.nextInt(10)); + } + + private static void checkTimestampWithAndWithoutNanos(Timestamp ts, int nanos) + throws IOException { + serializeDeserializeAndCheckTimestamp(ts); + + ts.setNanos(nanos); + assertEquals(serializeDeserializeAndCheckTimestamp(ts).getNanos(), nanos); + } + + private static TimestampWritable fromIntAndVInts(int i, long... vints) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(i); + if ((i & HAS_DECIMAL_MASK) != 0) { + for (long vi : vints) { + WritableUtils.writeVLong(dos, vi); + } + } + byte[] bytes = baos.toByteArray(); + TimestampWritable tsw = deserializeFromBytes(bytes); + assertEquals(toList(bytes), toList(serializeToBytes(tsw))); + return tsw; + } + + public void testReverseNanos() { + assertEquals(0, reverseNanos(0)); + assertEquals(120000000, reverseNanos(21)); + assertEquals(32100000, reverseNanos(1230)); + assertEquals(5, reverseNanos(500000000)); + assertEquals(987654321, reverseNanos(123456789)); + assertEquals(12345678, reverseNanos(876543210)); + } + + /** + * Test serializing and deserializing timestamps that can be represented by a number of seconds + * from 0 to 2147483647 since the UNIX epoch. + */ + public void testTimestampsWithinPositiveIntRange() throws IOException { + Random rand = new Random(294722773L); + for (int i = 0; i < 10000; ++i) { + long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000; + checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand)); + } + } + + private static long randomMillis(long minMillis, long maxMillis, Random rand) { + return minMillis + (long) ((maxMillis - minMillis) * rand.nextDouble()); + } + + /** + * Test timestamps that don't necessarily fit between 1970 and 2038. This depends on HIVE-4525 + * being fixed. + */ + public void testTimestampsOutsidePositiveIntRange() throws IOException { + Random rand = new Random(789149717L); + for (int i = 0; i < 10000; ++i) { + long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand); + checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand)); + } + } + + public void testTimestampsInFullRange() throws IOException { + Random rand = new Random(2904974913L); + for (int i = 0; i < 10000; ++i) { + checkTimestampWithAndWithoutNanos(new Timestamp(rand.nextLong()), randomNanos(rand)); + } + } + + public void testToFromDouble() { + Random rand = new Random(294729777L); + for (int nanosPrecision = 0; nanosPrecision <= 4; ++nanosPrecision) { + for (int i = 0; i < 10000; ++i) { + long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand); + Timestamp ts = new Timestamp(millis); + int nanos = randomNanos(rand, nanosPrecision); + ts.setNanos(nanos); + TimestampWritable tsw = new TimestampWritable(ts); + double asDouble = tsw.getDouble(); + int recoveredNanos = + (int) (Math.round((asDouble - Math.floor(asDouble)) * Math.pow(10, nanosPrecision)) * + Math.pow(10, 9 - nanosPrecision)); + assertEquals(String.format("Invalid nanosecond part recovered from %f", asDouble), + nanos, recoveredNanos); + assertEquals(ts, TimestampWritable.doubleToTimestamp(asDouble)); + // decimalToTimestamp should be consistent with doubleToTimestamp for this level of + // precision. + assertEquals(ts, TimestampWritable.decimalToTimestamp( + BigDecimal.valueOf(asDouble))); + } + } + } + + private static BigDecimal timestampToDecimal(Timestamp ts) { + BigDecimal d = new BigDecimal(getSeconds(ts)); + return d.add(new BigDecimal(ts.getNanos()).divide(new BigDecimal(BILLION))); + } + + public void testDecimalToTimestampRandomly() { + Random rand = new Random(294729777L); + for (int i = 0; i < 10000; ++i) { + Timestamp ts = new Timestamp( + randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand)); + ts.setNanos(randomNanos(rand, 9)); // full precision + assertEquals(ts, TimestampWritable.decimalToTimestamp(timestampToDecimal(ts))); + } + } + + public void testDecimalToTimestampCornerCases() { + Timestamp ts = new Timestamp(parseToMillis("1969-03-04 05:44:33")); + assertEquals(0, ts.getTime() % 1000); + for (int nanos : new int[] { 100000, 900000, 999100000, 999900000 }) { + ts.setNanos(nanos); + BigDecimal d = timestampToDecimal(ts); + assertEquals(ts, TimestampWritable.decimalToTimestamp(d)); + assertEquals(ts, TimestampWritable.doubleToTimestamp(d.doubleValue())); + } + } + + public void testSerializationFormatDirectly() throws IOException { + assertEquals("1970-01-01 00:00:00", fromIntAndVInts(0).toString()); + assertEquals("1970-01-01 00:00:01", fromIntAndVInts(1).toString()); + assertEquals("1970-01-01 00:05:00", fromIntAndVInts(300).toString()); + assertEquals("1970-01-01 02:00:00", fromIntAndVInts(7200).toString()); + assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245).toString()); + + // This won't have a decimal part because the HAS_DECIMAL_MASK bit is not set. + assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245, 3210).toString()); + + assertEquals("2000-01-02 03:04:05.0123", + fromIntAndVInts(946782245 | HAS_DECIMAL_MASK, 3210).toString()); + + assertEquals("2038-01-19 03:14:07", fromIntAndVInts(Integer.MAX_VALUE).toString()); + assertEquals("2038-01-19 03:14:07.012345678", + fromIntAndVInts(Integer.MAX_VALUE | HAS_DECIMAL_MASK, // this is really just -1 + 876543210).toString()); + + // Timestamps with a second VInt storing additional bits of the seconds field. + long seconds = 253392390415L; + assertEquals("9999-09-08 07:06:55", + fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31), -1L, seconds >> 31).toString()); + assertEquals("9999-09-08 07:06:55.0123", + fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31), + -3210 - 1, seconds >> 31).toString()); + } + + public void testMaxSize() { + // This many bytes are necessary to store the reversed nanoseconds. + assertEquals(5, WritableUtils.getVIntSize(999999999)); + assertEquals(5, WritableUtils.getVIntSize(-2 - 999999999)); + + // Bytes necessary to store extra bits of the second timestamp if storing a timestamp + // before 1970 or after 2038. + assertEquals(3, WritableUtils.getVIntSize(Short.MAX_VALUE)); + assertEquals(3, WritableUtils.getVIntSize(Short.MIN_VALUE)); + + // Test that MAX_ADDITIONAL_SECONDS_BITS is really the maximum value of the + // additional bits (beyond 31 bits) of the seconds-since-epoch part of timestamp. + assertTrue((((long) MAX_ADDITIONAL_SECONDS_BITS) << 31) * 1000 < Long.MAX_VALUE); + assertTrue((((double) MAX_ADDITIONAL_SECONDS_BITS + 1) * (1L << 31)) * 1000 > + Long.MAX_VALUE); + + // This is how many bytes we need to store those additonal bits as a VInt. + assertEquals(4, WritableUtils.getVIntSize(MAX_ADDITIONAL_SECONDS_BITS)); + + // Therefore, the maximum total size of a serialized timestamp is 4 + 5 + 4 = 13. + } + + public void testMillisToSeconds() { + assertEquals(0, TimestampWritable.millisToSeconds(0)); + assertEquals(-1, TimestampWritable.millisToSeconds(-1)); + assertEquals(-1, TimestampWritable.millisToSeconds(-999)); + assertEquals(-1, TimestampWritable.millisToSeconds(-1000)); + assertEquals(-2, TimestampWritable.millisToSeconds(-1001)); + assertEquals(-2, TimestampWritable.millisToSeconds(-1999)); + assertEquals(-2, TimestampWritable.millisToSeconds(-2000)); + assertEquals(-3, TimestampWritable.millisToSeconds(-2001)); + assertEquals(-99, TimestampWritable.millisToSeconds(-99000)); + assertEquals(-100, TimestampWritable.millisToSeconds(-99001)); + assertEquals(-100, TimestampWritable.millisToSeconds(-100000)); + assertEquals(1, TimestampWritable.millisToSeconds(1500)); + assertEquals(19, TimestampWritable.millisToSeconds(19999)); + assertEquals(20, TimestampWritable.millisToSeconds(20000)); + } + + private static int compareEqualLengthByteArrays(byte[] a, byte[] b) { + assertEquals(a.length, b.length); + for (int i = 0; i < a.length; ++i) { + if (a[i] != b[i]) { + return (a[i] & 0xff) - (b[i] & 0xff); + } + } + return 0; + } + + private static int normalizeComparisonResult(int result) { + return result < 0 ? -1 : (result > 0 ? 1 : 0); + } + + public void testBinarySortable() { + Random rand = new Random(5972977L); + List tswList = new ArrayList(); + for (int i = 0; i < 50; ++i) { + Timestamp ts = new Timestamp(rand.nextLong()); + ts.setNanos(randomNanos(rand)); + tswList.add(new TimestampWritable(ts)); + } + for (TimestampWritable tsw1 : tswList) { + byte[] bs1 = tsw1.getBinarySortable(); + for (TimestampWritable tsw2 : tswList) { + byte[] bs2 = tsw2.getBinarySortable(); + int binaryComparisonResult = + normalizeComparisonResult(compareEqualLengthByteArrays(bs1, bs2)); + int comparisonResult = normalizeComparisonResult(tsw1.compareTo(tsw2)); + if (binaryComparisonResult != comparisonResult) { + throw new AssertionError("TimestampWritables " + tsw1 + " and " + tsw2 + " compare as " + + comparisonResult + " using compareTo but as " + binaryComparisonResult + " using " + + "getBinarySortable"); + } + } + } + } + +}