Блог Синявского
  • Разделы
  • Метки
  • Все статьи

Многопоточное программирование в Python

1

Напишем многопоточную программу на языке Python. Для того, чтобы избежать проблем связанных с GIL в Python есть модуль multiprocessing. Передача данных между процессами в котором осуществляется через очереди. Архитектура приложения с очередями имеет множество преимуществ одно из которых например горизонтальное масштабирование. Программа проверяет размер медиа файла на диске в папке. Данная программа использвуется для примера того, как использовать очередь и многопоточное выполнение.

# coding=utf8
import copy
import os
import re
import datetime
import time
import subprocess
import Queue
import multiprocessing

NUM_PROCESS = 16

REPORT = '/tmp/report_video_files.csv'
ROOT_DIR='/data/'

CAMERA_FILE_PATTERN = re.compile('^video-.*\.avi$')

class TestProcess(multiprocessing.Process):
    def __init__(self, dirs_queue, result_queue, camera_file_pattern, process_id):
        super(TestProcess, self).__init__()
        self.dirs = dirs_queue
        self.result = result_queue
        self.process_id = process_id
        self.camera_file_pattern = camera_file_pattern
        print 'pid test process %s create' % self.process_id

    def test_file(self, file_name):
        cmd = 'ffprobe -i %s -show_entries format=duration -v quiet -of csv="p=0"' % file_name
        try:
            output = subprocess.check_output(cmd, executable='/bin/bash', stderr=subprocess.STDOUT, shell=True)
        except subprocess.CalledProcessError, e:
            return 0.0
        else:
            try:
                result = float(output)
            except ValueError:
                result = 0.0
            return result

    def run(self):
        while True:
            try:
                dirs = self.dirs.get(True, 15)
            except Queue.Empty:
                break
            else:
                dir_name = dirs[0]
                start_time = time.time()
                dir_files = []
                for camera_file in os.listdir(dir_name):
                    if self.camera_file_pattern.match(camera_file):
                        dir_files.append(camera_file)

                if len(dir_files):
                    dir_files.sort()
                    for camera_file in dir_files:
                        duration = self.test_file(file_name)
                        self.result.put([file_name, duration, time.time()-start_time, self.process_id])
                else:
                    print '%s empty' % dir_name
                self.dirs.task_done()
        return


if __name__ == '__main__':
    dirs_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.JoinableQueue()

    for video_dir in os.listdir(ROOT_DIR):
        dirs_queue.put([video_dir])

    dirs_count = dirs_queue.qsize()

    processes = []
    for i in xrange(NUM_PROCESS):
        test_process = TestProcess(dirs_queue, result_queue, CAMERA_FILE_PATTERN, i)
        test_process.start()
        processes.append({'process': test_process, 'start_time': time.time(), 'pid': i})

    scanned = 0
    with open(REPORT, 'w') as report_file:
        while True:
            if not result_queue.empty():
                try:
                    video_result = result_queue.get(True, 15)
                except Queue.Empty:
                    pass
                else:
                    report_file.write((u'%s;%s;%s;\n' % (video_result[0], video_result[1], video_result[2])).encode('utf8'))
                    result_queue.task_done()

                    # поставим процессу текущее время, он не завис
                    for tp in processes:
                        if tp['pid'] == camera_result[3]:
                            tp['start_time'] = time.time()
                            break

            # перезапустим зависшие процессы
            all_not_alive = True
            for tp in processes:
                if tp['process'].is_alive():
                    all_not_alive = False
                    work_time = time.time()-tp['start_time']
                    if work_time > 1200:
                        tp['process'].terminate()
                        tp['process'].join()
                        if not (dirs_queue.empty() and result_queue.empty()):
                            tp['process'] = TestProcess(dirs_queue, result_queue, CAMERA_FILE_PATTERN, tp['pid'])
                            tp['start_time'] = time.time()
                            tp['process'].start()
                            print 'pid process %s restarted, work_time %d' % (tp['pid'], int(work_time))
                        else:
                            print 'pid process %s terminated, work_time %d' % (tp['pid'], int(work_time))

            # посчитаем процент выполнения
            if dirs_queue.empty() and result_queue.empty() and all_not_alive:
                print u'all queyes empty, processes are not alive'
                break
            else:
                current_scanned = dirs_count - dirs_queue.qsize()
                if scanned != current_scanned and current_scanned != dirs_count:
                    print u'scanned dirs %s of %s (%d %%)' % (current_scanned, dirs_count, int((current_scanned*100.0)/dirs_count))
                    scanned = current_scanned

            if dirs_queue.empty() and result_queue.empty():
                time.sleep(1)


  • ← сюда
  • туда →

comments powered by Disqus

Опубликовано

05.10.2016

Обновление

05.05.2022

Категории

python

Тэги

  • example 16
  • multiprocessing 1
  • python 30

Всегда на связи

  • Блог Синявского - Ничего не переносить на завтра, это тоже проблема с прокастинацией?
  • © Алексей Синявский, по лицензии CC BY-SA если не указано иное.
  • С использованием Pelican. Тема: Elegant от Talha Mansoor