39
39
import java .util .concurrent .atomic .AtomicLong ;
40
40
41
41
import org .apache .commons .io .FileUtils ;
42
- import org .junit .After ;
43
42
import org .junit .AfterClass ;
44
43
import org .junit .Assert ;
45
- import org .junit .Before ;
46
44
import org .junit .BeforeClass ;
47
45
import org .junit .Test ;
48
46
53
51
import com .marklogic .client .datamovement .ApplyTransformListener ;
54
52
import com .marklogic .client .datamovement .ApplyTransformListener .ApplyResult ;
55
53
import com .marklogic .client .datamovement .DataMovementManager ;
54
+ import com .marklogic .client .datamovement .DeleteListener ;
56
55
import com .marklogic .client .datamovement .JobTicket ;
57
56
import com .marklogic .client .datamovement .QueryBatch ;
58
57
import com .marklogic .client .datamovement .QueryBatcher ;
@@ -190,23 +189,12 @@ public static void tearDownAfterClass() throws Exception {
190
189
detachForest (dbName , dbName + "-" + (i + 1 ));
191
190
deleteForest (dbName + "-" + (i + 1 ));
192
191
}
193
-
194
192
deleteDB (dbName );
195
193
}
196
194
197
- @ Before
198
- public void setUp () throws Exception {
199
-
200
- }
201
-
202
- @ After
203
- public void tearDown () throws Exception {
204
-
205
- }
206
-
207
195
@ Test
208
196
public void jobReport () throws Exception {
209
-
197
+ System . out . println ( "In jobReport method" );
210
198
AtomicInteger batchCount = new AtomicInteger (0 );
211
199
AtomicInteger successCount = new AtomicInteger (0 );
212
200
AtomicLong count1 = new AtomicLong (0 );
@@ -267,11 +255,11 @@ public void jobReport() throws Exception {
267
255
Assert .assertEquals (dmManager .getJobReport (queryTicket ).getSuccessBatchesCount (), count1 .get ());
268
256
Assert .assertEquals (dmManager .getJobReport (queryTicket ).getSuccessBatchesCount (), count2 .get ());
269
257
Assert .assertEquals (dmManager .getJobReport (queryTicket ).getSuccessBatchesCount (), count3 .get ());
270
-
271
258
}
272
259
273
260
@ Test
274
261
public void testNullQdef () throws IOException , InterruptedException {
262
+ System .out .println ("In testNullQdef method" );
275
263
JsonNode node = null ;
276
264
JacksonHandle jacksonHandle = null ;
277
265
@@ -305,7 +293,7 @@ public void testNullQdef() throws IOException, InterruptedException {
305
293
306
294
@ Test
307
295
public void queryFailures () throws Exception {
308
-
296
+ System . out . println ( "In queryFailures method" );
309
297
Thread t1 = new Thread (new DisabledDBRunnable ());
310
298
t1 .setName ("Status Check -1" );
311
299
@@ -399,12 +387,11 @@ public void run() {
399
387
properties .put ("enabled" , "true" );
400
388
changeProperty (properties , "/manage/v2/databases/" + dbName + "/properties" );
401
389
}
402
-
403
390
}
404
391
405
392
@ Test
406
393
public void jobReportStopJob () throws Exception {
407
-
394
+ System . out . println ( "In jobReportStopJob method" );
408
395
QueryBatcher batcher = dmManager .newQueryBatcher (new StructuredQueryBuilder ().collection ("XmlTransform" ))
409
396
.withBatchSize (20 ).withThreadCount (20 );
410
397
AtomicInteger batchCount = new AtomicInteger (0 );
@@ -438,7 +425,7 @@ public void jobReportStopJob() throws Exception {
438
425
// Making sure we can stop jobs based on the JobId.
439
426
@ Test
440
427
public void stopJobUsingJobId () throws Exception {
441
-
428
+ System . out . println ( "In stopJobUsingJobId method" );
442
429
String jobId = UUID .randomUUID ().toString ();
443
430
444
431
QueryBatcher batcher = dmManager .newQueryBatcher (new StructuredQueryBuilder ().collection ("XmlTransform" ))
@@ -477,7 +464,7 @@ public void stopJobUsingJobId() throws Exception {
477
464
478
465
@ Test
479
466
public void jsMasstransformReplace () throws Exception {
480
-
467
+ System . out . println ( "In jsMasstransformReplace method" );
481
468
ServerTransform transform = new ServerTransform ("jsTransform" );
482
469
transform .put ("newValue" , "new Value" );
483
470
@@ -532,10 +519,9 @@ public void jsMasstransformReplace() throws Exception {
532
519
533
520
}
534
521
535
- // ISSUE # 106
536
522
@ Test
537
523
public void stopTransformJobTest () throws Exception {
538
-
524
+ System . out . println ( "In stopTransformJobTest method" );
539
525
ServerTransform transform = new ServerTransform ("add-attr-xquery-transform" );
540
526
transform .put ("name" , "Lang" );
541
527
transform .put ("value" , "French" );
@@ -606,19 +592,28 @@ public void stopTransformJobTest() throws Exception {
606
592
*/
607
593
@ Test
608
594
public void testStopBeforeListenerisComplete () throws Exception {
595
+ ArrayList <String > urisList = new ArrayList <String >();
596
+ final String qMaxBatches = "fn:count(cts:uri-match('/setMaxBatches*'))" ;
609
597
try {
610
- clearDB ( port );
598
+
611
599
System .out .println ("In testStopBeforeListenerisComplete method" );
612
-
613
- final String query1 = "fn:count(fn:doc())" ;
600
+
614
601
final AtomicInteger count = new AtomicInteger (0 );
615
602
final AtomicInteger failedBatch = new AtomicInteger (0 );
616
603
final AtomicInteger successBatch = new AtomicInteger (0 );
617
604
618
605
final AtomicInteger failedBatch2 = new AtomicInteger (0 );
619
606
final AtomicInteger successBatch2 = new AtomicInteger (0 );
620
-
621
- ArrayList <String > urisList = new ArrayList <String >();
607
+
608
+ String jsonDoc = "{" +
609
+ "\" employees\" : [" +
610
+ "{ \" firstName\" :\" John\" , \" lastName\" :\" Doe\" }," +
611
+ "{ \" firstName\" :\" Ann\" , \" lastName\" :\" Smith\" }," +
612
+ "{ \" firstName\" :\" Bob\" , \" lastName\" :\" Foo\" }]" +
613
+ "}" ;
614
+ StringHandle handle = new StringHandle ();
615
+ handle .setFormat (Format .JSON );
616
+ handle .set (jsonDoc );
622
617
623
618
WriteBatcher batcher = dmManager .newWriteBatcher ();
624
619
batcher .withBatchSize (99 );
@@ -638,10 +633,10 @@ class writeDocsThread implements Runnable {
638
633
public void run () {
639
634
640
635
for (int j = 0 ; j < 50000 ; j ++) {
641
- String uri = "/local/json -" + j + "-" + Thread .currentThread ().getId ();
642
- System .out .println ("Thread name: " + Thread .currentThread ().getName () + " URI:" + uri );
636
+ String uri = "/setMaxBatches -" + j + "-" + Thread .currentThread ().getId ();
637
+ // System.out.println("Thread name: " + Thread.currentThread().getName() + " URI:" + uri);
643
638
urisList .add (uri );
644
- batcher .add (uri , fileHandle );
639
+ batcher .add (uri , handle );
645
640
}
646
641
batcher .flushAndWait ();
647
642
}
@@ -678,9 +673,10 @@ public void run() {
678
673
countT .join ();
679
674
680
675
t1 .join ();
681
- int docCnt = dbClient .newServerEval ().xquery (query1 ).eval ().next ().getNumber ().intValue ();
676
+
677
+ int docCnt = dbClient .newServerEval ().xquery (qMaxBatches ).eval ().next ().getNumber ().intValue ();
682
678
System .out .println ("Doc count is " + docCnt );
683
- Assert .assertTrue ( docCnt == 50000 );
679
+ Assert .assertTrue (docCnt == 50000 );
684
680
685
681
Collection <String > batchResults = new LinkedHashSet <String >();
686
682
QueryBatcher qb = dmManager .newQueryBatcher (urisList .iterator ())
@@ -729,7 +725,6 @@ public void run() {
729
725
assertTrue ("Stop QueryBatcher with setMaxBatches set to 2035 is incorrect" , batchResults .size () == 24420 );
730
726
731
727
/* Test 2 setMaxBatches()
732
-
733
728
*/
734
729
Collection <String > batchResults2 = new LinkedHashSet <String >();
735
730
QueryBatcher qb2 = dmManager .newQueryBatcher (urisList .iterator ())
@@ -747,6 +742,7 @@ public void run() {
747
742
failedBatch2 .addAndGet (1 );
748
743
});
749
744
qb2 .setMaxBatches (203 );
745
+
750
746
class BatchesSoFarThread implements Runnable {
751
747
752
748
@ Override
@@ -760,14 +756,14 @@ public void run() {
760
756
qb2 .setMaxBatches ();
761
757
}
762
758
}
763
-
759
+
764
760
Thread tMBStop2 = new Thread (new BatchesSoFarThread ());
761
+ // Wait for the stop thread to initialize before starting DMSDK Job.
765
762
try {
766
763
Thread .sleep (3000 );
767
764
} catch (InterruptedException e ) {
768
765
e .printStackTrace ();
769
766
}
770
-
771
767
dmManager .startJob (qb2 );
772
768
773
769
int initialUrisSize = batchResults2 .size ();
@@ -776,14 +772,34 @@ public void run() {
776
772
qb2 .awaitCompletion ();
777
773
dmManager .stopJob (qb2 );
778
774
775
+ System .out .println ("Doc count in initialUrisSize " + initialUrisSize );
776
+ System .out .println ("Doc count after setMaxBatches() is called " + batchResults2 .size ());
777
+
779
778
assertTrue ("Batches of URIs collected so far" , batchResults2 .size () > 0 );
780
779
assertTrue ("Number of Uris collected does not fall in the range" , (batchResults2 .size ()>initialUrisSize && batchResults2 .size ()< 2436 ));
781
780
}
782
781
catch (Exception ex ) {
783
782
ex .printStackTrace ();
784
783
}
785
784
finally {
786
- clearDB (port );
785
+ // Delete all uris.
786
+ QueryBatcher deleteBatcher = dmManager .newQueryBatcher (urisList .iterator ())
787
+ .onUrisReady (new DeleteListener ())
788
+ .onUrisReady (batch -> {
789
+ //System.out.println("Items in batch " + batch.getItems().length);
790
+ }
791
+ )
792
+ .onQueryFailure (throwable -> {
793
+ System .out .println ("Query Failed" );
794
+ throwable .printStackTrace ();
795
+ })
796
+ .withBatchSize (5000 )
797
+ .withThreadCount (10 );
798
+ dmManager .startJob (deleteBatcher );
799
+ deleteBatcher .awaitCompletion (2 , TimeUnit .MINUTES );
800
+ int docCnt = dbClient .newServerEval ().xquery (qMaxBatches ).eval ().next ().getNumber ().intValue ();
801
+ System .out .println ("All setMaxBatches docs should have been deleted. Count after DeleteListener job is " + docCnt );
802
+ Assert .assertTrue (docCnt == 0 );
787
803
}
788
804
}
789
805
}
0 commit comments