- ##################################################################################################
- #Rabbitmq: Docbao Rabbitmq Client - Dang Hai Loc                                                 #
- #Function: Get crawled posts through RabbitMQ                                                    #
- ##################################################################################################
+ ######################################################################
+ # Rabbitmq: Docbao Rabbitmq Client - Dang Hai Loc                    #
+ # Function: Get crawled posts through RabbitMQ                       #
+ ######################################################################


- import os
import pika
- import sys
- import time
import pickle
- import numpy as np
- from time import sleep
- from random import randint, choice, choices
- from datetime import datetime, timedelta
- from scipy.sparse import csr_matrix, vstack
+ from scipy.sparse import vstack
from sklearn.metrics.pairwise import cosine_similarity

- from ._config import *
+ from . import _config
from .raw_post import RawPost
from .post_orm import Post
from .post_orm import create_session, load_pickle_data
@@ -30,11 +23,11 @@


def handle_post(new_posts):
-     """ Handle post:
-         Compute post_embedding,
-         Search nearest post candidate for each post base on post_embedding
-         Re-compute similarity_score for each candidate by Jaccard metric in compute_doc_similarity()
-         Save post to database and pickle file
+     """Handle post:
+     Compute post_embedding,
+     Search nearest post candidates for each post based on post_embedding,
+     Re-compute similarity_score for each candidate by Jaccard metric
+     in compute_doc_similarity(). Save post to database and pickle file
    """
    if len(new_posts) == 0:
        return
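
compute_doc_similarity() is referenced in the docstring above but does not appear in this diff. A minimal sketch of a Jaccard re-scorer over token sets, assuming plain whitespace tokenization (the project's real helper may tokenize and normalize the text differently):

def compute_doc_similarity(content_a, content_b):
    # Hypothetical stand-in: Jaccard similarity over the sets of unique tokens.
    tokens_a = set(content_a.lower().split())
    tokens_b = set(content_b.lower().split())
    if not tokens_a or not tokens_b:
        return 0.0
    return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
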
@@ -48,14 +41,14 @@ def handle_post(new_posts):
            post.embedd_vector = None

    new_posts = [post for post in new_posts if post.embedd_vector is not None]
-     old_posts = load_pickle_data(EMBEDDING_FILE)
+     old_posts = load_pickle_data(_config.EMBEDDING_FILE)
    logger.debug(f"OLD POSTS LENGTH: {len(old_posts)}")
    session.commit()

    # compute and search nearest post
    if len(old_posts) > 0 and len(new_posts) > 0:
-         old_ids = [post['id'] for post in old_posts]
-         old_vectors = vstack([post['vector'] for post in old_posts])
+         old_ids = [post["id"] for post in old_posts]
+         old_vectors = vstack([post["vector"] for post in old_posts])
        new_vectors = vstack([post.embedd_vector for post in new_posts])

        # sim_matrix[i,j] - similarity score of (new_posts[i], old_posts[j])
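
The sim_matrix itself is computed in lines hidden from this hunk, presumably with the cosine_similarity imported at the top of the file. A toy illustration of the shape convention noted in the comment above, using made-up 4-dimensional vectors:

import numpy as np
from scipy.sparse import csr_matrix, vstack
from sklearn.metrics.pairwise import cosine_similarity

new_vectors = vstack([csr_matrix(np.random.rand(1, 4)) for _ in range(2)])  # 2 new posts
old_vectors = vstack([csr_matrix(np.random.rand(1, 4)) for _ in range(3)])  # 3 old posts
sim_matrix = cosine_similarity(new_vectors, old_vectors)
assert sim_matrix.shape == (2, 3)  # sim_matrix[i, j] compares new_posts[i] with old_posts[j]
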
@@ -65,39 +58,33 @@ def handle_post(new_posts):

        for i, post in enumerate(new_posts):
            score_list = enumerate(list(sim_matrix[i]))
-             topK_score = sorted(
-                 score_list, key=lambda x: x[1], reverse=True)[:TOP_K]
-             similarity_info = []
+             topK_score = sorted(score_list,
+                                 key=lambda x: x[1],
+                                 reverse=True)[:_config.TOP_K]

            # get similarity score with compute_doc_similarity function
            for index, _ in topK_score:
                sim_id = old_ids[index]
                sim_post = session.query(Post).get(sim_id)
                if (sim_post is not None) and (post.url != sim_post.url):
-                     score = compute_doc_similarity(post.content, sim_post.content)
+                     score = compute_doc_similarity(post.content,
+                                                    sim_post.content)

                    # append similarity info to database
-                     if score > SAVE_THRESH:
-                         post.add_similar_info({
-                             "id": sim_id,
-                             "score": score,
-                             "url": sim_post.url
-                         })
-                         sim_post.add_similar_info({
-                             'id': post.id,
-                             'score': score,
-                             'url': post.url
-                         })
+                     if score > _config.SAVE_THRESH:
+                         post.add_similar_info(
+                             {"id": sim_id, "score": score, "url": sim_post.url}
+                         )
+                         sim_post.add_similar_info(
+                             {"id": post.id, "score": score, "url": post.url}
+                         )
        del sim_matrix

    # re-save all post embedding to pickle file
    for post in new_posts:
-         old_posts.append({
-             'id': post.id,
-             'vector': post.embedd_vector
-         })
+         old_posts.append({"id": post.id, "vector": post.embedd_vector})

-     f = open(EMBEDDING_FILE, 'wb+')
+     f = open(_config.EMBEDDING_FILE, "wb+")
    pickle.dump(old_posts, f)
    f.close()
    session.commit()
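
load_pickle_data() is imported from post_orm and is not shown in this diff; given the dump format above (a list of {"id", "vector"} dicts), a plausible sketch that tolerates a missing embedding file:

import os
import pickle


def load_pickle_data(path):
    # Hypothetical counterpart to the pickle.dump() above; the real helper lives in post_orm.
    if not os.path.exists(path):
        return []
    with open(path, "rb") as f:
        return pickle.load(f)
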
@@ -106,11 +93,14 @@ def handle_post(new_posts):

"""
HOW TO USE
-     This program will check repeatedly if there are new post in RabbitMQ queue. If there are new posts,
-     it will parse binary message into Post() object, and for each Post instance, call Post.push_to_database()
+     This program will check repeatedly if there are new posts in the RabbitMQ queue.
+     If there are new posts, it will parse each binary message into a Post() object
+     and, for each Post instance, call Post.push_to_database()
    to save it in database.
"""
-  def read_data_from_source(data_source='rabbitmq', save_raw_data=False):
+
+
+ def read_data_from_source(data_source="rabbitmq", save_raw_data=False):
    """
    Start a process that get data from RabbitMQ then push to database
    """
@@ -120,33 +110,34 @@ def read_data_from_source(data_source='rabbitmq', save_raw_data=False):
        posts = [RawPost(body).to_orm_post() for body in all_body]
        return posts

-     if data_source == 'csv_dataset':
-         posts = [fake_data() for i in range(MAX_POST)]
+     if data_source == "csv_dataset":
+         posts = [fake_data() for i in range(_config.MAX_POST)]
        return posts

    # connect to RabbitMQ
    # login

-     credentials = pika.PlainCredentials(USERNAME, PASSWORD)
-     parameters = pika.ConnectionParameters(HOST, PORT, '/', credentials)
+     credentials = pika.PlainCredentials(_config.USERNAME, _config.PASSWORD)
+     parameters = pika.ConnectionParameters(_config.HOST,
+                                            _config.PORT, "/",
+                                            credentials)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
-     queue_state = channel.queue_declare(POST_QUEUE, durable=True, passive=True)
-     channel.queue_bind(exchange=EXCHANGE, queue=POST_QUEUE)
+     queue_state = channel.queue_declare(_config.POST_QUEUE,
+                                         durable=True, passive=True)
+     channel.queue_bind(exchange=_config.EXCHANGE, queue=_config.POST_QUEUE)
    queue_length = queue_state.method.message_count
    logger.debug(f"QUEUE LENGTH: {queue_length}")

    # start get message
-     load_time = 0
    count_post = 0
-     raw_posts = []
    posts = []

-     while (queue_length >= 1 and count_post < MAX_POST):
+     while queue_length >= 1 and count_post < _config.MAX_POST:
        queue_length -= 1
        count_post += 1
-         _, _, body = channel.basic_get(POST_QUEUE, auto_ack=True)
+         _, _, body = channel.basic_get(_config.POST_QUEUE, auto_ack=True)
        if body is not None:
            # parse message into Post
            post = RawPost(body)
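
The _config module is referenced throughout but is not part of this diff. A hypothetical sketch of the settings it is expected to expose, with placeholder values only (the names are taken from their usage above):

# _config.py - placeholder values, not the project's real settings
HOST = "localhost"              # RabbitMQ host
PORT = 5672                     # RabbitMQ port
USERNAME = "guest"
PASSWORD = "guest"
EXCHANGE = "docbao_exchange"    # placeholder exchange name
POST_QUEUE = "crawled_posts"    # placeholder queue name

MAX_POST = 500                  # max messages consumed per run
TOP_K = 10                      # nearest-neighbour candidates per post
SAVE_THRESH = 0.5               # minimum Jaccard score worth storing
EMBEDDING_FILE = "post_embeddings.pkl"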