-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproducer.py
More file actions
100 lines (82 loc) · 2.88 KB
/
producer.py
File metadata and controls
100 lines (82 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import sys
import asyncio
import argparse
import logging
import colorlog
from aiokafka import AIOKafkaProducer
from aioconsole import ainput # For non-blocking async input
# Set up colorful logging
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
"%(log_color)s%(message)s",
log_colors={
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "bold_red",
}
))
logger = colorlog.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
async def produce(topic: str):
logger.info("Starting Kafka Producer...")
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
if not bootstrap_servers:
logger.error("KAFKA_BOOTSTRAP_SERVERS environment variable is not set.")
return
logger.info(f"KAFKA_BOOTSTRAP_SERVERS: {bootstrap_servers}")
logger.info(f"Kafka topic: {topic}")
producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
try:
await producer.start()
except Exception as e:
logger.error(f"❌ Failed to start Kafka producer: {e}")
return
try:
logger.info("Type your message and press Enter (type 'exit' to quit):")
while True:
try:
message = await ainput("> ")
except (EOFError, KeyboardInterrupt):
logger.info("✋ Cancelled by user (Ctrl+C)")
break
message = message.strip()
if not message:
continue
if message.lower() == "exit":
break
try:
result = await producer.send_and_wait(topic, message.encode('utf-8'))
logger.info(f"✅ Delivered message to {result.topic}-{result.partition}@{result.offset}")
except Exception as e:
logger.error(f"❌ Delivery failed: {e}")
finally:
await producer.stop()
logger.info("Kafka producer stopped.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Kafka Producer")
parser.add_argument(
"--topic",
type=str,
default="test-topic",
help="Kafka topic to produce messages to (default: test-topic)",
)
args = parser.parse_args()
async def main():
try:
await produce(args.topic)
except asyncio.CancelledError:
logger.info("✋ Cancelled by user (async)")
except Exception as e:
logger.error(f"Unexpected error: {e}")
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 Producer stopped by user.")
sys.exit(0)
# HELP! My neighbor's favorite topic is pets, but he always gets my cats name wrong!
# I've suggested he look on pk-kafka-kafka-bootstrap.pk-world.svc.cluster.local for my cats name
# Think you can find my cats name?