22
33import com .azure .cosmos .CosmosAsyncContainer ;
44import com .azure .cosmos .CosmosAsyncDatabase ;
5- import com .azure .cosmos .CosmosException ;
5+ import com .azure .cosmos .implementation . NotFoundException ;
66import com .azure .cosmos .models .*;
77import com .fasterxml .jackson .databind .JsonNode ;
88import com .fasterxml .jackson .databind .ObjectMapper ;
99import com .genexus .db .Cursor ;
1010import com .genexus .db .ServiceCursorBase ;
1111import com .genexus .db .driver .GXConnection ;
12+ import com .genexus .db .service .Query ;
1213import com .genexus .db .service .QueryType ;
1314import com .genexus .db .service .ServicePreparedStatement ;
1415import com .genexus .db .service .VarValue ;
2728import java .util .*;
2829import java .util .concurrent .CountDownLatch ;
2930
30- import java .util .concurrent .atomic .AtomicInteger ;
31+ import java .util .concurrent .atomic .AtomicReference ;
3132import java .util .regex .Matcher ;
3233import java .util .regex .Pattern ;
3334import java .util .stream .Collectors ;
@@ -69,20 +70,19 @@ private int _executeQuery(CosmosDBResultSet resultSet) throws Exception {
6970 {
7071 case QUERY :
7172 {
72- if (querybyPK (query ) && keyCondition != null )
73+ if (isQueryByPK (query ) && keyCondition != null )
7374 {
74- if (keyCondition .isEmpty () || !keyCondition .containsKey ("id" ) || !keyCondition .containsKey (query .getPartitionKey ())) {
75- AtomicInteger statusCode = executeReadByPK (keyCondition .get ("id" ).toString (),keyCondition .get (query .getPartitionKey ()));
76- if (statusCode != null && statusCode .get () == 404 )
75+ if (keyCondition .containsKey ("id" ) && keyCondition .containsKey (query .getPartitionKey ())) {
76+ int [] statusCode = new int [1 ];
77+ resultSet .iterator = executeReadByPK (keyCondition .get ("id" ).toString (),keyCondition .get (query .getPartitionKey ()), statusCode );
78+ if (statusCode != null && statusCode [0 ] == 404 )
7779 return Cursor .EOF ;
80+ return 0 ;
7881 }
7982 }
80- else
81- {
82- String sqlQuery = CosmosDBHelper .createCosmosQuery (query , cursor , parms );
83- resultSet .iterator = queryWithPaging (sqlQuery , new CosmosQueryRequestOptions ());
84- return 0 ;
85- }
83+ String sqlQuery = CosmosDBHelper .createCosmosQuery (query , cursor , parms );
84+ resultSet .iterator = queryWithPaging (sqlQuery , new CosmosQueryRequestOptions ());
85+ return 0 ;
8686 }
8787 case INS :
8888 {
@@ -140,39 +140,58 @@ private void getContainer(String containerName) throws SQLException {
140140 container = getDatabase ().getContainer (containerName );
141141 }
142142
143- private boolean querybyPK (CosmosDBQuery query ){
144- if (query .filters .length > 0 )
143+ private boolean isQueryByPK (CosmosDBQuery query ){
144+ //Find out if the query is by PK and by equality
145+ if (query .filters .length == 1 )
145146 {
146- String equalFilterPattern = "\\ ((.*) = :(.*)\\ ) and \\ ((.*) = :(.*)\\ )" ;
147- //ToDo
147+ String equalFilterPattern = "^\\ (\\ (id = :([a-zA-Z0-9]+):\\ ) and \\ (([a-zA-Z0-9]+) = :([a-zA-Z0-9]+):\\ )\\ )" ;
148+ Matcher matcher = Pattern .compile (equalFilterPattern ).matcher (query .filters [0 ]);
149+ if (matcher .matches ()) {
150+ String attItem = matcher .group (2 );
151+ String pkParmValue ;
152+ if (attItem .equals (query .getPartitionKey ()))
153+ {
154+ pkParmValue = matcher .group (3 );
155+ getVarValuesFromQuery (pkParmValue ,attItem ,query );
156+
157+ String idParmValue = matcher .group (1 );
158+ getVarValuesFromQuery (idParmValue ,"id" ,query );
159+ return true ;
160+ }
161+ }
148162 }
149163 return false ;
150164 }
151-
152- private AtomicInteger executeReadByPK (String idValue , Object partitionKey ) throws Exception {
153- // Read document by ID
154- AtomicInteger statusCode = null ;
165+ private Iterator <HashMap <String , Object >> executeReadByPK (String idValue , Object partitionKey , int [] statusCode ) throws Exception {
166+ // Read document by ID
167+ Iterator <HashMap <String , Object >> iterator = null ;
155168 if (container != null ) {
156- CountDownLatch latch = new CountDownLatch (1 );
169+ AtomicReference <JsonNode > itemRef = new AtomicReference <>();
170+ List <HashMap <String , Object >> hashMapList = new ArrayList <>();
157171
158- Mono <CosmosItemResponse <JsonNode >> itemResponseMono = container .readItem (idValue , toPartitionKey (partitionKey ), JsonNode .class );
172+ try {
173+ container .readItem (idValue , toPartitionKey (partitionKey ), new CosmosItemRequestOptions (), JsonNode .class )
174+ .map (CosmosItemResponse ::getItem )
175+ .doOnNext (item -> {
176+ itemRef .set (item );
177+ })
178+ .block (); // Wait for the read to complete
159179
160- itemResponseMono .doOnSuccess ((response ) -> {
161- latch .countDown (); // signal completion
162- })
163- .doOnError (Exception .class , exception -> {
164- latch .countDown (); // signal completion
165- logger .error (String .format ("Fail: %1" ,exception .getMessage ()));
166- if (exception instanceof CosmosException && ((CosmosException ) exception ).getStatusCode () == 404 )
167- statusCode .set (404 );
168- }).subscribe ();
169-
170- latch .await (); // wait for completion
171- }
172- else {
173- throw new Exception ("CosmosDB Insert By PK Execution failed. Container not found." );
180+ JsonNode item = itemRef .get ();
181+ HashMap <String , Object > pageResult = CosmosDBHelper .jsonNodeToHashMap (item );
182+ hashMapList .add (pageResult );
183+ if (!hashMapList .isEmpty ())
184+ return iterator = hashMapList .iterator ();
185+ else
186+ return null ;
187+ }
188+ catch (NotFoundException ex ) {
189+ statusCode [0 ] = 404 ;
190+ }
174191 }
175- return statusCode ;
192+ else
193+ throw new Exception ("CosmosDB Read By PK Execution failed. Container not found." );
194+ return null ;
176195 }
177196 private Iterator <HashMap <String , Object >> queryWithPaging (String sqlQuery , CosmosQueryRequestOptions options ) throws Exception {
178197
@@ -216,7 +235,6 @@ private int[] deleteDocument(String idValue, Object partitionKey) throws Excepti
216235 logger .debug (String .format ("Deleted document- id: %1 partitionkey: %2" ,idValue ,partitionKey .toString ()));
217236 logger .debug (String .format ("Status Code: %1" ,itemResponse .getStatusCode ()));
218237 statusCode [0 ] = itemResponse .getStatusCode ();
219- //latch.countDown();
220238 })
221239 .doOnError (error -> {
222240 //logger.error(String.format("Fail: %1",error.getMessage()));
@@ -240,7 +258,6 @@ private int[] createDocument(JSONObject jsonObject, Object partitionKey) throws
240258 if (container != null ) {
241259 ObjectMapper mapper = new ObjectMapper ();
242260 String jsonStr = jsonObject .toString ();
243- //Parse string to extract nulls
244261 jsonStr = jsonStr .replaceAll ("\\ \" (null)\\ \" " , "$1" );
245262
246263 JsonNode jsonNode = mapper .readTree (jsonStr );
@@ -252,7 +269,6 @@ private int[] createDocument(JSONObject jsonObject, Object partitionKey) throws
252269 logger .debug (String .format ("Inserted document: %1" ,response .getItem ().toString ()));
253270 logger .debug (String .format ("Status Code: %1" ,response .getStatusCode ()));
254271 statusCode [0 ] = response .getStatusCode ();
255- //latch.countDown();
256272 })
257273 .doOnError (error -> {
258274 //logger.error(String.format("Fail: %1",error.getMessage()));
@@ -289,7 +305,6 @@ private int[] replaceDocument(JSONObject jsonObject, String idValue , Object par
289305 logger .debug (String .format ("Replaced document- id: %1 partitionkey: %2" ,idValue ,partitionKey .toString ()));
290306 logger .debug (String .format ("Status Code: %1" ,itemResponse .getStatusCode ()));
291307 statusCode [0 ] = itemResponse .getStatusCode ();
292- //latch.countDown();
293308 })
294309 .doOnError (error -> {
295310 //logger.error(String.format("Fail: %1",error.getMessage()));
@@ -307,6 +322,20 @@ private int[] replaceDocument(JSONObject jsonObject, String idValue , Object par
307322 throw new Exception ("CosmosDB Replace Execution failed. Container not found." );
308323 }
309324 }
325+ private void getVarValuesFromQuery (String varName , String name , Query query )
326+ {
327+ varName = varName .substring (1 , varName .length () - 1 );
328+ String varNameM = ":" + varName ;
329+ VarValue varValue = null ;
330+ for (Map .Entry <String , VarValue > entry : query .getVars ().entrySet ()) {
331+ if (entry .getKey ().toString ().equals (varNameM )) {
332+ varValue = entry .getValue ();
333+ break ;
334+ }
335+ }
336+ if (varValue != null )
337+ keyCondition .put (name , varValue .value );
338+ }
310339 private JSONObject setUpJsonPayload (boolean isUpdate ) throws JSONException , SQLException {
311340 // Setup the json payload to execute the insert or update query.
312341
@@ -370,7 +399,7 @@ private JSONObject setUpJsonPayload(boolean isUpdate) throws JSONException, SQLE
370399 }
371400
372401 private PartitionKey toPartitionKey (Object value ) throws Exception {
373- if (Double .class .isInstance (value )) //Double.valueOf(value) instanceof Double)
402+ if (Double .class .isInstance (value ))
374403 return new PartitionKey ((double )value );
375404 if (value instanceof Boolean )
376405 return new PartitionKey ((boolean )value );
0 commit comments