import time
import argparse
import os

import tensorflow as tf

CONTINUOUS_COLUMNS = ["I" + str(i) for i in range(1, 14)]  # 1-13 inclusive
CATEGORICAL_COLUMNS = ["C" + str(i) for i in range(1, 27)]  # 1-26 inclusive
LABEL_COLUMN = ["clicked"]
TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
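# Per-feature hash vocabulary sizes for the Criteo display-ads features. This
# export script does not consume HASH_BUCKET_SIZES itself; the categorical
# columns below use DeepRec's categorical_column_with_embedding, which grows
# its vocabulary dynamically instead of hashing into a fixed number of buckets.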
HASH_BUCKET_SIZES = {
    'C1': 2500,
    'C2': 2000,
    'C3': 5000000,
    'C4': 1500000,
    'C5': 1000,
    'C6': 100,
    'C7': 20000,
    'C8': 4000,
    'C9': 20,
    'C10': 100000,
    'C11': 10000,
    'C12': 5000000,
    'C13': 40000,
    'C14': 100,
    'C15': 100,
    'C16': 3000000,
    'C17': 50,
    'C18': 10000,
    'C19': 4000,
    'C20': 20,
    'C21': 4000000,
    'C22': 100,
    'C23': 100,
    'C24': 250000,
    'C25': 400,
    'C26': 100000
}


def add_layer_summary(value, tag):
    """Record the fraction of zeros and an activation histogram for a layer."""
    tf.summary.scalar('%s/fraction_of_zero_values' % tag,
                      tf.nn.zero_fraction(value))
    tf.summary.histogram('%s/activation' % tag, value)
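
# Note: these summaries only take effect when a training loop attaches a
# summary writer; the export path in main() below never evaluates them.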


def build_feature_cols():
    wide_column = []
    deep_column = []
    fm_column = []
    for column_name in FEATURE_COLUMNS:
        if column_name in CATEGORICAL_COLUMNS:
            # DeepRec API: backs the column with a dynamically sized
            # EmbeddingVariable rather than a fixed hash-bucket vocabulary.
            categorical_column = tf.feature_column.categorical_column_with_embedding(
                column_name, dtype=tf.string)

            categorical_embedding_column = tf.feature_column.embedding_column(
                categorical_column, dimension=16, combiner='mean')

            # The same 16-dim embedding feeds the wide, deep and FM branches
            wide_column.append(categorical_embedding_column)
            deep_column.append(categorical_embedding_column)
            fm_column.append(categorical_embedding_column)
        else:
            column = tf.feature_column.numeric_column(column_name, shape=(1,))
            wide_column.append(column)
            deep_column.append(column)

    return wide_column, fm_column, deep_column


class DeepFM():
    def __init__(self,
                 wide_column=None,
                 fm_column=None,
                 deep_column=None,
                 dnn_hidden_units=[1024, 256, 32],
                 final_hidden_units=[128, 64],
                 optimizer_type='adam',
                 learning_rate=0.001,
                 inputs=None,
                 use_bn=True,
                 bf16=False,
                 input_layer_partitioner=None,
                 dense_layer_partitioner=None):
        if not inputs:
            raise ValueError('Dataset is not defined.')
        self.wide_column = wide_column
        self.deep_column = deep_column
        self.fm_column = fm_column
        if not wide_column or not fm_column or not deep_column:
            raise ValueError(
                'Wide column, FM column or Deep column is not defined.')
        self.dnn_hidden_units = dnn_hidden_units
        self.final_hidden_units = final_hidden_units
        self.optimizer_type = optimizer_type
        self.learning_rate = learning_rate
        self.input_layer_partitioner = input_layer_partitioner
        self.dense_layer_partitioner = dense_layer_partitioner

        self.feature = inputs
        self.bf16 = bf16
        self.use_bn = use_bn

        # Build the forward graph eagerly so that self.output is available
        # for export right after construction.
        self.predict = self.prediction()

    def dnn(self, dnn_input, dnn_hidden_units=None, layer_name=''):
        for layer_id, num_hidden_units in enumerate(dnn_hidden_units):
            with tf.variable_scope(layer_name + "_%d" % layer_id,
                                   partitioner=self.dense_layer_partitioner,
                                   reuse=tf.AUTO_REUSE) as dnn_layer_scope:
                dnn_input = tf.layers.dense(dnn_input,
                                            units=num_hidden_units,
                                            activation=tf.nn.relu,
                                            name=dnn_layer_scope)
                if self.use_bn:
                    # No `training` flag is passed, so batch normalization
                    # runs with its inference-mode defaults here.
                    dnn_input = tf.layers.batch_normalization(dnn_input)
                add_layer_summary(dnn_input, dnn_layer_scope.name)

        return dnn_input

    def prediction(self):
        # Input features
        with tf.variable_scope('input_layer',
                               partitioner=self.input_layer_partitioner,
                               reuse=tf.AUTO_REUSE):
            fm_cols = {}
            wide_input = tf.feature_column.input_layer(
                self.feature, self.wide_column, cols_to_output_tensors=fm_cols)
            fm_input = tf.stack([fm_cols[cols] for cols in self.fm_column], 1)
            dnn_input = tf.feature_column.input_layer(self.feature,
                                                      self.deep_column)

        if self.bf16:
            wide_input = tf.cast(wide_input, dtype=tf.bfloat16)
            fm_input = tf.cast(fm_input, dtype=tf.bfloat16)
            dnn_input = tf.cast(dnn_input, dtype=tf.bfloat16)

        # DNN part. keep_weights() is DeepRec's BF16 scope helper: it keeps
        # the scope's variables in FP32 while computing in bfloat16.
        if self.bf16:
            with tf.variable_scope('dnn').keep_weights():
                dnn_output = self.dnn(dnn_input, self.dnn_hidden_units,
                                      'dnn_layer')
        else:
            with tf.variable_scope('dnn'):
                dnn_output = self.dnn(dnn_input, self.dnn_hidden_units,
                                      'dnn_layer')

        # Linear / first-order part
        with tf.variable_scope('linear', reuse=tf.AUTO_REUSE):
            linear_output = tf.reduce_sum(wide_input, axis=1, keepdims=True)

        # FM second-order part. Uses the identity
        #   sum_{i<j} v_i * v_j = 0.5 * ((sum_i v_i)^2 - sum_i v_i^2),
        # applied elementwise over the embedding dimension, so all pairwise
        # interactions are computed in linear time.
        with tf.variable_scope('fm', reuse=tf.AUTO_REUSE):
            sum_square = tf.square(tf.reduce_sum(fm_input, axis=1))
            square_sum = tf.reduce_sum(tf.square(fm_input), axis=1)
            fm_output = 0.5 * tf.subtract(sum_square, square_sum)

        # Final DNN over the concatenated DNN, linear and FM outputs
        all_input = tf.concat([dnn_output, linear_output, fm_output], 1)
        if self.bf16:
            with tf.variable_scope('final_dnn').keep_weights():
                net = self.dnn(all_input, self.final_hidden_units, 'final_dnn')
            net = tf.cast(net, dtype=tf.float32)
        else:
            with tf.variable_scope('final_dnn'):
                net = self.dnn(all_input, self.final_hidden_units, 'final_dnn')

        net = tf.layers.dense(net, units=1)
        net = tf.math.sigmoid(net)
        self.output = net
        return net
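
# A minimal inference sketch (hypothetical values; `feats` mirrors the
# placeholder dict built in main() below):
#
#   wide, fm, deep = build_feature_cols()
#   model = DeepFM(wide_column=wide, fm_column=fm, deep_column=deep,
#                  inputs=feats)
#   prob = sess.run(model.predict,
#                   feed_dict={feats["I1"]: [0.5], ..., feats["C26"]: [b"ab"]})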


def get_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_location',
                        help='Full path of train data',
                        required=False,
                        default='./data')
    parser.add_argument('--batch_size',
                        help='Batch size to train. Default is 512',
                        type=int,
                        default=512)
    parser.add_argument('--checkpoint',
                        help='Full path to checkpoints input/output directory',
                        required=False)
    parser.add_argument('--bf16',
                        help='Enable DeepRec BF16 in deep model. Default is FP32',
                        action='store_true')
    parser.add_argument("--optimizer",
                        type=str,
                        choices=["adam", "adagrad", "adamasync"],
                        default="adam")
    parser.add_argument('--learning_rate',
                        help='Learning rate for model',
                        type=float,
                        default=0.001)
    return parser
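
# Example invocation (the checkpoint path is a placeholder):
#   python <this_script>.py --checkpoint ./checkpoint_dir --bf16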


def main(tf_config=None, server=None):
    with tf.Session() as sess1:
        # Set a fixed random seed
        tf.set_random_seed(2021)

        # Create the feature columns
        wide_column, fm_column, deep_column = build_feature_cols()

        # Build one placeholder per input feature: 13 continuous (float32)
        # and 26 categorical (string)
        final_input = {}
        for i in range(1, 14):
            final_input["I" + str(i)] = tf.placeholder(tf.float32, [None],
                                                       name='I' + str(i))
        for j in range(1, 27):
            final_input["C" + str(j)] = tf.placeholder(tf.string, [None],
                                                       name='C' + str(j))

        # Create the model
        model = DeepFM(wide_column=wide_column,
                       fm_column=fm_column,
                       deep_column=deep_column,
                       optimizer_type=args.optimizer,
                       learning_rate=args.learning_rate,
                       bf16=args.bf16,
                       inputs=final_input,
                       input_layer_partitioner=None,
                       dense_layer_partitioner=None)

        # Initialize variables and restore weights from the latest checkpoint
        folder_dir = args.checkpoint
        saver = tf.train.Saver()
        sess1.run(tf.global_variables_initializer())
        sess1.run(tf.local_variables_initializer())
        saver.restore(sess1, tf.train.latest_checkpoint(folder_dir))

        # Export a SavedModel into a timestamped directory
        export_root = "./savedmodels"
        os.makedirs(export_root, exist_ok=True)
        cc_time = int(time.time())
        saved_path = os.path.join(export_root, str(cc_time))
        os.mkdir(saved_path)

        tf.saved_model.simple_save(sess1,
                                   saved_path,
                                   inputs=model.feature,
                                   outputs={"Sigmoid": model.output})
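
# The exported SavedModel can be inspected with TensorFlow's saved_model_cli:
#   saved_model_cli show --dir ./savedmodels/<timestamp> --all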


if __name__ == "__main__":
    parser = get_arg_parser()
    # args is read as a module-level global inside main()
    args = parser.parse_args()

    main()