1+ import kafka .errors
2+ from kafka import KafkaConsumer , KafkaProducer
3+ import threading
4+ import uuid
5+ import queue
6+ import json
7+ import math
8+ import time
9+
10+
11+
class ProducerThread(threading.Thread):
    """Background thread that publishes JSON-serialised messages to Kafka.

    Messages are handed over with :meth:`send`, buffered in a small bounded
    queue and published one at a time by :meth:`run`, which waits for the
    broker acknowledgement of each message before taking the next one.
    """

    # Upper bound for a single produce request: 15 MiB.
    MAX_REQUEST_SIZE = 15 * 1024 * 1024

    def __init__(self, url, date=None, nb_try_connection=10):
        """Connect to Kafka and prepare the outgoing queue.

        Args:
            url: Passed to ``KafkaProducer`` as ``bootstrap_servers``.
            date: Stored on the instance unchanged; not read by this class.
            nb_try_connection: Maximum number of connection attempts.
        """
        super().__init__()

        self.date = date
        self.nb_try_connection = nb_try_connection

        # init_producer_ok is False (and producer is None) when every
        # connection attempt failed.
        self.init_producer_ok, self.producer = self.init_producer(url)
        # Bounded queue: send() blocks once two messages are pending.
        self.queue_sender = queue.Queue(2)

        self.finished = False

        print('[ProducerThread] Kafka Producer has been initiated...')

    def init_producer(self, url):
        """Try to create a ``KafkaProducer``, retrying while no broker answers.

        Args:
            url: Passed to ``KafkaProducer`` as ``bootstrap_servers``.

        Returns:
            ``(True, producer)`` on success, or ``(False, None)`` after
            ``nb_try_connection`` failed attempts (one second apart).
        """
        count = 0
        while count < self.nb_try_connection:
            try:
                producer = KafkaProducer(
                    bootstrap_servers=url,
                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                    max_request_size=self.MAX_REQUEST_SIZE,
                )
            except kafka.errors.NoBrokersAvailable:
                print(
                    "[ProducerThread] Impossible to connect with brokers, waiting kafka launch or check if kafka exist.")
                time.sleep(1)
                count += 1
            else:
                print('[ProducerThread] Connection with broker : OK')
                return True, producer

        return False, None

    def run(self):
        """Publish queued messages until :meth:`stop` is called.

        Returns immediately when no producer could be created. A message
        that cannot be sent is reported and dropped so that the thread keeps
        draining the queue; otherwise callers of :meth:`send` would block
        forever on the bounded queue.
        """
        if self.producer is None:
            print("[ProducerThread][run] No producer available, nothing can be sent.")
            return

        while not self.finished:
            # The 1 s timeout bounds how long stop() takes to be noticed.
            try:
                msg = self.queue_sender.get(timeout=1)
            except queue.Empty:
                continue

            topic = msg['topic']
            try:
                # The value_serializer JSON-encodes the payload, so dicts are
                # passed through as-is (no manual json.dumps needed here).
                future = self.producer.send(topic, msg['data'])
                # Block until the broker acknowledges (or 10 s elapse).
                future.get(timeout=10)
            except (kafka.errors.KafkaError, TypeError, ValueError) as error:
                # KafkaError: broker/timeout failure; TypeError/ValueError:
                # payload that json.dumps cannot serialise.
                print("[ProducerThread][run] Failed to send message to '{}': {!r}".format(topic, error))

        print("[ProducerThread][run] Finish.")

    def send(self, topic, data):
        """Queue *data* for publication on *topic*.

        Blocks while the internal queue is full.

        Args:
            topic: Destination topic name.
            data: JSON-serialisable payload.
        """
        self.queue_sender.put({"topic": topic, "data": data})

    def stop(self):
        """Ask :meth:`run` to exit after its current iteration."""
        self.finished = True
81+
82+
class ConsumerThread(threading.Thread):
    """Background thread that polls Kafka topics and dispatches to callbacks.

    Register one callback per topic with :meth:`add_topics` before starting
    the thread: :meth:`run` subscribes once, at start-up, to the topics known
    at that time.
    """

    # Fetch / socket receive buffer size: 15 MiB.
    MAX_FETCH_BYTES = 15 * 1024 * 1024

    def __init__(self, url, date=None, nb_try_connection=10):
        """Connect to Kafka and initialise the (empty) topic registry.

        Args:
            url: Passed to ``KafkaConsumer`` as ``bootstrap_servers``.
            date: Stored on the instance unchanged; not read by this class.
            nb_try_connection: Maximum number of connection attempts.
        """
        super().__init__()

        self.date = date
        self.nb_try_connection = nb_try_connection

        self.callback = {}  # topic name -> callable(topic, data)
        self.topics = []
        self.finished = False

        # Initialised empty; not read or written anywhere else in this class.
        self.message_split = {}

        # init_consumer_ok is False (and consumer is None) when every
        # connection attempt failed.
        self.init_consumer_ok, self.consumer = self.init_consumer(url)

        print('[ConsumerThread] Kafka Consumer has been initiated...')

    def init_consumer(self, url):
        """Try to create a ``KafkaConsumer``, retrying while no broker answers.

        Args:
            url: Passed to ``KafkaConsumer`` as ``bootstrap_servers``.

        Returns:
            ``(True, consumer)`` on success, or ``(False, None)`` after
            ``nb_try_connection`` failed attempts (one second apart).
        """
        count = 0
        while count < self.nb_try_connection:
            try:
                consumer = KafkaConsumer(
                    bootstrap_servers=url,
                    max_partition_fetch_bytes=self.MAX_FETCH_BYTES,
                    receive_buffer_bytes=self.MAX_FETCH_BYTES,
                    enable_auto_commit=False,
                    auto_offset_reset='latest',
                )
            except kafka.errors.NoBrokersAvailable:
                print(
                    "[ConsumerThread] Impossible to connect with brokers, waiting kafka launch or check if kafka exist.")
                time.sleep(1)
                count += 1
            else:
                print('[ConsumerThread] Connection with broker : OK')
                return True, consumer

        return False, None

    def add_topics(self, topic, callback):
        """Register *callback* for *topic*.

        Registering the same topic again replaces its callback without
        adding a duplicate entry to the subscription list.

        Args:
            topic: Topic name to subscribe to.
            callback: Called as ``callback(topic, data)`` for each message.
        """
        self.callback[topic] = callback
        if topic not in self.topics:
            self.topics.append(topic)

    def run(self):
        """Poll the subscribed topics until :meth:`stop` is called.

        Does nothing when no topic was registered or when no consumer could
        be created. Otherwise the consumer is always closed on exit, even if
        subscribing, polling or a callback raises.
        """
        if not self.topics:
            return

        if self.consumer is None:
            print("[ConsumerThread][run] No consumer available, nothing can be received.")
            return

        try:
            self.consumer.subscribe(self.topics)

            while not self.finished:
                # poll() returns a {TopicPartition: [records]} mapping, which
                # is empty when nothing arrived within the timeout.
                records = self.consumer.poll(timeout_ms=100, max_records=200)
                for topic_partition, messages in records.items():
                    for message in messages:
                        self._dispatch(topic_partition.topic, message)

            print("[ConsumerThread][run] Finish.")
        finally:
            self.consumer.close()
            print("[ConsumerThread][run] Close.")

    def _dispatch(self, topic, message):
        """Decode one record and pass it to the callback registered for *topic*.

        The payload is decoded as UTF-8 and parsed as JSON when possible;
        text that is not valid JSON is delivered as a plain string. Records
        without a value or with invalid UTF-8 are reported and skipped.
        """
        raw = message.value
        if raw is None:
            print("[ConsumerThread][run] Skipping message without value on '{}'.".format(topic))
            return

        try:
            data = raw.decode('UTF-8')
        except UnicodeDecodeError as error:
            print("[ConsumerThread][run] Skipping undecodable message on '{}': {!r}".format(topic, error))
            return

        print(data)
        try:
            data = json.loads(data)
        except json.JSONDecodeError:
            # Not JSON: hand the raw text to the callback.
            pass

        self.callback[topic](topic, data)

    def stop(self):
        """Ask :meth:`run` to exit after its current poll."""
        self.finished = True
0 commit comments