Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ATLAS-4922: Atlas Async Import using Kafka #307

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

DishaTalreja3
Copy link
Contributor

What changes were proposed in this pull request?

The existing synchronous import API causes performance bottlenecks with large or multiple requests. This patch introduces asynchronous imports using Kafka to queue requests via dedicated topics, enabling users to submit multiple import requests concurrently.

New APIs added to:

  • Perform asynchronous imports
  • Check import status (all operations or a specific import ID)
  • Abort a queued import operation

How was this patch tested?

Manual Testing, Unit Tests

@DishaTalreja3 DishaTalreja3 marked this pull request as ready for review March 12, 2025 18:41
Copy link
Contributor

@mneethiraj mneethiraj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DishaTalreja3 - I haven't completed review yet; sending comments so far. Please review and update the patch.

@@ -170,6 +177,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME = "atlas-hook-consumer-thread";
private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME = "atlas-hook-unsorted-consumer-thread";
private static final String ATLAS_IMPORT_CONSUMER_THREAD_PREFIX = "atlas-import-hook-consumer-thread-";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

atlas-import-hook-consumer-thread- => atlas-import-consumer-thread-

consumersIterator.remove();
}
}
notificationInterface.closeConsumer(ASYNC_IMPORT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would close all consumers of type ASYNC_IMPORT. Shouldn't only consumers of topic be closed instead? Perhaps only one consumer will be active for type ASYNC_IMPORT in the current implementation status; but this part of code should better handle multiple simultaneous imports.


startConsumers(executorService);
public void startImportNotificationConsumer(NotificationType notificationType, String importId, String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startImportNotificationConsumer(notificationType, importId, topic) ==> startAsyncImportConsumer(importId, topic)

ListIterator<HookConsumer> consumersIterator = consumers.listIterator();
while (consumersIterator.hasNext()) {
HookConsumer consumer = consumersIterator.next();
if (consumer.getName().contains(consumerName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

contains(consumerName) => startsWith(consumerName)

new SynchronousQueue<>(), // Direct handoff queue
new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review all writes to executors i.e. following methods:

  • startInternal()
  • startConsumers()
  • stop()

Prior to this patch, these methods are to be called only once in an Atlas instance. However, now executors can be overwritten for every call to async import. Review the usage carefully and update to avoid inappropriate overwrites.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this new approach the executors are overWritten in
startConsumers() - only when the passed in executorService is null (essentially at the start of the instance or start of the first async import request). Please correct me f I am wrong.

@@ -224,7 +224,7 @@ public enum AtlasErrorCode {
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
IMPORT_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-016", "Given import request {0} is already in progress or completed"),
ABORT_IMPORT(409, "ATLAS-409-00-016", "Import id={0} is currently in state {1}, cannot be aborted"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ABORT_IMPORT => IMPORT_ABORT_NOT_ALLOWED

@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasAsyncImportRequest extends AtlasBaseModelObject implements Serializable {
private static final long serialVersionUID = 1L;
public static final String ASYNC_IMPORT_TYPE_NAME = "__AtlasAsyncImportRequest";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constants defined in line 47-49 seem to be used/needed only in Atlas server side. If yes, please move these to server side (like repository module).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These constants are used in this model class as well. Moving them to the repository module might introduce unnecessary dependencies.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the classes in model package referenes ASYNC_IMPORT_TYPE_NAME. This is referenced only from AsyncImportService and AtlasAsyncImportRequestDTO.

}

@JsonIgnoreProperties(ignoreUnknown = true)
public static class ImportDetails {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • add implements Serializable
  • failures is marked as final. How does this work drring deserialization?
  • failures is initialized with ConcurrentHashMap. Is this necessary? Json deserializer might assign a Hashmap instance.

@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class ImportNotification extends HookNotification implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImportNotification is not instantiated anywhere. Consider following updates:

  • make ImportNotification constructor as protected
  • have AtlasTypeDefImportNotification and AtlasEntityImportNotification extend ImportNotification
  • move importId field to ImportNotification

private String importId;

@JsonProperty
private AtlasEntityWithExtInfo entities;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entities => entity

KafkaProducer producer = getOrCreateProducer(topic);

sendInternalToProducer(producer, topic, messages);
sendInternal(topic, messages, SORT_NOT_NEEDED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I get it now. SORT_NOT_NEEDED doesn't add much to readability, given the called method is right above. I suggest to simply send false here.

public class AsyncImportStatus {
private String importId;
private AtlasAsyncImportRequest.ImportStatus status;
private String importRequestReceivedAt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should importRequestReceivedAt be of type Date?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept importRequestReceivedAt as a String to avoid the default timestamp serialization of Date and to ensure a more readable format.

private String importId;
private AtlasAsyncImportRequest.ImportStatus status;
private String importRequestReceivedAt;
private String importRequestReceivedBy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does field importRequestReceivedBy capture? Is it username or hostname or anything else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the username

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

importRequestReceivedBy is username? Which user would it be? Atlas server principal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the requesting user, retrieved using Servlets.getUserName(httpServletRequest). This is also tracked in the existing import API and is part of the AtlasImportResult object.

@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasAsyncImportRequest extends AtlasBaseModelObject implements Serializable {
private static final long serialVersionUID = 1L;
public static final String ASYNC_IMPORT_TYPE_NAME = "__AtlasAsyncImportRequest";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the classes in model package referenes ASYNC_IMPORT_TYPE_NAME. This is referenced only from AsyncImportService and AtlasAsyncImportRequestDTO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants