Рейтинг@Mail.ru

Python: Работа с потоками. Часть 2.


Мы уже недавно рассматривали примитивную работу с потоками, а именно — запуск потоков и передача им параметров, использование замков и, по большому счёту, всё. Теперь пришло время изучить другие способы взаимодействия потоков в python’е.

Для лучшего понимания советую прочитать этот пост.

Решим ту же самую задачу, что и в предыдущем посте, но несколько изменив условия. Итак, вот:

1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе, передаёт задания для 3го потока (путь) и по его завершении (задания, не потока ;-) ) заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.

Только на этот раз у нас будет действительно один считающий поток, который не будет завершаться, а будет ждать заданий для вычисления.

Модуль для работы с базой данных не изменился (files.py), поменялся только task.py.

Вот логика работы приложения вкратце:

1. В конструкторе класса объявляем: очередь заданий taskQueue, замок globalTaskEventLock, событие globalTaskEvent — событие, которое будет оповещать потоки о том, что хеширование какой-либо папки закончено, массив localTaskEvent — в нём будут хранится события, которые отвечают за полное выполнение задания какого-то конкретного «ищущего» потока, и counter — счётчик «ищущих» потоков (за всё время работы приложения).

2. Метод runMainTask(s_hash, s_folder) — в параметрах передаётся искомый хеш и директория, в которой следует искать (при поиске в БД не учитывается). Запускает исполнение метода searchFile в отдельном потоке и увеличивает счётчик «ищущих» потоков на 1.

3. searchFile(s_hash, s_folder) - аргументы те же, что и у метода runMainTask. Проверяет наличие в базе данных искомого хеша. Если совпадения найдены — выводит их в консоль и завершает работу, иначе добавляет указанную директорию и имя потока, в котором выполняется (сам метод) в очередь заданий, так же добавляет в массив localTaskEvents пару thName => event, где thName — имя потока, а event — событие, наступление которого означает полное выполнения задания (вся папка прохеширована). Затем ждёт второго пришествия в бесконечном цикле события globalTaskEvents (оповещает о том, что хеши для всех файлов в какой-то папке посчитаны и добавлены в БД). При наступлении события проверяет БД, если совпадения найдены — выводит результат, удаляет своё событие из массива localTaskEvents и завершается, если нет — проверяет было ли вычислено его задание, если да — выводит, что ничего не нашёл, опять удаляет свои данные из localTaskEvent и завершается.

3. runHashTask() — исполняет код в бесконечном цикле. Получает имя потока и папку из очереди заданий. Проверяет в массиве localTaskEvents наличие события для имени потока, которое он получил. Если его нет — то поток уже завершился и удалил своё событие, тогда пытаемся получить следующее задание. Иначе — рекурсивно проходим по всем подпапкам и хешируем все файлы. После того, как в какой-то подпапке файлы прохешированы — результат добавляется в БД и с помощью globalTaskEvent оповещаются об этом все потоки. Когда задание завершено — пытаемся оповестить поток, чьё задание это было. При переходе к каждой новой подпапке проверяем надо ли считать ещё это задание или лучше перейти к следующему.

Вот, в общем то всё, таков мой план ;-).

Листинг нового класса:

#!/usr/local/bin/ python # -*- coding: utf-8 -*- import threading import md5 import sqlalchemy import os, sys import files import Queue class MainTask():     def __init__(self):         self.taskQueue = Queue.Queue() # очередь заданий для вычисляющего потока         self.globalTaskEventLock = threading.Lock()         self.globalTaskEvent = threading.Event() # оповещение о конце вычислений какого-то задания         self.localTaskEvents = {} # оповещение о конце вычислений задания какого-то конкретного потока         self.computingThread = threading.Thread(target=self.runHashTask, name="ComputerThread") # создаём "вычисляющий поток"         self.computingThread.start()         self.counter = 0     def runMainTask(self, s_hash, s_folder):         c = str(self.counter)         th1 = threading.Thread(target=self.searchFile, name="SearherThread #"+c, args=[s_hash,s_folder]) # создаём "ищущий" поток         print "[MainThread]: *** starting SearcherThread #%s ***" % c         th1.start()         self.counter += 1         return True     def searchFile(self, s_hash, s_folder):         thName = threading.currentThread().getName()         q = self.searchInDB(s_hash)         if q.count() > 0:             print "[%s]: Found %d files in database:" % (thName, q.count())             for f in q:                 print "[%s]: %s" % (thName, f.path)             print "[%s]: *** thread finished ***" % thName             return True         print "[%s]: Files in database not found, start searching..." % thName         self.localTaskEvents[thName] = threading.Event()         self.taskQueue.put((thName, s_folder))         while True:             self.globalTaskEvent.wait()             print "[%s]: Checking database..." % thName             q = self.searchInDB(s_hash)             if q.count() > 0:                 print "[%s]: Total found %d files:" % (thName, q.count())                 for f in q:                     print "[%s]: %s" % (thName, f.path)                 self.localTaskEvents.pop(thName) # удаляем события для этого потока                 print "[%s]: *** thread finished ***" % thName                 return True             elif self.localTaskEvents[thName].isSet():                 self.localTaskEvents.pop(thName)                 print "[%s]: Total found 0 files." % thName                 return True             # сделано чтобы после одного события не цикл не проходил несколько раз             self.globalTaskEventLock.acquire()             self.globalTaskEventLock.release()     def searchInDB(self, s_hash):         s = files.Session()         return s.query(files.Hash).filter(files.Hash.file_hash==s_hash)     def runHashTask(self):         computerThreadName = threading.currentThread().getName()         while True:             thName, s_folder = self.taskQueue.get()             # проверяем надо ли ещё считать это задание             if self.localTaskEvents.has_key(thName):                 print "[%s]: Computing %s's thread task..." % (computerThreadName, thName)                 s = files.Session()                 for root, dirs, fnames in os.walk(s_folder):                     # опять проверяем актуальность задания                     if not self.localTaskEvents.has_key(thName):                         break                     counter = 0                     hashes = {}                     for name in fnames:                         q = s.query(files.Hash).filter(files.Hash.path==unicode(os.path.join(root, name), "koi8-r"))                         if q.count() == 0:                             h = ''                             try:                                 f = open(os.path.join(root, name), "rb")                             except:                                 print "[%s]: Error: Can't open file %s." % (computerThreadName, os.path.join(root, name))                             else:                                 h = md5.new(f.read()).hexdigest()                                 hashes[unicode(os.path.join(root, name), "koi8-r")] = h                                 counter += 1                             try:                                 f.close()                             except:                                 pass                     print "[%s]: Hashing %s finished..." % (computerThreadName, root)                     if counter != 0:                         files.add_hashes(hashes)                         self.globalTaskEventLock.acquire()                         self.globalTaskEvent.set()                         self.globalTaskEvent = threading.Event() # создаём новый экземпляр события                         self.globalTaskEventLock.release()                 # try - на случай, если поток уже завершился, удалив задание и событие                 try:                     self.localTaskEvents[thName].set()                 except:                     pass                 # добавлено на случай, если мы ищем в папке,                 # все файлы которой уже есть в БД,                 # иначе globalEvent не наступит ни разу                 self.globalTaskEventLock.acquire()                 self.globalTaskEvent.set()                 self.globalTaskEvent = threading.Event()                 self.globalTaskEventLock.release() if __name__ == "__main__":     mt = MainTask()     # для теста ищем всё в одной папке - 3 файла + 1 несуществующий     hashes = ("5f4dad3fbd8c6761f8ca218bd6d8e467", "500ead228d2ec9af9986a9f10ad13e84", "e1d0bced5b03d00b3d09710705eef3b5", "invalid hash", "2d3153b321a7ce6c2cd56af3231d79a4")     folder = "/usr/home/lizzard/music/music/Roll"     for h in hashes:         mt.runMainTask(h, folder)     # завершаем программу     while threading.activeCount() > 2:         pass     print "[MainThread]: *** all computes finished ***"     sys.exit(0)

Стоит отметить, что при преобразовании строк в юникод с помощью функции unicode стоит указать кодировку, специфичкескую для Вашей ОС (для windows — cp1251, freebsd — koi8-r).

Вот результат выполнения кода:

% python task_1.py
[MainThread]: *** starting SearcherThread #0 ***
[MainThread]: *** starting SearcherThread #1 ***
[SearherThread #0]: Files in database not found, start searching…
[ComputerThread]: Computing SearherThread #0′s thread task…
[SearherThread #1]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #2 ***
[SearherThread #2]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #3 ***
[SearherThread #3]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #4 ***
[SearherThread #4]: Files in database not found, start searching…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished…
[SearherThread #0]: Checking database…
[SearherThread #1]: Checking database…
[SearherThread #2]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…
[SearherThread #1]: Total found 1 files:
[SearherThread #2]: Total found 1 files:
[SearherThread #2]: /usr/home/lizzard/music/music/Roll/in_taberna.mp3
[SearherThread #2]: *** thread finished ***
[SearherThread #1]: /usr/home/lizzard/music/music/Roll/totusfloreo.mp3
[SearherThread #1]: *** thread finished ***
[SearherThread #0]: Total found 1 files:
[SearherThread #0]: /usr/home/lizzard/music/music/Roll/шалом.mp3
[SearherThread #0]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[ComputerThread]: Computing SearherThread #3′s thread task…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Hellawes, The_Mill finished…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…

[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Иллет finished…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Total found 1 files:
[SearherThread #4]: /usr/home/lizzard/music/music/Roll/Иллет/ilet_01.mp3
[SearherThread #4]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Крыс и Шмендра finished…
[SearherThread #3]: Checking database…

[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Ю. Буркин finished…
[SearherThread #3]: Checking database…
[SearherThread #3]: Total found 0 files.
[MainThread]: *** all computes finished ***
%

Конечно, при желании можно улучшить алгоритм, но как пример для изучения потоков в python’е, имхо, сойдёт ;-).

Исходники к статье в архиве — тут.

Ссылки к статье:
http://docs.python.org/library/threading.html — официальная документация python по модулю threading (англ.).

параллельное, Программирование , , ,

Пожалуйста, оцените полезность и качество данной статьи. Одна звезда - плохо, 5 - хорошо.
1 звезда2 звезды3 звезды4 звезды5 звёзд (Еще не оценили)
Loading ... Loading ...

  1. aidana
    15 Апрель 2010 в 13:46 | #1

    здраствуйте! мне нужна программная реализация шифрование файла и папок. можете отправить исходники этой программы на Delphi/ заоанее спасибо.

  2. 17 Апрель 2010 в 12:21 | #2

    @aidana
    И Вам не болеть, но при чём тут многопоточность в питоне? ;) Дождитесь ответа в соответствующем посте, я Ваши комментарии видел, но, увы, помочь ничем не могу, delphi совсем не знаю :).

  1. Пока что нет уведомлений.
*