التعرف على الصور لحظيا

57

أدمغتنا تجعل من الرؤية مهمة سهلة. لا يتطلب الأمر أي جهد من البشر للتعرف على قطة وكلب أو حتى وجه إنسان. ولكن في الواقع هذه مشكلة بالنسبة لجهاز كمبيوتر. في السنوات القليلة الماضية، توصلت Deep Neural Networks (الشبكات العصبية العميقة) إلى نتائج مذهلة حول مشاكل التعرف على الأنماط، أحد المكونات الأساسية هو Convolutional Neural Networks (الشبكات العصبية التحويلية).

تدريب شبكة عصبية عميقة قادرة على تصنيف عدد كبير من الانماط المختلفة لمدة يومين كاملين غير عملي تمامًا، لذلك استخدمنا الإصدار v3 المدرب مسبقًا:

تدرب Inception-v3 عن طريق تطبيق ImageNet لتحدي التعٌرف البصري باستخدام البيانات من عام 2012. هذه مهمة اساسية في مجال computer vision (رؤية الكمبيوتر)، حيث تحاول النماذج تصنيف صور بأكملها إلى 1000 فئة مختلفة، مثل “الحمار الوحشي” و “سفينة الفضاء” و ” غسالة أطباق”. لم تكن جميعها أشياء شائعة تجدها في البيئة الحقيقية، وعادة ما يتم إعادة تدريب الطبقات الأخيرة، حيث ان الطبقات الأولى تحصل على ميزات شائعة جدًا مثل الحواف والخطوط فقط.

 

ماذا تحتاج؟

ملحوظة: إذا كان لديك GPU يتمتع ب CUDA Compute Capability 3.0 أو أعلى، فجرّب Tensorflow مع دعم GPU، أسرع بكثير، لكنك ستحتاج إلى تثبيت مجموعة أدوات CuDNN وCUDA. إذا لم يكن كذلك، اذهب إلى tensorflow بدون دعم GPU، بالتأكيد أسهل في التثبيت.

 

النموذج

في كل مرة نواجه فيها نظامًا معقدًا، يجب علينا تقسيمه إلى نماذج فرعية أبسط. فمثلاً:

  • استيراد الصورة من كاميرا الويب
  • تصنيف الصور
  • تحويل النص الى الكلام

سنقوم ببرمجة واختبار كل مكون من المكونات بشكل منفصل.

 

استيراد الصورة من كاميرا الويب

سوف نستخدم Opencv ببساطة:


import cv2

video_capture = cv2.VideoCapture(0)
while True:
    # Capture frame-by-frame
    ret, frame = video_capture.read()
    cv2.imshow('Video', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# When everything is done, cleanup
video_capture.release()
cv2.destroyAllWindows()

نحتاج فقط لاستدعاء ()cv2.VideoCapture، وقراءة الإطارات الواردة. ()read في السطر 6 هي عملية اعتراضية، لذا يتم إيقاف عمل النواه الرئيسية للبرنامج تمامًا حتى تتم قراءة الإطار من الكاميرا بالكامل.

هذه مشكلة، لأنه لا بد أن يستجيب نظامنا في الحظياً. يمكن أن نحسن FPS (عدد الإطارات في الثانية) كحل لهذه المشكلة ببساطة عن طريق إنشاء نواه جديدة لا تفعل شيئًا سوى سحب الإطارات جديدة من الكاميرا بينما تعالج النواه الرئيسية الإطارات الحالية:

# Threaded class for performance improvement
class VideoStream:
	
	def __init__(self, src=0):
		self.stream = cv2.VideoCapture(src)
		(self.grabbed, self.frame) = self.stream.read()
		self.stopped = False
		
	def start(self):
		Thread(target=self.update, args=()).start()
		return self

	def update(self):
                while True:
			if self.stopped:
				return
			(self.grabbed, self.frame) = self.stream.read()
	
	def read(self):
                # Return the latest frame
		return self.frame
	
	def stop(self):
		self.stopped = True

نعرّف وظيفة constructor (المُنشئ) من فئة VideoStream في السطر الرابع، وبتمرير المتغير scr لهذه الوظيفة “وذلك شيء اختياري”. إذا كان src عبارة عن عدد صحيح، فيفترض أن تكون كاميرا الويب / كاميرا USB تعمل جيداً على نظامك. للوصول إلى أحدث إطار تم التقاطه من الكاميرا، سنستخدم وظيفة ()read في السطور 19-21. وذلك يؤدي الى تحسن كبير في لأداء.

 

تصنيف الصور

بالنسبة لهذا الجزء، سنستعمل بعض ادوات Tensorflow التوضيحية لتصنيف الصور.

أولاً، يتعين علينا تنزيل النموذج الخاص بنا وإنشاء فئة NodeLookup للحصول في النهاية على كلمات مفهومة لوصف الصور.

class NodeLookup(object):
  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          model_dir, 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):

    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]


def create_graph():

  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')


def maybe_download_and_extract():
  # Download and extract model tar file
  dest_directory = model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)

كما ذكرت أعلاه، ما بين السطور 1-53 نقوم بأنشاء فئة NodeLookup، التي ستقوم بمعالجة التسميات وتعطي كلمات مفهومة لوصف الصور لكل نتيجة في تصنيف. تقوم الدالة ()create_graph بتحميل الرسم البياني الخاص بtensorflow الذي سنستخدمه من الذاكرة.

وتقوم الدالة ()maybe_download_and_extract بالتحقق مما إذا كان النموذج موجودًا في النظام الخاص بك ام لا.

عنوان URL للنموذج هو:

http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz

والمتغير “model_dir” هو المكان الذي تريد حفظ النموذج فيه (مثل ‘/tmp/imagenet’).

 

تحويل النص الى كلام

في البداية، فكرنا في بناء النموذج، باستخدام شبكة عصبية مثل WaveNet. ستدرك أن هذا غير ممكن لكمبيوتر مكتبي التطبيق والاستجابة الفورية. لذلك  باستخدام تطبيقات Google TTS (تحويل النص إلى كلام) Gtts، وحفظ كل نتيجة منه في جدول البحث، حتى نتمكن من الحصول على أداء أفضل واستجابة لحظية. ولإنتاج الملف الصوتي، استخدمنا pygame.

from gtts import gTTS
import pygame

tts = gTTS(text='Hello', lang='en')
tts.save("hello.mp3")
pygame.mixer.init()
pygame.mixer.music.load("hello.mp3")
pygame.mixer.music.play()
while pygame.mixer.music.get_busy() == True:
    continue

 

وضع كل شيء معا

# Download and create graph
maybe_download_and_extract()
create_graph()

# Variables declarations
frame_count=0
score=0
start = time.time()
pygame.mixer.init()
pred=0
last=0
human_str=None
font=cv2.FONT_HERSHEY_TRIPLEX
font_color=(255,255,255)

# Init video stream
vs = VideoStream(src=0).start()

# Start tensroflow session
with tf.Session() as sess:
        softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')

        while True:
                frame = vs.read()
                frame_count+=1

                # Only run every 5 frames
                if frame_count%5==0:

                        # Save the image as the fist layer of inception is a DecodeJpeg
                        cv2.imwrite("current_frame.jpg",frame)

                        image_data = tf.gfile.FastGFile("./current_frame.jpg", 'rb').read()
                        predictions=sess.run(softmax_tensor,{'DecodeJpeg/contents:0':image_data})

                        predictions = np.squeeze(predictions)
                        node_lookup = NodeLookup()

                        # change n_pred for more predictions
                        n_pred=1
                        top_k = predictions.argsort()[-n_pred:][::-1]
                        for node_id in top_k:
                                human_str_n = node_lookup.id_to_string(node_id)
                                score = predictions[node_id]
                        if score>.5:
                                # Some manual corrections
                                if human_str_n=="stethoscope":human_str_n="Headphones"
                                if human_str_n=="spatula":human_str_n="fork"
                                if human_str_n=="iPod":human_str_n="iPhone"
                                human_str=human_str_n

                                lst=human_str.split()
                                human_str=" ".join(lst[0:2])
                                human_str_filename=str(lst[0])

                        current= time.time()
                        fps=frame_count/(current-start)

                # Speech module        
                if last>40 and not pygame.mixer.music.get_busy() and human_str==human_str_n:
                        pred+=1
                        name=human_str_filename+".mp3"

                        # Only get from google if we dont have it
                        if not os.path.isfile(name):
                                tts = gTTS(text="I see a "+human_str, lang='en')
                                tts.save(name)

                        last=0
                        pygame.mixer.music.load(name)
                        pygame.mixer.music.play()

                # Show info during some time              
                if last<40 and frame_count>10:
                        # Change the text position depending on your camera resolution
                        cv2.putText(frame,human_str, (20,400),font, 1, font_color)
                        cv2.putText(frame,str(np.round(score,2))+"%",(20,440),font,1,font_color)

                if frame_count>20:
                        fps_text="fps: "+str(np.round(fps,2))
                        cv2.putText(frame, fps_text, (460,460), font, 1, font_color)

                cv2.imshow("Frame", frame)
                last+=1


                # if the 'q' key is pressed, stop the loop
                if cv2.waitKey(1) & 0xFF == ord("q"):break

# cleanup everything
vs.stop()
cv2.destroyAllWindows()     
sess.close()
print("Done")

في السطرين 2 و3 هي مجرد استدعاء للوظائف التي اعددناها في السابق.

من السطر 6 إلى 14، نقوم بتعريف بعض المتغيرات، كالنتيجة واللون والخط للنص على الشاشة. السطر 17 يبدأ باستيراد الإطارات من الكاميرا.

في السطر 20، يبدا Tensorflow بالعمل ثم يتم تحميل النموذج.

نقوم بحساب عدد الإطارات فقط للحصول على قيمة FPS (ليس لاننا نقوم بمعالجة الصورة عند تجميع 5 إطارات).

في السطر 45، قمنا بتعيين الحد الأدنى للنجاح 50٪، وقمنا بتغيير بعض الاسماء (مثل iPhone لأجهزة iPad، نوع من الغش، أعرف ذلك).

من السطر 60 إلى 71 يتم استخدام وحدة تحويل النص إلى الكلام. نحفظ الناتج كملف صوتي fist-word-of-category.mp3. إذا كانت موجودة في المجلد، فسنقوم بتشغيلها فقط. إذا لم يكن، فسنستخدم Google API للحصول عليه وحفظه.

ثم نعرض بعض المعلومات في الشاشة مثل الناتج النهائي وFPS. إذا قمت بالضغط على “q”، فسيتم إغلاق كل شيء ويوقف البرنامج.

 

النتائج

النتائج مذهلة. على GPU GeForce 960M من Nvidia مع GB12 من ذاكرة الوصول العشوائي حصلنا على 16 إطارًا في الثانية وتصنيفات شديدة الدقة.

يجدر بك أيضا سماع الملف الصوتي.

يمكنك تحميل النسخة النهائية من الكود هنا.

 

المصدر(Real-time image recognition and speech)

تعليقات