Python: Работа с потоками.
В данной статье будет рассмотрена реализация многопоточного приложения на Python’е. Сама задачка выглядит примерно так:
1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе — запускает 3й поток с такими же параметрами (путь и хеш) и по его завершении заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.
Главный поток у нас всегда один, вторых потоков может быть сколько угодно, а третьих (вычисляющих) — только один запущенный.
Для наглядности реализуем всё сначала без потоков. Создадим класс MainTask со следующими методами:
- runMainTask(self, sHash, sFolder)
- searchFile(self, s_hash, s_folder)
- runHashTask(self, s_folder, s_hash)
runMainTask — в общем то почти ничего не делает ;-). Запускает метод searchFile с соответствующими параметрами.
searchFile — проверяет БД на наличие искомого хеша, если записи нашлись — выводит их и завершается. Если записи не найдены — вызывает метод runHashTask, затем опять проверяет БД.
runHashTask — рекурсивно (т.е. включая подкаталоги) обходит переданный через параметры путь и хеширует файлы. Если во время обхода встречается искомый хеш — добавляет результат вычислений (все вычисленные хеши для файлов) в БД и завершается. Если хеш не был найден — делает то же самое (обновляет БД и завершает работу) ;-).
Для хранения результатов вычисления была выбрана база данных, чтобы не использовать какой-либо сервер был выбран SQLite (СУБД работает без сервера на основе файлов).
Для нашего приложения нужна всего одна табличка с двумя полями — «path» и «file_hash». Саму таблицу назовём «hashes». Поле «path» будет содержать полный путь к файлу включая его имя, а «file_hash» — хеш этого файла (неожиданный поворот событий? ;-) ).
Так же сразу я решил описать функцию add_hashes. Получает в качестве параметра словарь вида ["file0":"hash0", "file1":"hash1", ...] и пытается добавить его в БД.
Смотрим, что у нас получилось. Модуль для работы с базой данных (files.py):
import sqlalchemy as sa from sqlalchemy.orm import mapper, sessionmaker engine = sa.create_engine("sqlite:///hashes.db", echo=False) metadata = sa.MetaData() hashes_table = sa.Table("hashes", metadata, sa.Column("path", sa.Text(convert_unicode=True), nullable=False, primary_key=True), sa.Column("file_hash", sa.String(32, convert_unicode=True), nullable=False), ) metadata.create_all(engine) class Hash(object): def __init__(self, path='', file_hash=''): self.path = path self.file_hash = file_hash def __repr__(self): return "" % (self.path, self.file_hash) def add_hashes(files): session = Session() counter = 0 try: for f in files: session.save_or_update(Hash(f, files[f])) counter += 1 session.commit() except: print "Error. Can't add hashes into database." finally: return counter mapper(Hash, hashes_table) Session = sessionmaker(bind = engine)
Основной модуль (task_1.py):
import md5 import os, sys import files class MainTask(): def __init__(self): self.sHash = "" self.sFolder = "" def runMainTask(self, sHash, sFolder): self.sHash = sHash self.sFolder = sFolder self.searchFile(self.sHash, self.sFolder) def searchFile(self, s_hash, s_folder): s = files.Session() q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "Found files in database:" for f in q: print f.path return True else: print "Files in database not found, searching..." self.runHashTask(s_folder, s_hash) q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "Total found %d files:" % q.count() for f in q: print f.path else: print "File with hash %s not found." % s_hash return True def runHashTask(self, s_folder, s_hash): hashes = {} s = files.Session() for root, dirs, fnames in os.walk(s_folder): for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name)) if q.count() == 0: h = '' try: f = open(os.path.join(root, name), "rb") except: print "Error: Can't open file %s." % os.path.join(root, name) else: h = md5.new(f.read()).hexdigest() hashes[os.path.join(root, name)] = h try: f.close() except: pass if h == s_hash: files.add_hashes(hashes) return True files.add_hashes(hashes) return True if __name__ == "__main__": mt = MainTask() while True: shash = unicode(raw_input("Enter file hash: "), "cp1251") sfolder = unicode(raw_input("Enter folder to search: "), "cp1251") if shash == "": raw_input("Press Enter to exit...") sys.exit(0) mt.runMainTask(shash, sfolder)
Теперь попробуем сделать всё то же самое, но используя потоки. Модуль для работы с БД менять не надо, все изменения коснутся только основного модуля.
runMainTask теперь должен вызывать исполнение метода searchFile в отдельном потоке (так же, как и searchFile должен вызывать runHashTask). Перед вызовом третьего потока мы используем блокировку (threading.Lock()), что не позволяет одновременно нескольким вторым потокам запустить третий.
Способ использования блокировок:
# Перед участком кода, который нежелательно чтобы выполнялся
# несколькими потоками одновременно запираем замок:
lock.acquire()
# … какой-то код
# затем отпираем замок:
lock.release()
Можно так же использовать один и тот же экземпляр замка в разных местах, если надо чтобы эти участки кода не выполнялись одновременно.
Порождение дочернего потока происходит с помощью класса Thread в модуле threading.
Описание использованных параметров класса Thread:
- target — указывает на имя функции (в данном случае метод класса), которую стоит выполнять в новом потоке
- name — имя порождаемого потока
- args — список аргументов, которые мы передаём в функцию, указанную в параметре target
Поток создаётся так:
Запускается на исполнение поток путём выозова метода start:
Метод join() ожидает завершения потока.
Ну, вроде бы теперь должно быть всё понятно. Посмотрим, что у нас должно получить в итоге:
# -*- coding: cp1251 -*- import threading import md5 import sqlalchemy import os, sys import files class MainTask(): def __init__(self): self.sHash = "" self.sFolder = "" self.__th2Lock = threading.Lock() def runMainTask(self, sHash, sFolder, c=0): c = str(c) self.sHash = sHash self.sFolder = sFolder #raw_input("1") th1 = threading.Thread(target=self.searchFile, name="th1_"+c, args=[self.sHash,self.sFolder,"th1_"+c]) #raw_input("1") print "[MainThred]: *** starting th1_%s ***" % c th1.start() def searchFile(self, s_hash, s_folder, thName): s = files.Session() q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "[%s]: Found %d files in database:" % (thName, q.count()) for f in q: print "[%s]: %s" % (thName, f.path) print "[%s]: *** thread finished ***" % thName return True print "[%s]: Files in database not found, searching..." % thName ### START LOCK self.__th2Lock.acquire() th2 = threading.Thread(target=self.runHashTask, name="th2", args=[s_folder,s_hash]) print "[%s]: *** starting th2 ***" % thName th2.start() th2.join() print "[%s]: *** th2 finished ***" % thName self.__th2Lock.release() ### END LOCK q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash) if q.count() > 0: print "[%s]: Total found %d files:" % (thName, q.count()) for f in q: print f.path else: print "[%s]: File with hash %s not found." % (thName, s_hash) print "[%s]: *** thread finished ***" % thName return True def runHashTask(self, s_folder, s_hash): hashes = {} s = files.Session() for root, dirs, fnames in os.walk(s_folder): for name in fnames: q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name)) if q.count() == 0: h = '' try: f = open(os.path.join(root, name), "rb") except: print "[th2]: Error: Can't open file %s." % os.path.join(root, name) else: h = md5.new(f.read()).hexdigest() hashes[os.path.join(root, name)] = h try: f.close() except: pass if h == s_hash: files.add_hashes(hashes) return True files.add_hashes(hashes) return True if __name__ == "__main__": mt = MainTask() counter = 0 while True: shash = unicode(raw_input("[MainThread]: Enter file hash: "), "cp1251") sfolder = raw_input("[MainThread]: Enter folder to search: ") if shash == '': raw_input("[MainThread]: Press Enter to exit...") sys.exit(0) if sfolder == '': sfolder = '.' sfolder = unicode(os.path.abspath(sfolder), "cp1251") mt.runMainTask(shash, sfolder, str(counter)) counter += 1
Вывод потоков может быть перемешан, но это сделано для наглядности.
Стоит так же отметить, что преобразование пути в кодировку utf8 вызвано проблемами с SQLAlchemy, как то он странно обрабатывает кодировку cp1251.
Если кто-то не знает как получить md5-хеш файла, вот пример:
import md5 f = open("path_to_file", "rb") hash = md5.new(f.read()).hexdigest() f.close() print hash
Архив с исходниками тут. В архиве в корне — программа с использованием потоков, в папке line — без.
Ссылки к статье:
http://www.intuit.ru/department/pl/python/class/free/11/ — «Многопоточные вычисления».
http://keysolutions.ru/articles/osnovy-raboty-s-potokami-v-python — Сергей Шилов — «Основы работы с потоками в Python».


Блин, чувак, долго не мог понять, как эти гребаные замки использовать, поглядел листинг- а оно проще пареной репы :)
Кстати, скоро выложу вариант по-круче ;). Задачку делал для универа, как оказалось, не совсем то, что надо %).