8
8
import lombok .Getter ;
9
9
import lombok .NonNull ;
10
10
import lombok .extern .log4j .Log4j2 ;
11
+ import org .opensearch .ResourceNotFoundException ;
12
+ import org .opensearch .action .LatchedActionListener ;
13
+ import org .opensearch .action .get .GetResponse ;
11
14
import org .opensearch .action .search .SearchRequest ;
12
15
import org .opensearch .action .search .SearchResponse ;
13
16
import org .opensearch .client .Client ;
14
17
import org .opensearch .common .util .concurrent .ThreadContext ;
15
18
import org .opensearch .common .xcontent .LoggingDeprecationHandler ;
16
19
import org .opensearch .common .xcontent .XContentType ;
20
+ import org .opensearch .core .action .ActionListener ;
17
21
import org .opensearch .core .xcontent .NamedXContentRegistry ;
18
22
import org .opensearch .core .xcontent .XContentParser ;
19
23
import org .opensearch .search .SearchHit ;
26
30
import java .util .HashMap ;
27
31
import java .util .List ;
28
32
import java .util .Map ;
33
+ import java .util .concurrent .CountDownLatch ;
34
+ import java .util .concurrent .atomic .AtomicBoolean ;
35
+ import java .util .concurrent .atomic .AtomicReference ;
29
36
import java .util .regex .Matcher ;
30
37
import java .util .regex .Pattern ;
31
38
39
+ import static java .util .concurrent .TimeUnit .SECONDS ;
40
+ import static org .opensearch .ml .common .CommonValue .MASTER_KEY ;
32
41
import static org .opensearch .ml .common .utils .StringUtils .gson ;
33
42
34
43
@ Log4j2
@@ -106,14 +115,15 @@ public Boolean validateStopWords(String input, Map<String, List<String>> stopWor
106
115
107
116
public Boolean validateStopWordsSingleIndex (String input , String indexName , List <String > fieldNames ) {
108
117
SearchRequest searchRequest ;
109
- SearchResponse searchResponse ;
118
+ AtomicBoolean hitStopWords = new AtomicBoolean ( false ) ;
110
119
String queryBody ;
111
120
Map <String , String > documentMap = new HashMap <>();
112
121
for (String field : fieldNames ) {
113
122
documentMap .put (field , input );
114
123
}
115
124
Map <String , Object > queryBodyMap = Map
116
125
.of ("query" , Map .of ("percolate" , Map .of ("field" , "query" , "document" , documentMap )));
126
+ CountDownLatch latch = new CountDownLatch (1 );
117
127
118
128
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
119
129
queryBody = AccessController .doPrivileged ((PrivilegedExceptionAction <String >) () -> gson .toJson (queryBodyMap ));
@@ -122,17 +132,26 @@ public Boolean validateStopWordsSingleIndex(String input, String indexName, List
122
132
searchSourceBuilder .parseXContent (queryParser );
123
133
searchSourceBuilder .size (1 ); //Only need 1 doc returned, if hit.
124
134
searchRequest = new SearchRequest ().source (searchSourceBuilder ).indices (indexName );
125
- searchResponse = client .search (searchRequest ).actionGet (1000l * 30 );
126
135
context .restore ();
136
+ client .search (searchRequest , ActionListener .runBefore (new LatchedActionListener (ActionListener .<SearchResponse >wrap (r -> {
137
+ if (r == null || r .getHits () == null || r .getHits ().getTotalHits () == null || r .getHits ().getTotalHits ().value == 0 ) {
138
+ hitStopWords .set (true );
139
+ }
140
+ }, e -> {
141
+ log .error ("Failed to search stop words index {}" , indexName , e );
142
+ hitStopWords .set (true );
143
+ }), latch ), () -> context .restore ()));
127
144
} catch (Exception e ) {
128
145
log .error ("[validateStopWords] Searching stop words index failed." , e );
129
- return false ;
146
+ hitStopWords . set ( true ) ;
130
147
}
131
148
132
- SearchHit [] hits = searchResponse .getHits ().getHits ();
133
- if (hits != null && hits .length > 0 ) {
134
- return false ;
149
+ try {
150
+ latch .await (5 , SECONDS );
151
+ } catch (InterruptedException e ) {
152
+ log .error ("[validateStopWords] Searching stop words index was timeout." , e );
153
+ throw new IllegalStateException (e );
135
154
}
136
- return true ;
155
+ return hitStopWords . get () ;
137
156
}
138
157
}
0 commit comments