Напишем многопоточную программу на языке Python. Для того, чтобы избежать проблем связанных с GIL в Python есть модуль multiprocessing. Передача данных между процессами в котором осуществляется через очереди. Архитектура приложения с очередями имеет множество преимуществ одно из которых например горизонтальное масштабирование. Программа проверяет размер медиа файла на диске в папке. Данная программа использвуется для примера того, как использовать очередь и многопоточное выполнение.
# coding=utf8
import copy
import os
import re
import datetime
import time
import subprocess
import Queue
import multiprocessing
NUM_PROCESS = 16
REPORT = '/tmp/report_video_files.csv'
ROOT_DIR='/data/'
CAMERA_FILE_PATTERN = re.compile('^video-.*\.avi$')
class TestProcess(multiprocessing.Process):
def __init__(self, dirs_queue, result_queue, camera_file_pattern, process_id):
super(TestProcess, self).__init__()
self.dirs = dirs_queue
self.result = result_queue
self.process_id = process_id
self.camera_file_pattern = camera_file_pattern
print 'pid test process %s create' % self.process_id
def test_file(self, file_name):
cmd = 'ffprobe -i %s -show_entries format=duration -v quiet -of csv="p=0"' % file_name
try:
output = subprocess.check_output(cmd, executable='/bin/bash', stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError, e:
return 0.0
else:
try:
result = float(output)
except ValueError:
result = 0.0
return result
def run(self):
while True:
try:
dirs = self.dirs.get(True, 15)
except Queue.Empty:
break
else:
dir_name = dirs[0]
start_time = time.time()
dir_files = []
for camera_file in os.listdir(dir_name):
if self.camera_file_pattern.match(camera_file):
dir_files.append(camera_file)
if len(dir_files):
dir_files.sort()
for camera_file in dir_files:
duration = self.test_file(file_name)
self.result.put([file_name, duration, time.time()-start_time, self.process_id])
else:
print '%s empty' % dir_name
self.dirs.task_done()
return
if __name__ == '__main__':
dirs_queue = multiprocessing.JoinableQueue()
result_queue = multiprocessing.JoinableQueue()
for video_dir in os.listdir(ROOT_DIR):
dirs_queue.put([video_dir])
dirs_count = dirs_queue.qsize()
processes = []
for i in xrange(NUM_PROCESS):
test_process = TestProcess(dirs_queue, result_queue, CAMERA_FILE_PATTERN, i)
test_process.start()
processes.append({'process': test_process, 'start_time': time.time(), 'pid': i})
scanned = 0
with open(REPORT, 'w') as report_file:
while True:
if not result_queue.empty():
try:
video_result = result_queue.get(True, 15)
except Queue.Empty:
pass
else:
report_file.write((u'%s;%s;%s;\n' % (video_result[0], video_result[1], video_result[2])).encode('utf8'))
result_queue.task_done()
# поставим процессу текущее время, он не завис
for tp in processes:
if tp['pid'] == camera_result[3]:
tp['start_time'] = time.time()
break
# перезапустим зависшие процессы
all_not_alive = True
for tp in processes:
if tp['process'].is_alive():
all_not_alive = False
work_time = time.time()-tp['start_time']
if work_time > 1200:
tp['process'].terminate()
tp['process'].join()
if not (dirs_queue.empty() and result_queue.empty()):
tp['process'] = TestProcess(dirs_queue, result_queue, CAMERA_FILE_PATTERN, tp['pid'])
tp['start_time'] = time.time()
tp['process'].start()
print 'pid process %s restarted, work_time %d' % (tp['pid'], int(work_time))
else:
print 'pid process %s terminated, work_time %d' % (tp['pid'], int(work_time))
# посчитаем процент выполнения
if dirs_queue.empty() and result_queue.empty() and all_not_alive:
print u'all queyes empty, processes are not alive'
break
else:
current_scanned = dirs_count - dirs_queue.qsize()
if scanned != current_scanned and current_scanned != dirs_count:
print u'scanned dirs %s of %s (%d %%)' % (current_scanned, dirs_count, int((current_scanned*100.0)/dirs_count))
scanned = current_scanned
if dirs_queue.empty() and result_queue.empty():
time.sleep(1)
comments powered by Disqus