Мы уже недавно рассматривали примитивную работу с потоками, а именно, запуск потоков и передача им параметров, использование замков и, по большому счёту, всё. Теперь пришло время изучить другие способы взаимодействия потоков в python’е.
Решим ту же самую задачу, что и в предыдущем посте, но несколько изменив условия. Итак, вот:
- Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
- Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе, передаёт задания для 3го потока (путь) и по его завершении (задания, не потока) заново проверяет БД. Выводит результат.
- 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.
Только на этот раз у нас будет действительно один считающий поток, который не будет завершаться, а будет ждать заданий для вычисления.
Модуль для работы с базой данных не изменился (files.py), поменялся только task.py.
Вот логика работы приложения вкратце:
- В конструкторе класса объявляем: очередь заданий taskQueue, замок globalTaskEventLock, событие globalTaskEvent — событие, которое будет оповещать потоки о том, что хеширование какой-либо папки закончено, массив localTaskEvent — в нём будут хранится события, которые отвечают за полное выполнение задания какого-то конкретного «ищущего» потока, и counter — счётчик «ищущих» потоков (за всё время работы приложения).
- Метод runMainTask(s_hash, s_folder) — в параметрах передаётся искомый хеш и директория, в которой следует искать (при поиске в БД не учитывается). Запускает исполнение метода searchFile в отдельном потоке и увеличивает счётчик «ищущих» потоков на 1.
- searchFile(s_hash, s_folder) — аргументы те же, что и у метода runMainTask. Проверяет наличие в базе данных искомого хеша. Если совпадения найдены — выводит их в консоль и завершает работу, иначе добавляет указанную директорию и имя потока, в котором выполняется (сам метод) в очередь заданий, так же добавляет в массив localTaskEvents пару thName => event, где thName — имя потока, а event — событие, наступление которого означает полное выполнения задания (вся папка прохеширована). Затем ждёт в бесконечном цикле события globalTaskEvents (оповещает о том, что хеши для всех файлов в какой-то папке посчитаны и добавлены в БД). При наступлении события проверяет БД, если совпадения найдены — выводит результат, удаляет своё событие из массива localTaskEvents и завершается, если нет — проверяет было ли вычислено его задание, если да — выводит, что ничего не нашёл, опять удаляет свои данные из localTaskEvent и завершается.
- runHashTask() — исполняет код в бесконечном цикле. Получает имя потока и папку из очереди заданий. Проверяет в массиве localTaskEvents наличие события для имени потока, которое он получил. Если его нет — то поток уже завершился и удалил своё событие, тогда пытаемся получить следующее задание. Иначе, рекурсивно проходим по всем подпапкам и хешируем все файлы. После того как в какой-то подпапке файлы прохешированы — результат добавляется в БД и с помощью globalTaskEvent оповещаются об этом все потоки. Когда задание завершено — пытаемся оповестить поток, чьё задание это было. При переходе к каждой новой подпапке проверяем надо ли считать ещё это задание или лучше перейти к следующему.
Вот, в общем то, всё, таков мой план.
Листинг нового класса:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | #!/usr/local/bin/ python # -*- coding: utf-8 -*- import threading import md5 import sqlalchemy import os, sys import files import Queue class MainTask(): def __init__(self): self.taskQueue = Queue.Queue() # очередь заданий для вычисляющего потока self.globalTaskEventLock = threading.Lock() self.globalTaskEvent = threading.Event() # оповещение о конце вычислений какого-то задания self.localTaskEvents = {} # оповещение о конце вычислений задания какого-то конкретного потока self.computingThread = threading.Thread(target=self.runHashTask, name="ComputerThread") # создаём "вычисляющий поток" self.computingThread.start() self.counter = 0 def runMainTask(self, s_hash, s_folder): c = str(self.counter) th1 = threading.Thread(target=self.searchFile, name="SearherThread #"+c, args=[s_hash,s_folder]) # создаём "ищущий" поток print "[MainThread]: *** starting SearcherThread #%s ***" % c th1.start() self.counter += 1 return True def searchFile(self, s_hash, s_folder): thName = threading.currentThread().getName() q = self.searchInDB(s_hash) if q.count() > 0: print "[%s]: Found %d files in database:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) print "[%s]: *** thread finished ***" % thName return True print "[%s]: Files in database not found, start searching…" % thName self.localTaskEvents[thName] = threading.Event() self.taskQueue.put((thName, s_folder)) while True: self.globalTaskEvent.wait() print "[%s]: Checking database…" % thName q = self.searchInDB(s_hash) if q.count() > 0: print "[%s]: Total found %d files:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) self.localTaskEvents.pop(thName) # удаляем события для этого потока print "[%s]: *** thread finished ***" % thName return True elif self.localTaskEvents[thName].isSet(): self.localTaskEvents.pop(thName) print "[%s]: Total found 0 files." % thName return True # сделано чтобы после одного события не цикл не проходил несколько раз self.globalTaskEventLock.acquire() self.globalTaskEventLock.release() def searchInDB(self, s_hash): s = files.Session() return s.query(files.Hash).filter(files.Hash.file_hash==s_hash) def runHashTask(self): computerThreadName = threading.currentThread().getName() while True: thName, s_folder = self.taskQueue.get() # проверяем надо ли ещё считать это задание if self.localTaskEvents.has_key(thName): print "[%s]: Computing %s’s thread task…" % (computerThreadName, thName) s = files.Session() for root, dirs, fnames in os.walk(s_folder): # опять проверяем актуальность задания if not self.localTaskEvents.has_key(thName): break counter = 0 hashes = {} for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==unicode(os.path.join(root, name), "koi8-r")) if q.count() == 0: h = » try: f = open(os.path.join(root, name), "rb") except: print "[%s]: Error: Can’t open file %s." % (computerThreadName, os.path.join(root, name)) else: h = md5.new(f.read()).hexdigest() hashes[unicode(os.path.join(root, name), "koi8-r")] = h counter += 1 try: f.close() except: pass print "[%s]: Hashing %s finished…" % (computerThreadName, root) if counter != 0: files.add_hashes(hashes) self.globalTaskEventLock.acquire() self.globalTaskEvent.set() self.globalTaskEvent = threading.Event() # создаём новый экземпляр события self.globalTaskEventLock.release() # try — на случай, если поток уже завершился, удалив задание и событие try: self.localTaskEvents[thName].set() except: pass # добавлено на случай, если мы ищем в папке, # все файлы которой уже есть в БД, # иначе globalEvent не наступит ни разу self.globalTaskEventLock.acquire() self.globalTaskEvent.set() self.globalTaskEvent = threading.Event() self.globalTaskEventLock.release() if __name__ == "__main__": mt = MainTask() # для теста ищем всё в одной папке — 3 файла + 1 несуществующий hashes = ("5f4dad3fbd8c6761f8ca218bd6d8e467", "500ead228d2ec9af9986a9f10ad13e84", "e1d0bced5b03d00b3d09710705eef3b5", "invalid hash", "2d3153b321a7ce6c2cd56af3231d79a4") folder = "/usr/home/lizzard/music/music/Roll" for h in hashes: mt.runMainTask(h, folder) # завершаем программу while threading.activeCount() > 2: pass print "[MainThread]: *** all computes finished ***" sys.exit(0) |
Стоит отметить, что при преобразовании строк в юникод с помощью функции unicode стоит указать кодировку, специфичкескую для Вашей ОС (для windows — cp1251, freebsd — koi8-r).
Вот результат выполнения кода:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | % python task_1.py [MainThread]: *** starting SearcherThread #0 *** [MainThread]: *** starting SearcherThread #1 *** [SearherThread #0]: Files in database not found, start searching… [ComputerThread]: Computing SearherThread #0′s thread task… [SearherThread #1]: Files in database not found, start searching… [MainThread]: *** starting SearcherThread #2 *** [SearherThread #2]: Files in database not found, start searching… [MainThread]: *** starting SearcherThread #3 *** [SearherThread #3]: Files in database not found, start searching… [MainThread]: *** starting SearcherThread #4 *** [SearherThread #4]: Files in database not found, start searching… [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished… [SearherThread #0]: Checking database… [SearherThread #1]: Checking database… [SearherThread #2]: Checking database… [SearherThread #3]: Checking database… [SearherThread #4]: Checking database… [SearherThread #1]: Total found 1 files: [SearherThread #2]: Total found 1 files: [SearherThread #2]: /usr/home/lizzard/music/music/Roll/in_taberna.mp3 [SearherThread #2]: *** thread finished *** [SearherThread #1]: /usr/home/lizzard/music/music/Roll/totusfloreo.mp3 [SearherThread #1]: *** thread finished *** [SearherThread #0]: Total found 1 files: [SearherThread #0]: /usr/home/lizzard/music/music/Roll/шалом.mp3 [SearherThread #0]: *** thread finished *** [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished… [SearherThread #4]: Checking database… [SearherThread #3]: Checking database… [SearherThread #4]: Checking database… [SearherThread #3]: Checking database… [ComputerThread]: Computing SearherThread #3′s thread task… [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll finished… [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Garmarna finished… [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Hellawes, The_Mill finished… [SearherThread #3]: Checking database… [SearherThread #4]: Checking database… … [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Иллет finished… [SearherThread #4]: Checking database… [SearherThread #3]: Checking database… [SearherThread #4]: Total found 1 files: [SearherThread #4]: /usr/home/lizzard/music/music/Roll/Иллет/ilet_01.mp3 [SearherThread #4]: *** thread finished *** [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Крыс и Шмендра finished… [SearherThread #3]: Checking database… … [ComputerThread]: Hashing /usr/home/lizzard/music/music/Roll/Ю. Буркин finished… [SearherThread #3]: Checking database… [SearherThread #3]: Total found 0 files. [MainThread]: *** all computes finished *** % |
Конечно, при желании, можно улучшить алгоритм, но, как пример для изучения потоков в python’е, имхо, сойдёт.