Главная > параллельное, Программирование > Python: Работа с потоками. Часть 2.

Python: Работа с потоками. Часть 2.

Мы уже недавно рассматривали примитивную работу с потоками, а именно, запуск потоков и передача им параметров, использование замков и, по большому счёту, всё. Теперь пришло время изучить другие способы взаимодействия потоков в python'е.

Для лучшего понимания советую прочитать этот пост.

Решим ту же самую задачу, что и в предыдущем посте, но несколько изменив условия. Итак, вот:

  1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
  2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть - выводит результат, иначе, передаёт задания для 3го потока (путь) и по его завершении (задания, не потока) заново проверяет БД. Выводит результат.
  3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.

Только на этот раз у нас будет действительно один считающий поток, который не будет завершаться, а будет ждать заданий для вычисления.

Модуль для работы с базой данных не изменился (files.py), поменялся только task.py.

Вот логика работы приложения вкратце:

  1. В конструкторе класса объявляем: очередь заданий taskQueue, замок globalTaskEventLock, событие globalTaskEvent - событие, которое будет оповещать потоки о том, что хеширование какой-либо папки закончено, массив localTaskEvent - в нём будут хранится события, которые отвечают за полное выполнение задания какого-то конкретного "ищущего" потока, и counter - счётчик "ищущих" потоков (за всё время работы приложения).
  2. Метод runMainTask(s_hash, s_folder) - в параметрах передаётся искомый хеш и директория, в которой следует искать (при поиске в БД не учитывается). Запускает исполнение метода searchFile в отдельном потоке и увеличивает счётчик "ищущих" потоков на 1.
  3. searchFile(s_hash, s_folder) - аргументы те же, что и у метода runMainTask. Проверяет наличие в базе данных искомого хеша. Если совпадения найдены - выводит их в консоль и завершает работу, иначе добавляет указанную директорию и имя потока, в котором выполняется (сам метод) в очередь заданий, так же добавляет в массив localTaskEvents пару thName => event, где thName - имя потока, а event - событие, наступление которого означает полное выполнения задания (вся папка прохеширована). Затем ждёт в бесконечном цикле события globalTaskEvents (оповещает о том, что хеши для всех файлов в какой-то папке посчитаны и добавлены в БД). При наступлении события проверяет БД, если совпадения найдены - выводит результат, удаляет своё событие из массива localTaskEvents и завершается, если нет - проверяет было ли вычислено его задание, если да - выводит, что ничего не нашёл, опять удаляет свои данные из localTaskEvent и завершается.
  4. runHashTask() - исполняет код в бесконечном цикле. Получает имя потока и папку из очереди заданий. Проверяет в массиве localTaskEvents наличие события для имени потока, которое он получил. Если его нет - то поток уже завершился и удалил своё событие, тогда пытаемся получить следующее задание. Иначе, рекурсивно проходим по всем подпапкам и хешируем все файлы. После того как в какой-то подпапке файлы прохешированы - результат добавляется в БД и с помощью globalTaskEvent оповещаются об этом все потоки. Когда задание завершено - пытаемся оповестить поток, чьё задание это было. При переходе к каждой новой подпапке проверяем надо ли считать ещё это задание или лучше перейти к следующему.

Вот, в общем то, всё, таков мой план.

Листинг нового класса:

#!/usr/local/bin/ python
# -*- coding: utf-8 -*-

import threading
import md5
import sqlalchemy
import os, sys
import files
import Queue

class MainTask():
def __init__(self):
self.taskQueue = Queue.Queue() # очередь заданий для вычисляющего потока
self.globalTaskEventLock = threading.Lock()
self.globalTaskEvent = threading.Event() # оповещение о конце вычислений какого-то задания
self.localTaskEvents = {} # оповещение о конце вычислений задания какого-то конкретного потока
self.computingThread = threading.Thread(target=self.runHashTask, name="ComputerThread") # создаём "вычисляющий поток"
self.computingThread.start()
self.counter = 0

def runMainTask(self, s_hash, s_folder):
c = str(self.counter)
th1 = threading.Thread(target=self.searchFile, name="SearherThread #"+c, args=[s_hash,s_folder]) # создаём "ищущий" поток
print "[MainThread]: *** starting SearcherThread #%s ***" % c
th1.start()
self.counter += 1
return True

def searchFile(self, s_hash, s_folder):
thName = threading.currentThread().getName()
q = self.searchInDB(s_hash)
if q.count() > 0:
print "[%s]: Found %d files in database:" % (thName, q.count())
for f in q:
print "[%s]: %s" % (thName, f.path)
print "[%s]: *** thread finished ***" % thName
return True
print "[%s]: Files in database not found, start searching..." % thName

self.localTaskEvents[thName] = threading.Event()
self.taskQueue.put((thName, s_folder))

while True:
self.globalTaskEvent.wait()
print "[%s]: Checking database..." % thName
q = self.searchInDB(s_hash)
if q.count() > 0:
print "[%s]: Total found %d files:" % (thName, q.count())
for f in q:
print "[%s]: %s" % (thName, f.path)
self.localTaskEvents.pop(thName) # удаляем события для этого потока
print "[%s]: *** thread finished ***" % thName
return True
elif self.localTaskEvents[thName].isSet():
self.localTaskEvents.pop(thName)
print "[%s]: Total found 0 files." % thName
return True
# сделано чтобы после одного события не цикл не проходил несколько раз
self.globalTaskEventLock.acquire()
self.globalTaskEventLock.release()

def searchInDB(self, s_hash):
s = files.Session()
return s.query(files.Hash).filter(files.Hash.file_hash==s_hash)

def runHashTask(self):
computerThreadName = threading.currentThread().getName()
while True:
thName, s_folder = self.taskQueue.get()

# проверяем надо ли ещё считать это задание
if self.localTaskEvents.has_key(thName):
print "[%s]: Computing %s's thread task..." % (computerThreadName, thName)

s = files.Session()
for root, dirs, fnames in os.walk(s_folder):
# опять проверяем актуальность задания
if not self.localTaskEvents.has_key(thName):
break

counter = 0
hashes = {}

for name in fnames:
q = s.query(files.Hash).filter(files.Hash.path==unicode(os.path.join(root, name), "koi8-r"))
if q.count() == 0:
h = ''
try:
f = open(os.path.join(root, name), "rb")
except:
print "[%s]: Error: Can't open file %s." % (computerThreadName, os.path.join(root, name))
else:
h = md5.new(f.read()).hexdigest()
hashes[unicode(os.path.join(root, name), "koi8-r")] = h
counter += 1
try:
f.close()
except:
pass
print "[%s]: Hashing %s finished..." % (computerThreadName, root)
if counter != 0:
files.add_hashes(hashes)
self.globalTaskEventLock.acquire()
self.globalTaskEvent.set()
self.globalTaskEvent = threading.Event() # создаём новый экземпляр события
self.globalTaskEventLock.release()

# try - на случай, если поток уже завершился, удалив задание и событие
try:
self.localTaskEvents[thName].set()
except:
pass
# добавлено на случай, если мы ищем в папке,
# все файлы которой уже есть в БД,
# иначе globalEvent не наступит ни разу
self.globalTaskEventLock.acquire()
self.globalTaskEvent.set()
self.globalTaskEvent = threading.Event()
self.globalTaskEventLock.release()

if __name__ == "__main__":
mt = MainTask()

# для теста ищем всё в одной папке - 3 файла + 1 несуществующий
hashes = ("5f4dad3fbd8c6761f8ca218bd6d8e467", "500ead228d2ec9af9986a9f10ad13e84", "e1d0bced5b03d00b3d09710705eef3b5", "invalid hash", "2d3153b321a7ce6c2cd56af3231d79a4")
folder = "/usr/home/lizzard/music/music/Roll"

for h in hashes:
mt.runMainTask(h, folder)

# завершаем программу
while threading.activeCount() > 2:
pass
print "[MainThread]: *** all computes finished ***"
sys.exit(0)

Стоит отметить, что при преобразовании строк в юникод с помощью функции unicode стоит указать кодировку, специфичкескую для Вашей ОС (для windows - cp1251, freebsd - koi8-r).

Вот результат выполнения кода:

% python task_1.py
[MainThread]: *** starting SearcherThread #0 ***
[MainThread]: *** starting SearcherThread #1 ***
[SearherThread #0]: Files in database not found, start searching...
[ComputerThread]: Computing SearherThread #0's thread task...
[SearherThread #1]: Files in database not found, start searching...
[MainThread]: *** starting SearcherThread #2 ***
[SearherThread #2]: Files in database not found, start searching...
[MainThread]: *** starting SearcherThread #3 ***
[SearherThread #3]: Files in database not found, start searching...
[MainThread]: *** starting SearcherThread #4 ***
[SearherThread #4]: Files in database not found, start searching...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished...
[SearherThread #0]: Checking database...
[SearherThread #1]: Checking database...
[SearherThread #2]: Checking database...
[SearherThread #3]: Checking database...
[SearherThread #4]: Checking database...
[SearherThread #1]: Total found 1 files:
[SearherThread #2]: Total found 1 files:
[SearherThread #2]: /usr/home/lizzard/music/music/Roll/in_taberna.mp3
[SearherThread #2]: *** thread finished ***
[SearherThread #1]: /usr/home/lizzard/music/music/Roll/totusfloreo.mp3
[SearherThread #1]: *** thread finished ***
[SearherThread #0]: Total found 1 files:
[SearherThread #0]: /usr/home/lizzard/music/music/Roll/шалом.mp3
[SearherThread #0]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished...
[SearherThread #4]: Checking database...
[SearherThread #3]: Checking database...
[SearherThread #4]: Checking database...
[SearherThread #3]: Checking database...
[ComputerThread]: Computing SearherThread #3's thread task...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Hellawes, The_Mill finished...
[SearherThread #3]: Checking database...
[SearherThread #4]: Checking database...
...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Иллет finished...
[SearherThread #4]: Checking database...
[SearherThread #3]: Checking database...
[SearherThread #4]: Total found 1 files:
[SearherThread #4]: /usr/home/lizzard/music/music/Roll/Иллет/ilet_01.mp3
[SearherThread #4]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Крыс и Шмендра finished...
[SearherThread #3]: Checking database...
...
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Ю. Буркин finished...
[SearherThread #3]: Checking database...
[SearherThread #3]: Total found 0 files.
[MainThread]: *** all computes finished ***
%

Конечно, при желании, можно улучшить алгоритм, но, как пример для изучения потоков в python'е, имхо, сойдёт.

Исходники к статье в архиве - тут.

Пожалуйста, оцените полезность и качество данной статьи. Одна звезда - плохо, 5 - хорошо.
1/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.2/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.3/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.4/5.5/5. (Еще не оценили)
Загрузка...
  1. Пока что нет комментариев.
  1. Пока что нет уведомлений.