Skip to content

Commit 32a3e25

Browse files
committed
[FLINK-38205][format][pb] Discard unknown fields by default
1 parent f859462 commit 32a3e25

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
/** Keeps protobuf constants separately. */
2222
public class PbConstant {
2323
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
24-
public static final String PB_METHOD_PARSE_FROM = "parseFrom";
24+
public static final String PB_METHOD_PARSER = "parser";
2525
public static final String GENERATED_DECODE_METHOD = "decode";
2626
public static final String GENERATED_ENCODE_METHOD = "encode";
2727
public static final String PB_MAP_KEY_NAME = "key";

flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.google.protobuf.ByteString;
3838
import com.google.protobuf.Descriptors;
3939
import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
40+
import com.google.protobuf.DiscardUnknownFieldsParser;
41+
import com.google.protobuf.Parser;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
4244

@@ -53,8 +55,8 @@
5355
*/
5456
public class ProtoToRowConverter {
5557
private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
56-
private final Method parseFromMethod;
5758
private final Method decodeMethod;
59+
private final Parser<?> protoParser;
5860
private boolean isCodeSplit = false;
5961

6062
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
@@ -124,14 +126,16 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
124126
codegenAppender.code());
125127
decodeMethod =
126128
generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
127-
parseFromMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSE_FROM, byte[].class);
129+
Method parserMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSER);
130+
Parser originalProtoParser = (Parser) parserMethod.invoke(null);
131+
protoParser = DiscardUnknownFieldsParser.wrap(originalProtoParser);
128132
} catch (Exception ex) {
129133
throw new PbCodegenException(ex);
130134
}
131135
}
132136

133137
public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
134-
Object messageObj = parseFromMethod.invoke(null, data);
138+
Object messageObj = protoParser.parseFrom(data);
135139
return (RowData) decodeMethod.invoke(null, messageObj);
136140
}
137141

0 commit comments

Comments
 (0)