Python: Работа с потоками. Часть 2.
Мы уже недавно рассматривали примитивную работу с потоками, а именно — запуск потоков и передача им параметров, использование замков и, по большому счёту, всё. Теперь пришло время изучить другие способы взаимодействия потоков в python’е.
Для лучшего понимания советую прочитать этот пост.
Решим ту же самую задачу, что и в предыдущем посте, но несколько изменив условия. Итак, вот:
1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе, передаёт задания для 3го потока (путь) и по его завершении (задания, не потока ;-) ) заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.
Только на этот раз у нас будет действительно один считающий поток, который не будет завершаться, а будет ждать заданий для вычисления.
Модуль для работы с базой данных не изменился (files.py), поменялся только task.py.
Вот логика работы приложения вкратце:
1. В конструкторе класса объявляем: очередь заданий taskQueue, замок globalTaskEventLock, событие globalTaskEvent — событие, которое будет оповещать потоки о том, что хеширование какой-либо папки закончено, массив localTaskEvent — в нём будут хранится события, которые отвечают за полное выполнение задания какого-то конкретного «ищущего» потока, и counter — счётчик «ищущих» потоков (за всё время работы приложения).
2. Метод runMainTask(s_hash, s_folder) — в параметрах передаётся искомый хеш и директория, в которой следует искать (при поиске в БД не учитывается). Запускает исполнение метода searchFile в отдельном потоке и увеличивает счётчик «ищущих» потоков на 1.
3. searchFile(s_hash, s_folder) - аргументы те же, что и у метода runMainTask. Проверяет наличие в базе данных искомого хеша. Если совпадения найдены — выводит их в консоль и завершает работу, иначе добавляет указанную директорию и имя потока, в котором выполняется (сам метод) в очередь заданий, так же добавляет в массив localTaskEvents пару thName => event, где thName — имя потока, а event — событие, наступление которого означает полное выполнения задания (вся папка прохеширована). Затем ждёт второго пришествия в бесконечном цикле события globalTaskEvents (оповещает о том, что хеши для всех файлов в какой-то папке посчитаны и добавлены в БД). При наступлении события проверяет БД, если совпадения найдены — выводит результат, удаляет своё событие из массива localTaskEvents и завершается, если нет — проверяет было ли вычислено его задание, если да — выводит, что ничего не нашёл, опять удаляет свои данные из localTaskEvent и завершается.
3. runHashTask() — исполняет код в бесконечном цикле. Получает имя потока и папку из очереди заданий. Проверяет в массиве localTaskEvents наличие события для имени потока, которое он получил. Если его нет — то поток уже завершился и удалил своё событие, тогда пытаемся получить следующее задание. Иначе — рекурсивно проходим по всем подпапкам и хешируем все файлы. После того, как в какой-то подпапке файлы прохешированы — результат добавляется в БД и с помощью globalTaskEvent оповещаются об этом все потоки. Когда задание завершено — пытаемся оповестить поток, чьё задание это было. При переходе к каждой новой подпапке проверяем надо ли считать ещё это задание или лучше перейти к следующему.
Вот, в общем то всё, таков мой план ;-).
Листинг нового класса:
#!/usr/local/bin/ python # -*- coding: utf-8 -*- import threading import md5 import sqlalchemy import os, sys import files import Queue class MainTask(): def __init__(self): self.taskQueue = Queue.Queue() # очередь заданий для вычисляющего потока self.globalTaskEventLock = threading.Lock() self.globalTaskEvent = threading.Event() # оповещение о конце вычислений какого-то задания self.localTaskEvents = {} # оповещение о конце вычислений задания какого-то конкретного потока self.computingThread = threading.Thread(target=self.runHashTask, name="ComputerThread") # создаём "вычисляющий поток" self.computingThread.start() self.counter = 0 def runMainTask(self, s_hash, s_folder): c = str(self.counter) th1 = threading.Thread(target=self.searchFile, name="SearherThread #"+c, args=[s_hash,s_folder]) # создаём "ищущий" поток print "[MainThread]: *** starting SearcherThread #%s ***" % c th1.start() self.counter += 1 return True def searchFile(self, s_hash, s_folder): thName = threading.currentThread().getName() q = self.searchInDB(s_hash) if q.count() > 0: print "[%s]: Found %d files in database:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) print "[%s]: *** thread finished ***" % thName return True print "[%s]: Files in database not found, start searching..." % thName self.localTaskEvents[thName] = threading.Event() self.taskQueue.put((thName, s_folder)) while True: self.globalTaskEvent.wait() print "[%s]: Checking database..." % thName q = self.searchInDB(s_hash) if q.count() > 0: print "[%s]: Total found %d files:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) self.localTaskEvents.pop(thName) # удаляем события для этого потока print "[%s]: *** thread finished ***" % thName return True elif self.localTaskEvents[thName].isSet(): self.localTaskEvents.pop(thName) print "[%s]: Total found 0 files." % thName return True # сделано чтобы после одного события не цикл не проходил несколько раз self.globalTaskEventLock.acquire() self.globalTaskEventLock.release() def searchInDB(self, s_hash): s = files.Session() return s.query(files.Hash).filter(files.Hash.file_hash==s_hash) def runHashTask(self): computerThreadName = threading.currentThread().getName() while True: thName, s_folder = self.taskQueue.get() # проверяем надо ли ещё считать это задание if self.localTaskEvents.has_key(thName): print "[%s]: Computing %s's thread task..." % (computerThreadName, thName) s = files.Session() for root, dirs, fnames in os.walk(s_folder): # опять проверяем актуальность задания if not self.localTaskEvents.has_key(thName): break counter = 0 hashes = {} for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==unicode(os.path.join(root, name), "koi8-r")) if q.count() == 0: h = '' try: f = open(os.path.join(root, name), "rb") except: print "[%s]: Error: Can't open file %s." % (computerThreadName, os.path.join(root, name)) else: h = md5.new(f.read()).hexdigest() hashes[unicode(os.path.join(root, name), "koi8-r")] = h counter += 1 try: f.close() except: pass print "[%s]: Hashing %s finished..." % (computerThreadName, root) if counter != 0: files.add_hashes(hashes) self.globalTaskEventLock.acquire() self.globalTaskEvent.set() self.globalTaskEvent = threading.Event() # создаём новый экземпляр события self.globalTaskEventLock.release() # try - на случай, если поток уже завершился, удалив задание и событие try: self.localTaskEvents[thName].set() except: pass # добавлено на случай, если мы ищем в папке, # все файлы которой уже есть в БД, # иначе globalEvent не наступит ни разу self.globalTaskEventLock.acquire() self.globalTaskEvent.set() self.globalTaskEvent = threading.Event() self.globalTaskEventLock.release() if __name__ == "__main__": mt = MainTask() # для теста ищем всё в одной папке - 3 файла + 1 несуществующий hashes = ("5f4dad3fbd8c6761f8ca218bd6d8e467", "500ead228d2ec9af9986a9f10ad13e84", "e1d0bced5b03d00b3d09710705eef3b5", "invalid hash", "2d3153b321a7ce6c2cd56af3231d79a4") folder = "/usr/home/lizzard/music/music/Roll" for h in hashes: mt.runMainTask(h, folder) # завершаем программу while threading.activeCount() > 2: pass print "[MainThread]: *** all computes finished ***" sys.exit(0)
Стоит отметить, что при преобразовании строк в юникод с помощью функции unicode стоит указать кодировку, специфичкескую для Вашей ОС (для windows — cp1251, freebsd — koi8-r).
Вот результат выполнения кода:
[MainThread]: *** starting SearcherThread #0 ***
[MainThread]: *** starting SearcherThread #1 ***
[SearherThread #0]: Files in database not found, start searching…
[ComputerThread]: Computing SearherThread #0′s thread task…
[SearherThread #1]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #2 ***
[SearherThread #2]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #3 ***
[SearherThread #3]: Files in database not found, start searching…
[MainThread]: *** starting SearcherThread #4 ***
[SearherThread #4]: Files in database not found, start searching…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished…
[SearherThread #0]: Checking database…
[SearherThread #1]: Checking database…
[SearherThread #2]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…
[SearherThread #1]: Total found 1 files:
[SearherThread #2]: Total found 1 files:
[SearherThread #2]: /usr/home/lizzard/music/music/Roll/in_taberna.mp3
[SearherThread #2]: *** thread finished ***
[SearherThread #1]: /usr/home/lizzard/music/music/Roll/totusfloreo.mp3
[SearherThread #1]: *** thread finished ***
[SearherThread #0]: Total found 1 files:
[SearherThread #0]: /usr/home/lizzard/music/music/Roll/шалом.mp3
[SearherThread #0]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[ComputerThread]: Computing SearherThread #3′s thread task…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Hellawes, The_Mill finished…
[SearherThread #3]: Checking database…
[SearherThread #4]: Checking database…
…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Иллет finished…
[SearherThread #4]: Checking database…
[SearherThread #3]: Checking database…
[SearherThread #4]: Total found 1 files:
[SearherThread #4]: /usr/home/lizzard/music/music/Roll/Иллет/ilet_01.mp3
[SearherThread #4]: *** thread finished ***
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Крыс и Шмендра finished…
[SearherThread #3]: Checking database…
…
[ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Ю. Буркин finished…
[SearherThread #3]: Checking database…
[SearherThread #3]: Total found 0 files.
[MainThread]: *** all computes finished ***
%
Конечно, при желании можно улучшить алгоритм, но как пример для изучения потоков в python’е, имхо, сойдёт ;-).
Исходники к статье в архиве — тут.
Ссылки к статье:
http://docs.python.org/library/threading.html — официальная документация python по модулю threading (англ.).


здраствуйте! мне нужна программная реализация шифрование файла и папок. можете отправить исходники этой программы на Delphi/ заоанее спасибо.
@aidana
И Вам не болеть, но при чём тут многопоточность в питоне? ;) Дождитесь ответа в соответствующем посте, я Ваши комментарии видел, но, увы, помочь ничем не могу, delphi совсем не знаю :).