3
3
import lombok .Getter ;
4
4
import lombok .NonNull ;
5
5
import lombok .extern .log4j .Log4j2 ;
6
+ import org .opensearch .ResourceNotFoundException ;
7
+ import org .opensearch .action .LatchedActionListener ;
8
+ import org .opensearch .action .get .GetResponse ;
6
9
import org .opensearch .action .search .SearchRequest ;
7
10
import org .opensearch .action .search .SearchResponse ;
8
11
import org .opensearch .client .Client ;
9
12
import org .opensearch .common .util .concurrent .ThreadContext ;
10
13
import org .opensearch .common .xcontent .LoggingDeprecationHandler ;
11
14
import org .opensearch .common .xcontent .XContentType ;
15
+ import org .opensearch .core .action .ActionListener ;
12
16
import org .opensearch .core .xcontent .NamedXContentRegistry ;
13
17
import org .opensearch .core .xcontent .XContentParser ;
14
18
import org .opensearch .search .SearchHit ;
21
25
import java .util .HashMap ;
22
26
import java .util .List ;
23
27
import java .util .Map ;
28
+ import java .util .concurrent .CountDownLatch ;
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
30
+ import java .util .concurrent .atomic .AtomicReference ;
24
31
import java .util .regex .Matcher ;
25
32
import java .util .regex .Pattern ;
26
33
34
+ import static java .util .concurrent .TimeUnit .SECONDS ;
35
+ import static org .opensearch .ml .common .CommonValue .MASTER_KEY ;
27
36
import static org .opensearch .ml .common .utils .StringUtils .gson ;
28
37
29
38
@ Log4j2
@@ -101,14 +110,15 @@ public Boolean validateStopWords(String input, Map<String, List<String>> stopWor
101
110
102
111
public Boolean validateStopWordsSingleIndex (String input , String indexName , List <String > fieldNames ) {
103
112
SearchRequest searchRequest ;
104
- SearchResponse searchResponse ;
113
+ AtomicBoolean hitStopWords = new AtomicBoolean ( false ) ;
105
114
String queryBody ;
106
115
Map <String , String > documentMap = new HashMap <>();
107
116
for (String field : fieldNames ) {
108
117
documentMap .put (field , input );
109
118
}
110
119
Map <String , Object > queryBodyMap = Map
111
120
.of ("query" , Map .of ("percolate" , Map .of ("field" , "query" , "document" , documentMap )));
121
+ CountDownLatch latch = new CountDownLatch (1 );
112
122
113
123
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
114
124
queryBody = AccessController .doPrivileged ((PrivilegedExceptionAction <String >) () -> gson .toJson (queryBodyMap ));
@@ -117,17 +127,26 @@ public Boolean validateStopWordsSingleIndex(String input, String indexName, List
117
127
searchSourceBuilder .parseXContent (queryParser );
118
128
searchSourceBuilder .size (1 ); //Only need 1 doc returned, if hit.
119
129
searchRequest = new SearchRequest ().source (searchSourceBuilder ).indices (indexName );
120
- searchResponse = client .search (searchRequest ).actionGet (1000l * 30 );
121
130
context .restore ();
131
+ client .search (searchRequest , ActionListener .runBefore (new LatchedActionListener (ActionListener .<SearchResponse >wrap (r -> {
132
+ if (r == null || r .getHits () == null || r .getHits ().getTotalHits () == null || r .getHits ().getTotalHits ().value == 0 ) {
133
+ hitStopWords .set (true );
134
+ }
135
+ }, e -> {
136
+ log .error ("Failed to search stop words index {}" , indexName , e );
137
+ hitStopWords .set (true );
138
+ }), latch ), () -> context .restore ()));
122
139
} catch (Exception e ) {
123
140
log .error ("[validateStopWords] Searching stop words index failed." , e );
124
- return false ;
141
+ hitStopWords . set ( true ) ;
125
142
}
126
143
127
- SearchHit [] hits = searchResponse .getHits ().getHits ();
128
- if (hits != null && hits .length > 0 ) {
129
- return false ;
144
+ try {
145
+ latch .await (5 , SECONDS );
146
+ } catch (InterruptedException e ) {
147
+ log .error ("[validateStopWords] Searching stop words index was timeout." , e );
148
+ throw new IllegalStateException (e );
130
149
}
131
- return true ;
150
+ return hitStopWords . get () ;
132
151
}
133
152
}
0 commit comments