Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

20 datafusion of all three topics into the report #24

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

stheoulle
Copy link
Contributor

No description provided.

@stheoulle stheoulle linked an issue Jan 14, 2025 that may be closed by this pull request
}

fn detect_event_type_topic2(payload: &Value) -> EventType {
if payload.get("truck_id").is_some() && payload.get("latitude").is_some() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of checking for the existence of two attributes, could you try to instantiating a Position object and in case of an error you return a EventType::Unknown.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or can you just get the "type" field and return the correct type ? E.g. :

match payload.get("type") {
  Some(type) => match ...
  None => EventType::Unknown
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is, we do not have a field that is present in all of the 4 eventType. "type" is present for driver and truck, but not for the other two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field "type_" : "position" will be added to the Position element, to be able to use this method.
The types can be as followed : driver, truck, start, end, rest and position

true
} else {
error!("Kafka is unreachable !");
error!("Kafka is unreachable!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the tracing crate instead of the standard one. Tracing provides great features such as log formatting (for json) or a loki exporter which are mandatory in a cloud environment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// List of topics to subscribe to
let topics = vec!["topic1", "topic2", "topic3"];
// Shared HashMap for storing parsed messages
let shared_store: Arc<Mutex<HashMap<String, Vec<Value>>>> = Arc::new(Mutex::new(HashMap::new()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to reconstruct the objects and store the messages in different hashmaps ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is the end goal, I am not here yet

let shared_store = Arc::clone(&shared_store);
let topic = topic.clone();

thread::spawn(move || {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use tokio spawn instead of thread spawn. Actually tokio provides you a managed thread pool so you don't have to manage it yourself. Since you will be doing a lot of IO bound operations you should reallly use tokio since it was made for that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

consumer
.subscribe(&[&topic])
.expect("Subscription to topic failed");

loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do it the clean way, you should implement a graceful shutdown that listen for the ctrl+c unix event (interrupt). You can do that with a channel like in go https://doc.rust-lang.org/rust-by-example/std_misc/channels.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Datafusion of all three topics into the report
3 participants