В данной статье будет рассмотрена реализация многопоточного приложения на Python’е. Сама задачка выглядит примерно так:
1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе, запускает 3й поток с такими же параметрами (путь и хеш) и по его завершении заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.
Главный поток у нас всегда один, вторых потоков может быть сколько угодно, а третьих (вычисляющих) — только один запущенный.
Для наглядности реализуем всё сначала без потоков. Создадим класс MainTask со следующими методами:
- runMainTask(self, sHash, sFolder)
- searchFile(self, s_hash, s_folder)
- runHashTask(self, s_folder, s_hash)
- runMainTask — в общем то почти ничего не делает ;-). Запускает метод searchFile с соответствующими параметрами.
searchFile — проверяет БД на наличие искомого хеша, если записи нашлись — выводит их и завершается. Если записи не найдены — вызывает метод runHashTask, затем опять проверяет БД.
runHashTask — рекурсивно (т.е. включая подкаталоги) обходит переданный через параметры путь и хеширует файлы. Если во время обхода встречается искомый хеш — добавляет результат вычислений (все вычисленные хеши для файлов) в БД и завершается. Если хеш не был найден — делает то же самое (обновляет БД и завершает работу) ;-).
Для хранения результатов вычисления была выбрана база данных, что-бы не использовать какой-либо сервер был выбран SQLite (СУБД работает без сервера на основе файлов).
Для нашего приложения нужна всего одна табличка с двумя полями — «path» и «file_hash». Саму таблицу назовём «hashes». Поле «path» будет содержать полный путь к файлу включая его имя, а «file_hash» — хеш этого файла (неожиданный поворот событий? ).
Так же сразу я решил описать функцию add_hashes. Получает в качестве параметра словарь вида [«file0″:»hash0», «file1″:»hash1», …] и пытается добавить его в БД.
Смотрим, что у нас получилось. Модуль для работы с базой данных (files.py):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | import sqlalchemy as sa from sqlalchemy.orm import mapper, sessionmaker engine = sa.create_engine("sqlite:///hashes.db", echo=False) metadata = sa.MetaData() hashes_table = sa.Table("hashes", metadata, sa.Column("path", sa.Text(convert_unicode=True), nullable=False, primary_key=True), sa.Column("file_hash", sa.String(32, convert_unicode=True), nullable=False), ) metadata.create_all(engine) class Hash(object): def __init__(self, path='', file_hash=''): self.path = path self.file_hash = file_hash def __repr__(self): return "" % (self.path, self.file_hash) def add_hashes(files): session = Session() counter = 0 try: for f in files: session.save_or_update(Hash(f, files[f])) counter += 1 session.commit() except: print "Error. Can't add hashes into database." finally: return counter mapper(Hash, hashes_table) Session = sessionmaker(bind = engine) |
Основной модуль (task_1.py):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | import md5 import os, sys import files class MainTask(): def __init__(self): self.sHash = "" self.sFolder = "" def runMainTask(self, sHash, sFolder): self.sHash = sHash self.sFolder = sFolder self.searchFile(self.sHash, self.sFolder) def searchFile(self, s_hash, s_folder): s = files.Session() q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "Found files in database:" for f in q: print f.path return True else: print "Files in database not found, searching..." self.runHashTask(s_folder, s_hash) q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "Total found %d files:" % q.count() for f in q: print f.path else: print "File with hash %s not found." % s_hash return True def runHashTask(self, s_folder, s_hash): hashes = {} s = files.Session() for root, dirs, fnames in os.walk(s_folder): for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name)) if q.count() == 0: h = '' try: f = open(os.path.join(root, name), "rb") except: print "Error: Can't open file %s." % os.path.join(root, name) else: h = md5.new(f.read()).hexdigest() hashes[os.path.join(root, name)] = h try: f.close() except: pass if h == s_hash: files.add_hashes(hashes) return True files.add_hashes(hashes) return True if __name__ == "__main__": mt = MainTask() while True: shash = unicode(raw_input("Enter file hash: "), "cp1251") sfolder = unicode(raw_input("Enter folder to search: "), "cp1251") if shash == "": raw_input("Press Enter to exit...") sys.exit(0) mt.runMainTask(shash, sfolder) |
Теперь попробуем сделать всё то же самое, но используя потоки. Модуль для работы с БД менять не надо, все изменения коснуться только основного модуля.
runMainTask теперь должен вызывать исполнение метода searchFile в отдельном потоке (так же, как и searchFile должен вызывать runHashTask). Перед вызовом третьего потока мы используем блокировку (threading.Lock()), что не позволяет одновременно нескольким вторым потокам запустить третий.
Способ использования блокировок:
1 2 3 4 5 6 7 8 9 10 11 12 13 | lock = threading.Lock() # Перед участком кода, который нежелательно чтобы выполнялся # несколькими потоками одновременно запираем замок: lock.acquire() # ... какой-то код # затем отпираем замок: lock.release() |
Можно так же использовать один и тот же экземпляр замка в разных местах, если надо чтобы эти участки кода не выполнялись одновременно.
Порождение дочернего потока происходит с помощью класса Thread в модуле threading.
Описание использованных параметров класса Thread:
target — указывает на имя функции (в данном случае метод класса), которую стоит выполнять в новом потоке
name — имя порождаемого потока
args — список аргументов, которые мы передаём в функцию, указанную в параметре target
Поток создаётся так:
1 | th = threading.Thread(...) |
Запускается на исполнение поток путём выозова метода start:
1 | th = threading.start() |
Метод join() ожидает завершения потока.
Ну, вроде бы теперь должно быть всё понятно. Посмотрим что у нас должно получить в итоге:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | # -*- coding: cp1251 -*- import threading import md5 import sqlalchemy import os, sys import files class MainTask(): def __init__(self): self.sHash = "" self.sFolder = "" self.__th2Lock = threading.Lock() def runMainTask(self, sHash, sFolder, c=0): c = str(c) self.sHash = sHash self.sFolder = sFolder #raw_input("1") th1 = threading.Thread(target=self.searchFile, name="th1_"+c, args=[self.sHash,self.sFolder,"th1_"+c]) #raw_input("1") print "[MainThred]: *** starting th1_%s ***" % c th1.start() def searchFile(self, s_hash, s_folder, thName): s = files.Session() q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "[%s]: Found %d files in database:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) print "[%s]: *** thread finished ***" % thName return True print "[%s]: Files in database not found, searching..." % thName ### START LOCK self.__th2Lock.acquire() th2 = threading.Thread(target=self.runHashTask, name="th2", args=[s_folder,s_hash]) print "[%s]: *** starting th2 ***" % thName th2.start() th2.join() print "[%s]: *** th2 finished ***" % thName self.__th2Lock.release() ### END LOCK q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "[%s]: Total found %d files:" % (thName, q.count()) for f in q: print f.path else: print "[%s]: File with hash %s not found." % (thName, s_hash) print "[%s]: *** thread finished ***" % thName return True def runHashTask(self, s_folder, s_hash): hashes = {} s = files.Session() for root, dirs, fnames in os.walk(s_folder): for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name)) if q.count() == 0: h = '' try: f = open(os.path.join(root, name), "rb") except: print "[th2]: Error: Can't open file %s." % os.path.join(root, name) else: h = md5.new(f.read()).hexdigest() hashes[os.path.join(root, name)] = h try: f.close() except: pass if h == s_hash: files.add_hashes(hashes) return True files.add_hashes(hashes) return True if __name__ == "__main__": mt = MainTask() counter = 0 while True: shash = unicode(raw_input("[MainThread]: Enter file hash: "), "cp1251") sfolder = raw_input("[MainThread]: Enter folder to search: ") if shash == '': raw_input("[MainThread]: Press Enter to exit...") sys.exit(0) if sfolder == '': sfolder = '.' sfolder = unicode(os.path.abspath(sfolder), "cp1251") mt.runMainTask(shash, sfolder, str(counter)) counter += 1 |
Вывод потоков может быть перемешан, но это сделано для наглядности.
Стоит так же отметить, что преобразование пути в кодировку utf8 вызвано проблемами с SQLAlchemy, как то он странно обрабатывает кодировку cp1251.
Если кто-то не знает как получить md5-хеш файла, вот пример:
1 2 3 4 5 6 7 8 9 | import md5 f = open("path_to_file", "rb") hash = md5.new(f.read()).hexdigest() f.close() print hash |
Блин, чувак, долго не мог понять, как эти гребаные замки использовать, поглядел листинг- а оно проще пареной репы
Кстати, скоро выложу вариант по-круче ;). Задачку делал для универа, как оказалось, не совсем то, что надо %).