33
33
import org .apache .drill .exec .server .options .OptionManager ;
34
34
import org .apache .drill .exec .vector .complex .writer .BaseWriter ;
35
35
import org .apache .drill .exec .vector .complex .writer .BaseWriter .ComplexWriter ;
36
-
36
+ @ SuppressWarnings ( "unused" )
37
37
public class JsonConvertFrom {
38
38
39
39
private JsonConvertFrom () {}
@@ -55,38 +55,39 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {
55
55
ResultSetLoader rsLoader ;
56
56
57
57
@ Workspace
58
- org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <java .io .InputStream > stream ;
58
+ org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <java .io .InputStream > streamIter ;
59
59
60
60
@ Workspace
61
61
org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl jsonLoader ;
62
62
63
63
@ Override
64
64
public void setup () {
65
+ streamIter = new org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <>();
65
66
rsLoader .startBatch ();
66
67
}
67
68
68
69
@ Override
69
70
public void eval () {
70
- if ( in . end == 0 ) {
71
- // Return empty map
71
+ // If the input is null or empty, return an empty map
72
+ if ( in . isSet == 0 || in . start == in . end ) {
72
73
return ;
73
74
}
74
75
75
76
java .io .InputStream inputStream = org .apache .drill .exec .vector .complex .fn .DrillBufInputStream .getStream (in .start , in .end , in .buffer );
76
77
77
78
try {
78
- stream .setValue (inputStream );
79
+ streamIter .setValue (inputStream );
79
80
80
81
if (jsonLoader == null ) {
81
- jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , stream );
82
+ jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , streamIter );
82
83
}
83
84
84
85
org .apache .drill .exec .physical .resultSet .RowSetLoader rowWriter = rsLoader .writer ();
85
86
rowWriter .start ();
86
87
if (jsonLoader .parser ().next ()) {
87
88
rowWriter .save ();
88
89
}
89
- inputStream .close ();
90
+ // inputStream.close();
90
91
91
92
} catch (Exception e ) {
92
93
throw org .apache .drill .common .exceptions .UserException .dataReadError (e )
@@ -108,7 +109,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {
108
109
ComplexWriter writer ;
109
110
110
111
@ Workspace
111
- org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <java .io .InputStream > stream ;
112
+ org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <java .io .InputStream > streamIter ;
112
113
113
114
@ Inject
114
115
OptionManager options ;
@@ -119,25 +120,25 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {
119
120
@ Workspace
120
121
org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl jsonLoader ;
121
122
122
-
123
123
@ Override
124
124
public void setup () {
125
+ streamIter = new org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <>();
125
126
rsLoader .startBatch ();
126
127
}
127
128
128
129
@ Override
129
130
public void eval () {
130
- String jsonString = org .apache .drill .exec .expr .fn .impl .StringFunctionHelpers .toStringFromUTF8 (in .start , in .end , in .buffer );
131
-
132
131
// If the input is null or empty, return an empty map
133
- if (jsonString . length () == 0 ) {
132
+ if (in . isSet == 0 || in . start == in . end ) {
134
133
return ;
135
134
}
136
135
136
+ java .io .InputStream inputStream = org .apache .drill .exec .vector .complex .fn .DrillBufInputStream .getStream (in .start , in .end , in .buffer );
137
+
137
138
try {
138
- stream .setValue (org . apache . drill . exec . expr . fn . impl . conv . JsonConverterUtils . convertStringToInputStream ( jsonString ) );
139
+ streamIter .setValue (inputStream );
139
140
if (jsonLoader == null ) {
140
- jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , stream );
141
+ jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , streamIter );
141
142
}
142
143
org .apache .drill .exec .physical .resultSet .RowSetLoader rowWriter = rsLoader .writer ();
143
144
rowWriter .start ();
0 commit comments