|
| 1 | +import os |
| 2 | +import uuid |
| 3 | +import argparse |
| 4 | +import asyncio |
| 5 | +import logging |
| 6 | +import colorlog |
| 7 | +from aiokafka import AIOKafkaConsumer |
| 8 | + |
| 9 | +# ANSI styles for terminal colors |
| 10 | +STYLES = { |
| 11 | + "blue": "\033[1;34m", |
| 12 | + "yellow": "\033[1;33m", |
| 13 | + "red": "\033[1;31m", |
| 14 | + "green": "\033[1;32m", |
| 15 | + "magenta": "\033[1;35m", |
| 16 | + "cyan": "\033[1;36m", |
| 17 | + "reset": "\033[0m" |
| 18 | +} |
| 19 | + |
| 20 | +# Emoji + color by topic |
| 21 | +TOPIC_META = { |
| 22 | + "alerts": {"emoji": "🚨", "style": STYLES["red"]}, |
| 23 | + "health": {"emoji": "💚", "style": STYLES["green"]}, |
| 24 | + "transactions": {"emoji": "💸", "style": STYLES["cyan"]}, |
| 25 | + "default": {"emoji": "📩", "style": STYLES["blue"]}, |
| 26 | +} |
| 27 | + |
| 28 | +# Colorlog setup |
| 29 | +handler = colorlog.StreamHandler() |
| 30 | +handler.setFormatter(colorlog.ColoredFormatter( |
| 31 | + "%(log_color)s%(message)s", |
| 32 | + log_colors={ |
| 33 | + "DEBUG": "cyan", |
| 34 | + "INFO": "white", |
| 35 | + "WARNING": "yellow", |
| 36 | + "ERROR": "red", |
| 37 | + "CRITICAL": "bold_red", |
| 38 | + } |
| 39 | +)) |
| 40 | +logger = colorlog.getLogger(__name__) |
| 41 | +logger.addHandler(handler) |
| 42 | +logger.setLevel(logging.INFO) |
| 43 | + |
| 44 | + |
| 45 | +async def consume(topic: str): |
| 46 | + bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092") |
| 47 | + group_id = f"test-group-{uuid.uuid4()}" |
| 48 | + |
| 49 | + if not bootstrap_servers: |
| 50 | + logger.error("KAFKA_BOOTSTRAP_SERVERS environment variable is not set.") |
| 51 | + return |
| 52 | + |
| 53 | + logger.info(f"🔌 KAFKA_BOOTSTRAP_SERVERS: {bootstrap_servers}") |
| 54 | + logger.info(f"📦 Kafka topic: {topic}") |
| 55 | + logger.info(f"👥 Consumer group: {group_id}") |
| 56 | + |
| 57 | + consumer = AIOKafkaConsumer( |
| 58 | + topic, |
| 59 | + bootstrap_servers=bootstrap_servers, |
| 60 | + group_id=group_id, |
| 61 | + auto_offset_reset='earliest' |
| 62 | + ) |
| 63 | + |
| 64 | + await consumer.start() |
| 65 | + try: |
| 66 | + logger.info(f"🧲 Subscribed to {topic}, waiting for messages...\n") |
| 67 | + async for msg in consumer: |
| 68 | + decoded = msg.value.decode('utf-8') |
| 69 | + partition = msg.partition |
| 70 | + topic_name = msg.topic |
| 71 | + |
| 72 | + # Determine style & emoji based on topic |
| 73 | + meta = TOPIC_META.get(topic_name, TOPIC_META["default"]) |
| 74 | + emoji = meta["emoji"] |
| 75 | + style = meta["style"] |
| 76 | + |
| 77 | + # Override style if content indicates severity |
| 78 | + lowered = decoded.lower() |
| 79 | + if "error" in lowered: |
| 80 | + style = STYLES["red"] |
| 81 | + elif "warn" in lowered: |
| 82 | + style = STYLES["yellow"] |
| 83 | + elif "success" in lowered or "ok" in lowered: |
| 84 | + style = STYLES["green"] |
| 85 | + |
| 86 | + # Print the styled message |
| 87 | + #logger.info(f"{style}{emoji} [{topic_name}-p{partition}]: {decoded}{STYLES['reset']}") |
| 88 | + message_style = STYLES["yellow"] |
| 89 | + logger.info(f"{style}{emoji} [{topic_name}-p{partition}]: {message_style}{decoded}{STYLES['reset']}") |
| 90 | + |
| 91 | + except KeyboardInterrupt: |
| 92 | + logger.info("👋 Consumer stopped by user.") |
| 93 | + finally: |
| 94 | + await consumer.stop() |
| 95 | + logger.info("🛑 Kafka consumer stopped.") |
| 96 | + |
| 97 | + |
| 98 | +if __name__ == "__main__": |
| 99 | + parser = argparse.ArgumentParser(description="Kafka Consumer") |
| 100 | + parser.add_argument( |
| 101 | + "--topic", |
| 102 | + type=str, |
| 103 | + default="test-topic", |
| 104 | + help="Kafka topic to consume messages from (default: test-topic)" |
| 105 | + ) |
| 106 | + args = parser.parse_args() |
| 107 | + |
| 108 | + try: |
| 109 | + asyncio.run(consume(args.topic)) |
| 110 | + except KeyboardInterrupt: |
| 111 | + print("\n👋 Consumer interrupted by user.") |
| 112 | + |
| 113 | + |
| 114 | +# HELP! My neighbor's favorite topic is pets, but he always gets my cats name wrong! |
| 115 | +# I've suggested he look on pk-kafka-kafka-bootstrap.pk-world.svc.cluster.local for my cats name |
| 116 | +# Think you can find my cats name? |
0 commit comments