- A modularized Spring Boot microservice-based application that provides a central database of posts from various social media applications.
- The 5 microservices are loosely coupled and can run independently, communicating with each other using Kafka and HTTP calls.
- A 6th Spring Boot application, `infra`, acts as a library that provides common configurations and utilities to the other microservices.
| Modules | Version | Description |
|---|---|---|
| producer | 2.3.0 | Fetches data from various social media APIs and sends it to Kafka. |
| consumer | 2.3.0 | Reads data from Kafka and processes it. |
| elasticsearchManager | 2.0.0 | Manages the Elasticsearch database and writes data to it. |
| searchService | 2.0.0 | Provides a REST API to search data in the Elasticsearch database. |
| regexManager | 1.0.0 | Manages the regular expressions used in processing the API responses. |
| infra | 2.1.0 | Provides common configurations and utilities to the other microservices. |
Below is a brief overview of all the loosely coupled applications in the project.
- `producer` is a REST service that fetches data from various social media APIs and sends it to a Kafka topic.
  - The data is fetched using the `service.api` package, and the data is sent to Kafka using the `KafkaProducerService` class.
  - The `KafkaProducerConfig` class defines the Kafka producer configurations, and the `KafkaProducerService` class uses the `KafkaTemplate` to send the data to the Kafka topic.
  - The `ApiService` abstract class defines the template code required for fetching data from the API. The implementation classes are called periodically to fetch data from the API.
  - **NOTE**: the implementation classes are in the `service.api` package, and each must be annotated with `@Service` to be picked up by the Spring Boot application context.
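The template-method structure described above can be sketched in plain Java. This is a simplified illustration, not the project's actual `ApiService`: the real class lives in the producer module, its subclasses carry Spring's `@Service` annotation, and `DummyApiService` here is a hypothetical stand-in.

```java
import java.util.List;

// Simplified sketch of the template-method pattern used by ApiService.
abstract class ApiService {
    // Template method: shared orchestration around the API-specific steps.
    public final List<String> poll() {
        if (!isExecutable()) {
            return List.of(); // skip APIs whose configuration is incomplete
        }
        return fetchData();
    }

    // Each API implementation validates its own configuration.
    protected abstract boolean isExecutable();

    // Each API implementation fetches and returns its raw responses.
    protected abstract List<String> fetchData();
}

// Hypothetical implementation; real ones live in the service.api package
// and are annotated with @Service.
class DummyApiService extends ApiService {
    @Override protected boolean isExecutable() { return true; }
    @Override protected List<String> fetchData() { return List.of("post-1", "post-2"); }
}
```

Because `poll()` is final, every implementation inherits the same "validate, then fetch" flow and only supplies the API-specific pieces.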
The Reddit API requires multiple API calls in order to authenticate and fetch data.

- First, a `POST` request is made to https://www.reddit.com/api/v1/authorize to get the authorization code. It takes the following parameters:
  - `client_id`: The client id of the application.
  - `response_type`: The response type. In this case, it is `code`.
  - `state`: A random string to prevent CSRF attacks.
  - `redirect_uri`: The redirect URI of the application, in this case http://localhost:8082/reddit/callback.
  - `duration`: The duration for which the token is valid. In this case, it is `permanent`.
  - `scope`: The scope of the token. In this case, it is `read`.
- The authorize API call then makes a `GET` call to the `redirect_uri` with parameters `code` and `state`, or `error` in case of an error.
- The `redirect_uri` endpoint uses the code to send a `POST` request to https://www.reddit.com/api/v1/access_token to get the access token. It takes the following parameters:
  - `grant_type`: The grant type. In this case, it is `authorization_code`.
  - `code`: The code received from the authorize API call.
  - `redirect_uri`: The redirect URI of the application, in this case http://localhost:8082/reddit/callback.
- On success, it returns `access_token` and `refresh_token`, which are stored in `RedditService` as instance variables.
  - The `access_token` is used to make further API calls to fetch data.
  - The `refresh_token` is used to get a new `access_token` when the current one expires.
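The access-token step of the flow can be sketched with the JDK's `java.net.http` client. The endpoint URL and parameter names come from the flow described above; the helper class itself is illustrative and is not the project's `RedditService`.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative builder for the POST to /api/v1/access_token.
class RedditTokenRequest {

    // Encode the three parameters as an application/x-www-form-urlencoded body.
    static String formBody(String code) {
        Map<String, String> params = Map.of(
                "grant_type", "authorization_code",
                "code", code,
                "redirect_uri", "http://localhost:8082/reddit/callback");
        return params.entrySet().stream()
                .map(e -> e.getKey() + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Build the POST request; sending it (and refreshing the token later)
    // would be done with an HttpClient and the stored refresh_token.
    static HttpRequest build(String code) {
        return HttpRequest.newBuilder()
                .uri(URI.create("https://www.reddit.com/api/v1/access_token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(code)))
                .build();
    }
}
```

Note that the real Reddit call also requires HTTP Basic auth with the client credentials; that detail is omitted here to keep the sketch focused on the parameters listed above.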
- `consumer` is a Kafka consumer that reads data from the Kafka topic and processes it.
  - A dedicated listener is defined in the `KafkaConsumerService` class, which listens to the Kafka topic and processes the data.
  - It receives all the API responses and adapts them to a common class named `GenericChannelPost`.
    - Since an API response can contain multiple posts, the adapter returns a `List<GenericChannelPost>`.
  - Each of these posts is then sent to the `elasticsearchManager` microservice using an asynchronous REST call.
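The adapting step can be sketched as follows. The field names and the `RawApiResponse` type are hypothetical; in the project, the conversion is done by static `toGenericChannelPost` functions in `models.genericChannelPost.Adapter`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal stand-in for the common post class all API responses map to.
record GenericChannelPost(String source, String text) {}

// Hypothetical raw response: one response may carry several posts,
// so the adapter fans it out into a List<GenericChannelPost>.
record RawApiResponse(String source, List<String> posts) {
    List<GenericChannelPost> toGenericChannelPosts() {
        return posts.stream()
                .map(body -> new GenericChannelPost(source, body))
                .collect(Collectors.toList());
    }
}
```

Normalizing every source into one shape is what lets the downstream `elasticsearchManager` stay ignorant of the individual social media APIs.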
- `elasticsearchManager` receives the data from the `consumer` microservice and stores it in the Elasticsearch database.
  - The manager focuses on the WRITE operations to the Elasticsearch database.
  - The purpose of the `elasticsearchManager` is to decouple the database from the process of generating data.
  - With this, another database can easily replace Elasticsearch as the central post repository, since the consumer does not require knowledge of database internals to send a POST request to add a new post.
- `searchService` is a simple web API that allows performing search operations on the data present in Elasticsearch.
  - The `searchService` application is the only application that needs to be exposed to the external world.
  - This restricts public access to READ-only operations, preventing any unauthorized modifications to the data.
  - The `searchService` could be merged with the `elasticsearchManager` application to provide a single point of access to the data.
    - RBAC would have to be implemented in that case, with `elasticsearchManager` operations having WRITE access and `searchService` operations having READ access.
- `regexManager` is a REST service with CRUD operations to manage the regular expressions used in processing API responses.
  - This microservice is an additional feature that can help pre-process the posts and introduce additional tags and filters.
  - Although implemented, the `regexManager` is **not up to date** with the latest version of the project and so is **not used** in the current version.
- The project is a Spring Boot project requiring `Gradle` and `Java 21`.
- Clone the repository on your machine using `git clone https://github.com/vihar-s1/api-data-processor.git`.
- The project is divided into 5 microservices and 1 library.
- Each microservice requires a separate environment file to run.
  - The environment file is named `.env` and is present in the root directory of each microservice.
  - Use the `.env.sample` in each microservice to create a new `.env` file.
- Once the `.env` file is created, run the corresponding command from the project root to start a microservice:
```shell
gradle producer:bootRun
gradle consumer:bootRun
gradle elasticsearchManager:bootRun
gradle searchService:bootRun
gradle regexManager:bootRun
```
Follow this step-by-step guide to add a new API to fetch data from.

- Add a new enum constant in `models.ApiType` to represent the new API.
- Create a new class in the `models.apiResponse` package that represents the response from the API.
  - Create an appropriate class for the posts as well.
  - The response class must implement `ApiResponseInterface`.
- Go to `models.deserializer.ApiResponseInterfaceDeserializer` and add a new case for the newly created API enum for successful deserialization.
- In `models.genericChannelPost.Adapter`, implement a static function named `toGenericChannelPost` that converts the API response to a `GenericChannelPost`.
- Create a new class in the `producer.service.api` package that extends the abstract class `ApiService`.
  - The class must be annotated with `@Service`.
  - The class must implement the `fetchData` method to fetch data from the API, along with the additional abstract methods.
- Add any additional configuration secrets required in the `application.properties` file.
  - This may include an API key, secret, token, or more.
  - Make sure to import them into `application.properties` from the `.env` file.
  - Use the `@Value` annotation to inject the values into the class.
  - Add validations in the `isExecutable` method to check that the required configurations are present.
- In `services.KafkaConsumerService`, add a new case for the new API in the `genericApiResponseListener` method.
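The producer-side steps of the guide can be sketched with a fictional "ExampleFeed" API. Everything here is hypothetical (the class name, the property key, the payload); the Spring annotations are shown in comments so the sketch compiles standalone, but in the real producer module they would be active.

```java
import java.util.List;

// @Service  // step: register the class with the Spring application context
class ExampleFeedApiService /* extends ApiService */ {

    // @Value("${examplefeed.api-key}")  // step: inject the secret imported
    // into application.properties from the .env file
    private final String apiKey;

    ExampleFeedApiService(String apiKey) {
        this.apiKey = apiKey;
    }

    // step: validate that the required configuration is present,
    // so the scheduler skips this API when secrets are missing.
    public boolean isExecutable() {
        return apiKey != null && !apiKey.isBlank();
    }

    // step: fetch data from the API; the real implementation would call the
    // API and deserialize the result into a models.apiResponse class.
    public List<String> fetchData() {
        if (!isExecutable()) return List.of();
        return List.of("{\"posts\":[]}"); // placeholder payload
    }
}
```

With this in place, the remaining steps are wiring: the deserializer case, the adapter function, and the new case in `genericApiResponseListener`.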
That's it! You have successfully integrated a new API to fetch data from.