1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+ package org .apache .drill .exec .server .profile ;
19+
20+ import static org .apache .drill .exec .ExecConstants .DRILL_SYS_FILE_SUFFIX ;
21+
22+ import java .io .IOException ;
23+ import java .io .InputStream ;
24+ import java .text .SimpleDateFormat ;
25+ import java .util .Date ;
26+ import java .util .HashMap ;
27+ import java .util .List ;
28+ import java .util .Map ;
29+ import java .util .concurrent .TimeUnit ;
30+
31+ import org .apache .commons .io .IOUtils ;
32+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
33+ import org .apache .drill .common .config .DrillConfig ;
34+ import org .apache .drill .exec .ExecConstants ;
35+ import org .apache .drill .exec .coord .ClusterCoordinator ;
36+ import org .apache .drill .exec .coord .DistributedSemaphore ;
37+ import org .apache .drill .exec .coord .DistributedSemaphore .DistributedLease ;
38+ import org .apache .drill .exec .coord .zk .ZKClusterCoordinator ;
39+ import org .apache .drill .exec .coord .zk .ZkDistributedSemaphore ;
40+ import org .apache .drill .exec .exception .StoreException ;
41+ import org .apache .drill .exec .proto .UserBitShared .QueryProfile ;
42+ import org .apache .drill .exec .server .DrillbitContext ;
43+ import org .apache .drill .exec .server .QueryProfileStoreContext ;
44+ import org .apache .drill .exec .store .dfs .DrillFileSystem ;
45+ import org .apache .drill .exec .store .sys .PersistentStoreConfig ;
46+ import org .apache .drill .exec .store .sys .store .DrillSysFilePathFilter ;
47+ import org .apache .drill .exec .store .sys .store .LocalPersistentStore ;
48+ import org .apache .drill .exec .store .sys .store .ProfileSet ;
49+ import org .apache .drill .exec .store .sys .store .provider .ZookeeperPersistentStoreProvider ;
50+ import org .apache .drill .exec .util .DrillFileSystemUtil ;
51+ import org .apache .drill .shaded .guava .com .google .common .base .Stopwatch ;
52+ import org .apache .hadoop .fs .FileStatus ;
53+ import org .apache .hadoop .fs .Path ;
54+ import org .apache .hadoop .fs .PathFilter ;
55+ import org .slf4j .Logger ;
56+ import org .slf4j .LoggerFactory ;
57+
58+ /**
59+ * Manage profiles by archiving
60+ */
61+ public class ProfileIndexer {
62+ private static final Logger logger = LoggerFactory .getLogger (ProfileIndexer .class );
63+ private static final String lockPathString = "/profileIndexer" ;
64+ private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX .length ();
65+
66+ private final ZKClusterCoordinator zkCoord ;
67+ private final DrillFileSystem fs ;
68+ private final Path basePath ;
69+ private final ProfileSet profiles ;
70+ private final int indexingRate ;
71+ private final PathFilter sysFileSuffixFilter ;
72+ private SimpleDateFormat indexedPathFormat ;
73+ private final boolean useZkCoordinatedManagement ;
74+ private DrillConfig drillConfig ;
75+
76+ private PersistentStoreConfig <QueryProfile > pStoreConfig ;
77+ private LocalPersistentStore <QueryProfile > completedProfileStore ;
78+ private Stopwatch indexWatch ;
79+ private int indexedCount ;
80+ private int currentProfileCount ;
81+
82+
83+ /**
84+ * ProfileIndexer
85+ */
86+ public ProfileIndexer (ClusterCoordinator coord , DrillbitContext context ) throws StoreException , IOException {
87+ drillConfig = context .getConfig ();
88+
89+ // FileSystem
90+ try {
91+ this .fs = inferFileSystem (drillConfig );
92+ } catch (IOException ex ) {
93+ throw new StoreException ("Unable to get filesystem" , ex );
94+ }
95+
96+ //Use Zookeeper for coordinated management
97+ final List <String > supportedFS = drillConfig .getStringList (ExecConstants .PROFILES_STORE_INDEX_SUPPORTED_FS );
98+ if (this .useZkCoordinatedManagement = supportedFS .contains (fs .getScheme ())) {
99+ this .zkCoord = (ZKClusterCoordinator ) coord ;
100+ } else {
101+ this .zkCoord = null ;
102+ }
103+
104+ // Query Profile Store
105+ QueryProfileStoreContext pStoreContext = context .getProfileStoreContext ();
106+ this .completedProfileStore = (LocalPersistentStore <QueryProfile >) pStoreContext .getCompletedProfileStore ();
107+ this .pStoreConfig = pStoreContext .getProfileStoreConfig ();
108+ this .basePath = completedProfileStore .getBasePath ();
109+
110+ this .indexingRate = drillConfig .getInt (ExecConstants .PROFILES_STORE_INDEX_MAX );
111+ this .profiles = new ProfileSet (indexingRate );
112+ this .indexWatch = Stopwatch .createUnstarted ();
113+ this .sysFileSuffixFilter = new DrillSysFilePathFilter ();
114+ String indexPathPattern = drillConfig .getString (ExecConstants .PROFILES_STORE_INDEX_FORMAT );
115+ this .indexedPathFormat = new SimpleDateFormat (indexPathPattern );
116+ logger .info ("Organizing any existing unindexed profiles" );
117+ }
118+
119+
120+ /**
121+ * Index profiles
122+ */
123+ public void indexProfiles () {
124+ this .indexWatch .start ();
125+
126+ // Acquire lock IFF required
127+ if (useZkCoordinatedManagement ) {
128+ DistributedSemaphore indexerMutex = new ZkDistributedSemaphore (zkCoord .getCurator (), lockPathString , 1 );
129+ try (DistributedLease lease = indexerMutex .acquire (0 , TimeUnit .SECONDS )) {
130+ if (lease != null ) {
131+ listAndIndex ();
132+ } else {
133+ logger .debug ("Couldn't get a lease acquisition" );
134+ }
135+ } catch (Exception e ) {
136+ //DoNothing since lease acquisition failed
137+ logger .error ("Exception during lease-acquisition:: {}" , e );
138+ }
139+ } else {
140+ try {
141+ listAndIndex ();
142+ } catch (IOException e ) {
143+ logger .error ("Failed to index: {}" , e );
144+ }
145+ }
146+ logger .info ("Successfully indexed {} of {} profiles during startup in {} seconds" , indexedCount , currentProfileCount , this .indexWatch .stop ().elapsed (TimeUnit .SECONDS ));
147+ }
148+
149+
150+ //Lists and Indexes the latest profiles
151+ private void listAndIndex () throws IOException {
152+ currentProfileCount = listForArchiving ();
153+ indexedCount = 0 ;
154+ logger .info ("Found {} profiles that need to be indexed. Will attempt to index {} profiles" , currentProfileCount ,
155+ (currentProfileCount > this .indexingRate ) ? this .indexingRate : currentProfileCount );
156+
157+ // Track MRU index paths
158+ Map <String , Path > mruIndexPath = new HashMap <>();
159+ if (currentProfileCount > 0 ) {
160+ while (!this .profiles .isEmpty ()) {
161+ String profileToIndex = profiles .removeYoungest () + DRILL_SYS_FILE_SUFFIX ;
162+ Path srcPath = new Path (basePath , profileToIndex );
163+ long profileStartTime = getProfileStart (srcPath );
164+ if (profileStartTime < 0 ) {
165+ logger .debug ("Will skip indexing {}" , srcPath );
166+ continue ;
167+ }
168+ String indexPath = indexedPathFormat .format (new Date (profileStartTime ));
169+ //Check if dest dir exists
170+ Path indexDestPath = null ;
171+ if (!mruIndexPath .containsKey (indexPath )) {
172+ indexDestPath = new Path (basePath , indexPath );
173+ if (!fs .isDirectory (indexDestPath )) {
174+ // Build dir
175+ if (fs .mkdirs (indexDestPath )) {
176+ mruIndexPath .put (indexPath , indexDestPath );
177+ } else {
178+ //Creation failed. Did someone else create?
179+ if (fs .isDirectory (indexDestPath )) {
180+ mruIndexPath .put (indexPath , indexDestPath );
181+ }
182+ }
183+ } else {
184+ mruIndexPath .put (indexPath , indexDestPath );
185+ }
186+ } else {
187+ indexDestPath = mruIndexPath .get (indexPath );
188+ }
189+
190+ //Attempt Move
191+ boolean renameStatus = false ;
192+ if (indexDestPath != null ) {
193+ Path destPath = new Path (indexDestPath , profileToIndex );
194+ renameStatus = DrillFileSystemUtil .rename (fs , srcPath , destPath );
195+ if (renameStatus ) {
196+ indexedCount ++;
197+ }
198+ }
199+ if (indexDestPath == null || !renameStatus ) {
200+ // Stop attempting any more archiving since other StoreProviders might be archiving
201+ logger .error ("Move failed for {} [{} | {}]" , srcPath , indexDestPath == null , renameStatus );
202+ continue ;
203+ }
204+ }
205+ }
206+ }
207+
208+ // Deserialized and extract the profile's start time
209+ private long getProfileStart (Path srcPath ) {
210+ try (InputStream is = fs .open (srcPath )) {
211+ QueryProfile profile = pStoreConfig .getSerializer ().deserialize (IOUtils .toByteArray (is ));
212+ return profile .getStart ();
213+ } catch (IOException e ) {
214+ logger .info ("Unable to deserialize {}\n ---{}====" , srcPath , e .getMessage ()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538]
215+ logger .info ("deserialization RCA==> \n {}" , ExceptionUtils .getRootCause (e ));
216+ }
217+ return Long .MIN_VALUE ;
218+ }
219+
220+ // List all profiles in store's root and identify potential candidates for archiving
221+ private int listForArchiving () throws IOException {
222+ // Not performing recursive search of profiles
223+ List <FileStatus > fileStatuses = DrillFileSystemUtil .listFiles (fs , basePath , false , sysFileSuffixFilter );
224+
225+ int numProfilesInStore = 0 ;
226+ for (FileStatus stat : fileStatuses ) {
227+ String profileName = stat .getPath ().getName ();
228+ //Strip extension and store only query ID
229+ profiles .add (profileName .substring (0 , profileName .length () - DRILL_SYS_FILE_EXT_SIZE ), false );
230+ numProfilesInStore ++;
231+ }
232+
233+ return numProfilesInStore ;
234+ }
235+
236+ // Infers File System of Local Store
237+ private DrillFileSystem inferFileSystem (DrillConfig drillConfig ) throws IOException {
238+ boolean hasZkBlobRoot = drillConfig .hasPath (ZookeeperPersistentStoreProvider .DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT );
239+ final Path blobRoot = hasZkBlobRoot ?
240+ new org .apache .hadoop .fs .Path (drillConfig .getString (ZookeeperPersistentStoreProvider .DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT )) :
241+ LocalPersistentStore .getLogDir ();
242+
243+ return LocalPersistentStore .getFileSystem (drillConfig , blobRoot );
244+ }
245+
246+ }
0 commit comments