Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate image existence and add the data to opensearch. #139

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ services:
- "4900:4900"
- "2021:2021"
- "21890:21890"
networks:
- chorus_network
volumes:
- ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
Expand All @@ -18,13 +20,17 @@ services:
build: ./middleware
ports:
- "9090:9090"
networks:
- chorus_network
volumes:
- ./middleware:/python-docker/
mysql:
container_name: mysql
image: amd64/mysql:8
ports:
- 3306:3306
networks:
- chorus_network
environment:
- MYSQL_ROOT_PASSWORD=password
volumes:
Expand Down Expand Up @@ -64,6 +70,8 @@ services:
memory: 4g # Optional: reserve memory for the container
ports:
- "9200:9200"
networks:
- chorus_network
healthcheck:
test: [ "CMD", "wget", "http://localhost:9200" ]
interval: 30s
Expand All @@ -75,6 +83,8 @@ services:
container_name: opensearch-dashboards
ports:
- 5601:5601
networks:
- chorus_network
expose:
- 5601
environment:
Expand All @@ -86,18 +96,21 @@ services:
reactivesearch:
container_name: reactivesearch
build: ./reactivesearch/.

ports:
- 3000:3000
networks:
- chorus_network
volumes:
- './reactivesearch:/usr/src/app'
- '/usr/src/app/node_modules'
ports:
- 3000:3000

quepid:
container_name: quepid
image: o19s/quepid:7.15.1
ports:
- 4000:3000
networks:
- chorus_network
environment:
- PORT=3000
- RACK_ENV=production
Expand Down Expand Up @@ -126,3 +139,11 @@ services:
image: redis:7.0.11-alpine
ports:
- 6379:6379
networks:
- chorus_network



networks:
chorus_network:
driver: bridge
93 changes: 93 additions & 0 deletions opensearch/validate_product_image_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import requests
from opensearchpy import OpenSearch

# Validates that each product's image URL is reachable and writes the result
# back into OpenSearch as an `image_exists` flag (see process_documents below).
#
# Here is an example commandline (note: the script file is
# validate_product_image_data.py):
# docker run --network=chorus-opensearch-edition_chorus_network -v "$(pwd)":/app -w /app python:3 bash -c "pip install -r requirements.txt && python3 ./opensearch/validate_product_image_data.py"

# Configuration
# NOTE(review): opensearch_host is currently unused — the client below is
# built from a separate host/port dict; keep the two in sync if either changes.
opensearch_host = 'http://opensearch:9200' # Replace with your OpenSearch URL
index_name = 'ecommerce' # Replace with your index name
tracking_file = 'product_image_exists.txt'  # local cache of already-checked doc ids

# Initialize OpenSearch client (plain HTTP inside the compose network)
client = OpenSearch(
    hosts=[{'host': 'opensearch', 'port': 9200}],
    use_ssl = False,
)

# Fail fast with a readable banner if the cluster is unreachable.
info = client.info()
print(f"Welcome to {info['version']['distribution']} {info['version']['number']}!")


# Load existing tracked IDs
def load_tracked_ids(path=None):
    """Return the cache of already-checked documents as {doc_id: bool}.

    Args:
        path: Optional override of the tracking-file location; defaults to
            the module-level ``tracking_file``.

    Returns:
        Dict mapping document id to whether its image URL was reachable.
        An empty dict when the tracking file does not exist yet (first run).
    """
    if path is None:
        path = tracking_file
    tracked_ids = {}
    try:
        with open(path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # tolerate stray blank lines
                # rsplit, not split: a doc id may itself contain commas,
                # but the flag appended by save_tracked_id never does.
                key, value = line.rsplit(",", 1)
                tracked_ids[key] = value.lower() == 'true'
    except FileNotFoundError:
        # Nothing cached yet.
        return {}
    return tracked_ids

# Save a new tracked ID
def save_tracked_id(doc_id, exists, path=None):
    """Append one ``doc_id,exists`` line to the tracking file.

    Args:
        doc_id: OpenSearch document id.
        exists: Whether the document's image URL answered HTTP 200.
        path: Optional override of the tracking-file location; defaults to
            the module-level ``tracking_file``.
    """
    if path is None:
        path = tracking_file
    # Append-only so interrupted runs keep everything checked so far.
    with open(path, 'a') as f:
        f.write(f"{doc_id},{exists}\n")

# Check if image exists
def check_image_exists(image_url):
    """Return True when a HEAD request to *image_url* answers 200 OK.

    A 10-second timeout prevents one unresponsive host from stalling the
    whole scan (requests has no default timeout), and any network-level
    failure (DNS error, refused connection, timeout) is treated as
    "image missing" instead of aborting the run.
    """
    try:
        # allow_redirects so a 301/302 pointing at the real asset still
        # counts as the image existing.
        response = requests.head(image_url, timeout=10, allow_redirects=True)
    except requests.RequestException:
        return False
    return response.status_code == 200

# Main function to process documents
def process_documents():
    """Scan every document in the index, check whether its image URL is
    reachable, and write the verdict back as an ``image_exists`` flag.

    Verdicts are cached in the tracking file (via load_tracked_ids /
    save_tracked_id), so re-runs skip URLs that were already probed and
    only replay the cheap OpenSearch updates.
    """
    tracked_ids = load_tracked_ids()
    query = {
        "query": {
            "match_all": {}
        }
    }

    # Open a scroll context and page through the whole index 1000 docs
    # at a time.
    response = client.search(index=index_name, body=query, scroll='10m', size=1000)
    scroll_id = response['_scroll_id']
    total_docs = response['hits']['total']['value']

    try:
        while True:
            for hit in response['hits']['hits']:
                doc_id = hit['_id']
                image_url = hit['_source'].get('image')

                if not image_url:
                    continue  # nothing to validate for this document

                if doc_id not in tracked_ids:
                    # First time we see this document: probe the URL and
                    # persist the verdict so interrupted runs can resume.
                    exists = check_image_exists(image_url)
                    tracked_ids[doc_id] = exists
                    save_tracked_id(doc_id, exists)

                # Write the (possibly cached) verdict back to OpenSearch.
                client.update(
                    index=index_name,
                    id=doc_id,
                    body={
                        "doc": {
                            "image_exists": tracked_ids[doc_id]
                        }
                    }
                )

            # Fetch the next batch. The scroll id can change between
            # pages, so always carry forward the latest one instead of
            # reusing the id from the initial search.
            response = client.scroll(scroll_id=scroll_id, scroll='10m')
            scroll_id = response.get('_scroll_id', scroll_id)
            if not response['hits']['hits']:
                break
    finally:
        # Release the server-side scroll context instead of letting it
        # hold resources until the 10m timeout expires.
        client.clear_scroll(scroll_id=scroll_id)

    print(f"Processed {total_docs} documents.")

# Entry point: only run the scan when executed as a script, not on import.
if __name__ == "__main__":
    process_documents()
Loading