@@ -96,6 +96,7 @@ public void init(Context context) throws IOException {
9696 initializePeerUUID ();
9797 initializeBackupFileSystemManager ();
9898 startWalFlushExecutor ();
99+ LOG .info ("{} Initialization complete" , Utils .logPeerId (peerId ));
99100 }
100101
101102 private void initializePeerUUID () throws IOException {
@@ -156,20 +157,24 @@ private void flushAndBackupSafely() {
156157 }
157158
158159 private void flushWriters () throws IOException {
160+ LOG .info ("{} Flushing {} WAL writers" , Utils .logPeerId (peerId ), walWriters .size ());
159161 for (Map .Entry <Long , FSHLogProvider .Writer > entry : walWriters .entrySet ()) {
160162 FSHLogProvider .Writer writer = entry .getValue ();
161163 if (writer != null ) {
164+ LOG .debug ("{} Closing WAL writer for day: {}" , Utils .logPeerId (peerId ), entry .getKey ());
162165 try {
163166 writer .close ();
167+ LOG .debug ("{} Successfully closed WAL writer for day: {}" , Utils .logPeerId (peerId ),
168+ entry .getKey ());
164169 } catch (IOException e ) {
165170 LOG .error ("{} Failed to close WAL writer for day: {}. Error: {}" , Utils .logPeerId (peerId ),
166- entry .getKey (), e .getMessage ());
171+ entry .getKey (), e .getMessage (), e );
167172 throw e ;
168173 }
169174 }
170175 }
171176 walWriters .clear ();
172- LOG .debug ("{} WAL writers flushed and cleared" , Utils .logPeerId (peerId ));
177+ LOG .info ("{} WAL writers flushed and cleared" , Utils .logPeerId (peerId ));
173178 }
174179
175180 @ Override
@@ -198,19 +203,30 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
198203 return ReplicationResult .SUBMITTED ;
199204 }
200205
206+ LOG .debug ("{} Received {} WAL entries for replication" , Utils .logPeerId (peerId ),
207+ entries .size ());
208+
201209 Map <Long , List <WAL .Entry >> groupedEntries = groupEntriesByDay (entries );
202- lock .lock ();
210+ LOG .debug ("{} Grouped WAL entries by day: {}" , Utils .logPeerId (peerId ),
211+ groupedEntries .keySet ());
203212
213+ lock .lock ();
204214 try {
205215 for (Map .Entry <Long , List <WAL .Entry >> entry : groupedEntries .entrySet ()) {
216+ LOG .debug ("{} Backing up {} WAL entries for day {}" , Utils .logPeerId (peerId ),
217+ entry .getValue ().size (), entry .getKey ());
206218 backupWalEntries (entry .getKey (), entry .getValue ());
207219 }
208220
209221 if (isAnyWriterFull ()) {
222+ LOG .debug ("{} Some WAL writers reached max size, triggering flush" ,
223+ Utils .logPeerId (peerId ));
210224 flushWriters ();
225+ LOG .debug ("{} Replication committed after WAL flush" , Utils .logPeerId (peerId ));
211226 return ReplicationResult .COMMITTED ;
212227 }
213228
229+ LOG .debug ("{} Replication submitted successfully" , Utils .logPeerId (peerId ));
214230 return ReplicationResult .SUBMITTED ;
215231 } catch (IOException e ) {
216232 LOG .error ("{} Replication failed. Error details: {}" , Utils .logPeerId (peerId ), e .getMessage (),
@@ -237,16 +253,29 @@ private boolean isWriterFull(FSHLogProvider.Writer writer) {
237253 }
238254
239255 private void backupWalEntries (long day , List <WAL .Entry > walEntries ) throws IOException {
256+ LOG .debug ("{} Starting backup of {} WAL entries for day {}" , Utils .logPeerId (peerId ),
257+ walEntries .size (), day );
258+
240259 try {
241260 FSHLogProvider .Writer walWriter = walWriters .computeIfAbsent (day , this ::createWalWriter );
242261 List <Path > bulkLoadFiles = BulkLoadProcessor .processBulkLoadFiles (walEntries );
262+
263+ if (LOG .isTraceEnabled ()) {
264+ LOG .trace ("{} Processed {} bulk load files for WAL entries" , Utils .logPeerId (peerId ),
265+ bulkLoadFiles .size ());
266+ LOG .trace ("{} Bulk load files: {}" , Utils .logPeerId (peerId ),
267+ bulkLoadFiles .stream ().map (Path ::toString ).collect (Collectors .joining (", " )));
268+ }
269+
243270 for (WAL .Entry entry : walEntries ) {
244271 walWriter .append (entry );
245272 }
246273 walWriter .sync (true );
247274 uploadBulkLoadFiles (bulkLoadFiles );
248275 } catch (UncheckedIOException e ) {
249276 String errorMsg = Utils .logPeerId (peerId ) + " Failed to get or create WAL Writer for " + day ;
277+ LOG .error ("{} Backup failed for day {}. Error: {}" , Utils .logPeerId (peerId ), day ,
278+ e .getMessage (), e );
250279 throw new IOException (errorMsg , e );
251280 }
252281 }
@@ -275,7 +304,7 @@ private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
275304 writer .init (fs , walFilePath , conf , true , WALUtil .getWALBlockSize (conf , fs , walFilePath ),
276305 StreamSlowMonitor .create (conf , walFileName ));
277306
278- LOG .debug ("{} Created WAL writer for day : {}" , Utils .logPeerId (peerId ), dayDirectoryName );
307+ LOG .info ("{} WAL writer created : {}" , Utils .logPeerId (peerId ), walFilePath );
279308 return writer ;
280309 } catch (Exception e ) {
281310 throw new UncheckedIOException (
@@ -299,6 +328,7 @@ protected void doStop() {
299328 }
300329
301330 private void close () {
331+ LOG .info ("{} Closing WAL replication component..." , Utils .logPeerId (peerId ));
302332 shutdownFlushExecutor ();
303333 lock .lock ();
304334 try {
@@ -309,17 +339,29 @@ private void close() {
309339 e );
310340 } finally {
311341 lock .unlock ();
342+ LOG .info ("{} WAL replication component closed." , Utils .logPeerId (peerId ));
312343 }
313344 }
314345
315346 private void uploadBulkLoadFiles (List <Path > bulkLoadFiles ) throws IOException {
347+ LOG .debug ("{} Starting upload of {} bulk load files" , Utils .logPeerId (peerId ),
348+ bulkLoadFiles .size ());
349+
350+ if (LOG .isTraceEnabled ()) {
351+ LOG .trace ("{} Bulk load files to upload: {}" , Utils .logPeerId (peerId ),
352+ bulkLoadFiles .stream ().map (Path ::toString ).collect (Collectors .joining (", " )));
353+ }
316354 for (Path file : bulkLoadFiles ) {
317355 Path sourcePath = getBulkLoadFileStagingPath (file );
318356 Path destPath = new Path (backupFileSystemManager .getBulkLoadFilesDir (), file );
319357
320358 try {
359+ LOG .debug ("{} Copying bulk load file from {} to {}" , Utils .logPeerId (peerId ), sourcePath ,
360+ destPath );
361+
321362 FileUtil .copy (CommonFSUtils .getRootDirFileSystem (conf ), sourcePath ,
322363 backupFileSystemManager .getBackupFs (), destPath , false , conf );
364+
323365 LOG .info ("{} Bulk load file {} successfully backed up to {}" , Utils .logPeerId (peerId ), file ,
324366 destPath );
325367 } catch (IOException e ) {
@@ -328,6 +370,8 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
328370 throw e ;
329371 }
330372 }
373+
374+ LOG .debug ("{} Completed upload of bulk load files" , Utils .logPeerId (peerId ));
331375 }
332376
333377 private Path getBulkLoadFileStagingPath (Path relativePathFromNamespace ) throws IOException {
@@ -338,20 +382,34 @@ private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws I
338382 Path hFileArchiveDir =
339383 new Path (rootDir , new Path (HConstants .HFILE_ARCHIVE_DIRECTORY , baseNSDir ));
340384
385+ LOG .debug ("{} Searching for bulk load file: {} in paths: {}, {}" , Utils .logPeerId (peerId ),
386+ relativePathFromNamespace , baseNamespaceDir , hFileArchiveDir );
387+
341388 Path result =
342389 findExistingPath (rootFs , baseNamespaceDir , hFileArchiveDir , relativePathFromNamespace );
390+
343391 if (result == null ) {
392+ LOG .error ("{} No bulk loaded file found in relative path: {}" , Utils .logPeerId (peerId ),
393+ relativePathFromNamespace );
344394 throw new IOException (
345395 "No Bulk loaded file found in relative path: " + relativePathFromNamespace );
346396 }
397+
398+ LOG .debug ("{} Bulk load file found at {}" , Utils .logPeerId (peerId ), result );
347399 return result ;
348400 }
349401
350402 private static Path findExistingPath (FileSystem rootFs , Path baseNamespaceDir ,
351403 Path hFileArchiveDir , Path filePath ) throws IOException {
404+ if (LOG .isTraceEnabled ()) {
405+ LOG .trace ("Checking for bulk load file at: {} and {}" , new Path (baseNamespaceDir , filePath ),
406+ new Path (hFileArchiveDir , filePath ));
407+ }
408+
352409 for (Path candidate : new Path [] { new Path (baseNamespaceDir , filePath ),
353410 new Path (hFileArchiveDir , filePath ) }) {
354411 if (rootFs .exists (candidate )) {
412+ LOG .debug ("Found bulk load file at: {}" , candidate );
355413 return candidate ;
356414 }
357415 }
@@ -360,14 +418,16 @@ private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
360418
361419 private void shutdownFlushExecutor () {
362420 if (flushExecutor != null ) {
421+ LOG .info ("{} Initiating WAL flush executor shutdown." , Utils .logPeerId (peerId ));
422+
363423 flushExecutor .shutdown ();
364424 try {
365425 if (
366426 !flushExecutor .awaitTermination (EXECUTOR_TERMINATION_TIMEOUT_SECONDS , TimeUnit .SECONDS )
367427 ) {
368- flushExecutor .shutdownNow ();
369428 LOG .warn ("{} Flush executor did not terminate within timeout, forcing shutdown." ,
370429 Utils .logPeerId (peerId ));
430+ flushExecutor .shutdownNow ();
371431 }
372432 } catch (InterruptedException e ) {
373433 Thread .currentThread ().interrupt ();
0 commit comments