-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbinary_clas.py
More file actions
255 lines (203 loc) · 10 KB
/
binary_clas.py
File metadata and controls
255 lines (203 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
#tf.compat.v1.enable_eager_execution() #enable only for freeze_graph function
import numpy as np
from tensorflow import keras
import os, random
import argparse
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
from tensorflow.python.tools import freeze_graph
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.python.tools import optimize_for_inference_lib
from progressbar import ProgressBar
from tensorflow.python.platform import gfile
#import tensorflow.contrib.decent_q
#from tensorflow_model_optimization.quantization.keras import vitis_quantize
import matplotlib.pyplot as plt
def preprocess(directory_path='data/mrlEyes_2018_01'):
    """Build the training and test image generators.

    Expects the images to already be organised on disk as
    ``data/Train/{Open,Close}`` and ``data/Test/{Open,Close}``.  The one-off
    step that split the raw mrlEyes_2018_01 dump into that layout was already
    disabled (it lived, commented out, in this function's docstring) and has
    been removed; re-run the splitting offline if the directories are missing.

    Parameters
    ----------
    directory_path : str
        Path to the raw dataset.  Only consumed by the removed splitting
        step; kept so existing callers (``main`` passes ``args.image_dir``)
        keep working.

    Returns
    -------
    tuple
        ``(train_dataset, test_dataset)`` — Keras ``DirectoryIterator``s
        yielding 150x150 grayscale batches of 32 with binary labels.
    """
    # Augment only the training stream; the test stream is just rescaled
    # to [0, 1] so evaluation sees unperturbed images.
    train = ImageDataGenerator(rescale=1/255, fill_mode='reflect',
                               shear_range=0.2, width_shift_range=0.2,
                               height_shift_range=0.2)
    test = ImageDataGenerator(rescale=1/255)
    train_dataset = train.flow_from_directory(
        "data/Train/", target_size=(150, 150), batch_size=32,
        class_mode='binary', color_mode='grayscale')
    test_dataset = test.flow_from_directory(
        "data/Test/", target_size=(150, 150), batch_size=32,
        class_mode='binary', color_mode='grayscale')
    # Print the label mapping (e.g. {'Close': 0, 'Open': 1}) as a sanity check.
    print(test_dataset.class_indices)
    return train_dataset, test_dataset
def classifier_model(train_dataset, test_dataset):
    """Build the binary eye-state CNN.

    Four Conv/MaxPool stages feed two dense layers and a single sigmoid
    unit, so the output lies in (0, 1): 0 ~ Close, 1 ~ Open.  The datasets
    are accepted for interface compatibility but are not read here.

    Returns the uncompiled ``keras.Sequential`` model.
    """
    L = keras.layers
    return keras.Sequential([
        # Feature extractor: 32 -> 64 -> 128 -> 128 filters, each followed
        # by 2x2 max-pooling; input is a 150x150 single-channel image.
        L.Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 1)),
        L.MaxPool2D(2, 2),
        L.Conv2D(64, (3, 3), activation='relu'),
        L.MaxPool2D(2, 2),
        L.Conv2D(128, (3, 3), activation='relu'),
        L.MaxPool2D(2, 2),
        L.Conv2D(128, (3, 3), activation='relu'),
        L.MaxPool2D(2, 2),
        # Classifier head: flatten, two ReLU dense layers, sigmoid output.
        L.Flatten(),
        L.Dense(1024, activation='relu'),
        L.Dense(512, activation='relu'),
        L.Dense(1, activation='sigmoid'),
    ])
def train(train_dataset, test_dataset):
    """Compile, fit and persist the classifier.

    Trains for 10 epochs on ``train_dataset`` with ``test_dataset`` as the
    validation stream, prints the evaluation loss/accuracy, and saves the
    model (weights + architecture + optimizer state) to
    ``saved_model/classification_model.h5``.
    """
    model = classifier_model(train_dataset, test_dataset)
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy'])
    # BUG FIX: Model.fit accepts generators directly; fit_generator is
    # deprecated and removed in recent TF2 releases.
    model.fit(train_dataset,
              steps_per_epoch=train_dataset.samples // train_dataset.batch_size,
              epochs=10,
              validation_data=test_dataset,
              validation_steps=test_dataset.samples // test_dataset.batch_size)
    print(model.summary())
    scores = model.evaluate(test_dataset, batch_size=32)
    print('Loss: %.3f' % scores[0])
    print('Accuracy: %.3f' % scores[1])
    # Recreate the output directory, then save in HDF5 format.
    os.system('rm -rf saved_model')
    os.mkdir('saved_model')
    model.save('saved_model/classification_model.h5')
def freeze_graph(model, input_node):
    """Freeze a saved Keras model into a constant-only GraphDef.

    Loads the ``.h5`` file at *model*, folds all variables into constants
    via ``convert_variables_to_constants_v2``, prints the resulting ops and
    the graph inputs/outputs, and writes the binary protobuf to
    ``./saved_model/frozen_graph.pb``.

    NOTE(review): this function shadows ``freeze_graph`` imported from
    ``tensorflow.python.tools`` at the top of the file — confirm the
    module-level import is not needed elsewhere.
    """
    keras_model = keras.models.load_model(model)
    # Wrap the model in a tf.function and specialise it to the model's
    # input signature, naming the placeholder after *input_node*.
    wrapped = tf.function(lambda x: keras_model(x))
    concrete = wrapped.get_concrete_function(
        tf.TensorSpec(keras_model.inputs[0].shape,
                      keras_model.inputs[0].dtype,
                      name=input_node))
    # Fold variables into constants so the graph is self-contained.
    frozen = convert_variables_to_constants_v2(concrete)
    frozen.graph.as_graph_def()
    print("Frozen model layers: ")
    for operation in frozen.graph.get_operations():
        print(operation.name)
    print("Frozen model inputs: ")
    print(frozen.inputs)
    print("Frozen model outputs: ")
    print(frozen.outputs)
    # Serialise the frozen graph to disk as a binary .pb.
    tf.io.write_graph(graph_or_graph_def=frozen.graph,
                      logdir="./saved_model",
                      name="frozen_graph.pb",
                      as_text=False)
    return
def optimize_graph(input_nodes, output_nodes):
    """Run TF's inference-graph optimizer over the frozen graph.

    Reads the frozen GraphDef written by :func:`freeze_graph`, strips
    training-only nodes with ``optimize_for_inference``, and writes the
    optimized binary protobuf next to it.

    Parameters
    ----------
    input_nodes, output_nodes : list[str]
        Names of the graph's input and output nodes.
    """
    # BUG FIX: tf.GraphDef / tf.gfile were removed in TF2; use the
    # compat.v1 GraphDef and tf.io.gfile instead.
    input_graph = tf.compat.v1.GraphDef()
    # BUG FIX: read from ./saved_model, where freeze_graph() in this file
    # actually writes the frozen graph ('frozen_models/' is never created).
    with tf.io.gfile.GFile('saved_model/frozen_graph.pb', "rb") as f:
        input_graph.ParseFromString(f.read())
    output_graph = optimize_for_inference_lib.optimize_for_inference(
        input_graph,
        input_nodes,   # an array of the input node(s)
        output_nodes,  # an array of output nodes
        # NOTE(review): int32 placeholder dtype looks wrong for image input
        # (the model consumes float32) — confirm before relying on this.
        tf.int32.as_datatype_enum)
    # BUG FIX: a serialized protobuf is binary — write with 'wb' (the old
    # text-mode 'w' corrupts it) and close the handle via a context manager.
    with tf.io.gfile.GFile('saved_model/OptimizedGraph.pb', "wb") as f:
        f.write(output_graph.SerializeToString())
def evaluate_graph(graph, batch_size, test_dataset, input_node, output_node):
    """Measure accuracy of a frozen/quantized graph using a TF1-style session.

    Imports the GraphDef at *graph*, wires an accuracy metric between the
    graph's output tensor and a label placeholder, and runs one metric
    update inside a Session.

    NOTE(review): *batch_size* is accepted but never used, and the
    ProgressBar created below is never iterated — confirm both are
    intentional leftovers.
    """
    # Parse the serialized GraphDef from disk.
    input_graph_def = tf.Graph().as_graph_def()
    input_graph_def.ParseFromString(tf.io.gfile.GFile(graph, "rb").read())
    # Import into the default graph with no name prefix, so tensors can be
    # looked up by their original names below.
    tf.import_graph_def(input_graph_def,name = '')
    # Get input placeholders & tensors
    images_in = tf.compat.v1.get_default_graph().get_tensor_by_name(input_node)
    # One-hot labels over the two classes (Open/Close).
    labels = tf.compat.v1.placeholder(tf.int32,shape = [None,2])
    # get output tensors
    logits = tf.compat.v1.get_default_graph().get_tensor_by_name(output_node)
    predicted_logit = tf.argmax(input=logits, axis=1, output_type=tf.int32)
    ground_truth_label = tf.argmax(labels, 1, output_type=tf.int32)
    # Define the metric and update operations
    tf_metric, tf_metric_update = tf.compat.v1.metrics.accuracy(labels=ground_truth_label,
                                                 predictions=predicted_logit,
                                                 name='acc')
    with tf.compat.v1.Session() as sess:
        progress = ProgressBar()
        # The accuracy metric keeps running totals in local variables, so
        # both global and local initializers must run before the update op.
        sess.run(tf.compat.v1.initializers.global_variables())
        sess.run(tf.compat.v1.initializers.local_variables())
        # NOTE(review): this feeds the dataset/iterator object itself as the
        # placeholder value, and the labels placeholder is never fed (the
        # feed is commented out) — this likely needs a concrete numpy batch
        # and label array; verify before use.
        feed_dict={images_in: test_dataset} #, labels: y_batch}
        acc = sess.run(tf_metric_update, feed_dict)
        print ('Graph accuracy with validation dataset: {:1.4f}'.format(acc))
def test(model, test_path):
    """Classify one random image from *test_path* and display the verdict.

    Loads the saved model at *model* (.h5), picks a random file from
    *test_path*, runs a single prediction, and shows the image with an
    "Open"/"Close" caption via matplotlib.
    """
    model = keras.models.load_model(model)
    filename = random.choice(os.listdir(test_path))
    # BUG FIX: join with os.path.join instead of string concatenation,
    # which silently broke when test_path lacked a trailing '/'.
    path = os.path.join(test_path, filename)
    # BUG FIX: the network was trained on 1/255-rescaled GRAYSCALE input
    # (see preprocess); the original fed a raw 3-channel RGB image, which
    # mismatches the (150, 150, 1) input shape and the training scale.
    img = image.load_img(path, target_size=(150, 150), color_mode='grayscale')
    plt.imshow(img)
    arr = image.img_to_array(img) / 255.0
    X = np.expand_dims(arr, axis=0)
    prob = model.predict(X)[0][0]
    # BUG FIX: threshold the sigmoid output at 0.5. int() truncation mapped
    # every probability below 1.0 to 0, so almost everything read "Close".
    val = 1 if prob >= 0.5 else 0
    print('value = ', val)
    if val == 1:
        plt.xlabel("Open",fontsize=30)
    elif val == 0:
        plt.xlabel("Close",fontsize=30)
    plt.show()
def main():
    """Parse CLI arguments and run the preprocessing + training pipeline.

    The freeze/optimize/evaluate/test stages are kept available but
    disabled, matching the original script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--graph', type=str, default='./quantize_results/quantize_eval_model.pb', help='graph file (.pb) to be evaluated')
    parser.add_argument('--batch_size', type=int, default=32, help='Evaluation batchsize, must be integer value')
    # BUG FIX: the following four arguments were added on the undefined
    # name `ap`, which raised NameError before any work was done.
    parser.add_argument('-d', '--image_dir', type=str, default='data', help='Path to folder of images.')
    parser.add_argument('-m', '--model', type=str, default='saved_model/classification_model.h5', help='Path of the float model.')
    parser.add_argument('--input_nodes', type=str, default='', help='List of input nodes of the graph.')
    parser.add_argument('--output_nodes', type=str, default='', help='List of output nodes of the graph.')
    args = parser.parse_args()
    train_dataset, test_dataset = preprocess(args.image_dir)
    train(train_dataset, test_dataset)
    # Optional post-training stages, disabled as in the original:
    #freeze_graph(args.model, args.input_nodes)
    #optimize_graph(args.input_nodes, args.output_nodes)
    #evaluate_graph(args.graph, args.batch_size, test_dataset, args.input_node, args.output_node)
    #test(args.model, args.image_dir)
if __name__ == '__main__':
    main()