Рейтинг@Mail.ru

Python: Работа с потоками.


В данной статье будет рассмотрена реализация многопоточного приложения на Python’е. Сама задачка выглядит примерно так:
1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть — выводит результат, иначе — запускает 3й поток с такими же параметрами (путь и хеш) и по его завершении заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.

Главный поток у нас всегда один, вторых потоков может быть сколько угодно, а третьих (вычисляющих) — только один запущенный.

Для наглядности реализуем всё сначала без потоков. Создадим класс MainTask со следующими методами:

  • runMainTask(self, sHash, sFolder)
  • searchFile(self, s_hash, s_folder)
  • runHashTask(self, s_folder, s_hash)

runMainTask — в общем то почти ничего не делает ;-). Запускает метод searchFile с соответствующими параметрами.

searchFile — проверяет БД на наличие искомого хеша, если записи нашлись — выводит их и завершается. Если записи не найдены — вызывает метод runHashTask, затем опять проверяет БД.

runHashTask — рекурсивно (т.е. включая подкаталоги) обходит переданный через параметры путь и хеширует файлы. Если во время обхода встречается искомый хеш — добавляет результат вычислений (все вычисленные хеши для файлов) в БД и завершается. Если хеш не был найден — делает то же самое (обновляет БД и завершает работу) ;-).

Для хранения результатов вычисления была выбрана база данных, чтобы не использовать какой-либо сервер был выбран SQLite (СУБД работает без сервера на основе файлов).

Для нашего приложения нужна всего одна табличка с двумя полями — «path» и «file_hash». Саму таблицу назовём «hashes». Поле «path» будет содержать полный путь к файлу включая его имя, а «file_hash» — хеш этого файла (неожиданный поворот событий? ;-) ).

Так же сразу я решил описать функцию add_hashes. Получает в качестве параметра словарь вида ["file0":"hash0", "file1":"hash1", ...] и пытается добавить его в БД.

Смотрим, что у нас получилось. Модуль для работы с базой данных (files.py):

import sqlalchemy as sa from sqlalchemy.orm import mapper, sessionmaker engine = sa.create_engine("sqlite:///hashes.db", echo=False) metadata = sa.MetaData() hashes_table = sa.Table("hashes", metadata,                         sa.Column("path", sa.Text(convert_unicode=True), nullable=False, primary_key=True),                         sa.Column("file_hash", sa.String(32, convert_unicode=True), nullable=False),                        ) metadata.create_all(engine) class Hash(object):     def __init__(self, path='', file_hash=''):         self.path      = path         self.file_hash = file_hash     def __repr__(self):         return "" % (self.path, self.file_hash) def add_hashes(files):     session = Session()     counter = 0     try:         for f in files:             session.save_or_update(Hash(f, files[f]))             counter += 1         session.commit()     except:         print "Error. Can't add hashes into database."     finally:         return counter mapper(Hash, hashes_table) Session = sessionmaker(bind = engine)

Основной модуль (task_1.py):

import md5 import os, sys import files class MainTask():     def __init__(self):         self.sHash = ""         self.sFolder = ""     def runMainTask(self, sHash, sFolder):         self.sHash = sHash         self.sFolder = sFolder         self.searchFile(self.sHash, self.sFolder)     def searchFile(self, s_hash, s_folder):         s = files.Session()         q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)         if q.count() > 0:             print "Found files in database:"             for f in q:                 print f.path             return True         else:             print "Files in database not found, searching..."             self.runHashTask(s_folder, s_hash)         q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)         if q.count() > 0:             print "Total found %d files:" % q.count()             for f in q:                 print f.path         else:             print "File with hash %s not found." % s_hash         return True     def runHashTask(self, s_folder, s_hash):         hashes = {}         s = files.Session()         for root, dirs, fnames in os.walk(s_folder):             for name in fnames:                 q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name))                 if q.count() == 0:                     h = ''                     try:                         f = open(os.path.join(root, name), "rb")                     except:                         print "Error: Can't open file %s." % os.path.join(root, name)                     else:                         h = md5.new(f.read()).hexdigest()                         hashes[os.path.join(root, name)] = h                     try:                         f.close()                     except:                         pass                     if h == s_hash:                         files.add_hashes(hashes)                         return True         files.add_hashes(hashes)         return True if __name__ == "__main__":     mt = MainTask()     while True:         shash   = unicode(raw_input("Enter file hash: "), "cp1251")         sfolder = unicode(raw_input("Enter folder to search: "), "cp1251")         if shash == "":             raw_input("Press Enter to exit...")             sys.exit(0)         mt.runMainTask(shash, sfolder)

Теперь попробуем сделать всё то же самое, но используя потоки. Модуль для работы с БД менять не надо, все изменения коснутся только основного модуля.

runMainTask теперь должен вызывать исполнение метода searchFile в отдельном потоке (так же, как и searchFile должен вызывать runHashTask). Перед вызовом третьего потока мы используем блокировку (threading.Lock()), что не позволяет одновременно нескольким вторым потокам запустить третий.

Способ использования блокировок:

lock = threading.Lock()
# Перед участком кода, который нежелательно чтобы выполнялся
# несколькими потоками одновременно запираем замок:
lock.acquire()
# … какой-то код
# затем отпираем замок:
lock.release()

Можно так же использовать один и тот же экземпляр замка в разных местах, если надо чтобы эти участки кода не выполнялись одновременно.

Порождение дочернего потока происходит с помощью класса Thread в модуле threading.

Описание использованных параметров класса Thread:

  • target — указывает на имя функции (в данном случае метод класса), которую стоит выполнять в новом потоке
  • name — имя порождаемого потока
  • args — список аргументов, которые мы передаём в функцию, указанную в параметре target

Поток создаётся так:

th = threading.Thread()

Запускается на исполнение поток путём выозова метода start:

th = threading.start()

Метод join() ожидает завершения потока.

Ну, вроде бы теперь должно быть всё понятно. Посмотрим, что у нас должно получить в итоге:

# -*- coding: cp1251 -*- import threading import md5 import sqlalchemy import os, sys import files class MainTask():     def __init__(self):         self.sHash = ""         self.sFolder = ""         self.__th2Lock = threading.Lock()     def runMainTask(self, sHash, sFolder, c=0):         c = str(c)         self.sHash = sHash         self.sFolder = sFolder         #raw_input("1")         th1 = threading.Thread(target=self.searchFile, name="th1_"+c, args=[self.sHash,self.sFolder,"th1_"+c])         #raw_input("1")         print "[MainThred]: *** starting th1_%s ***" % c         th1.start()     def searchFile(self, s_hash, s_folder, thName):         s = files.Session()         q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)         if q.count() > 0:             print "[%s]: Found %d files in database:" % (thName, q.count())             for f in q:                 print "[%s]: %s" % (thName, f.path)             print "[%s]: *** thread finished ***" % thName             return True         print "[%s]: Files in database not found, searching..." % thName         ### START LOCK         self.__th2Lock.acquire()         th2 = threading.Thread(target=self.runHashTask, name="th2", args=[s_folder,s_hash])         print "[%s]: *** starting th2 ***" % thName         th2.start()         th2.join()         print "[%s]: *** th2 finished ***" % thName         self.__th2Lock.release()         ### END LOCK         q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)         if q.count() > 0:             print "[%s]: Total found %d files:" % (thName, q.count())             for f in q:                 print f.path         else:             print "[%s]: File with hash %s not found."  % (thName, s_hash)         print "[%s]: *** thread finished ***" % thName         return True     def runHashTask(self, s_folder, s_hash):         hashes = {}         s = files.Session()         for root, dirs, fnames in os.walk(s_folder):             for name in fnames:                 q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name))                 if q.count() == 0:                     h = ''                     try:                         f = open(os.path.join(root, name), "rb")                     except:                         print "[th2]: Error: Can't open file %s." % os.path.join(root, name)                     else:                         h = md5.new(f.read()).hexdigest()                         hashes[os.path.join(root, name)] = h                     try:                         f.close()                     except:                         pass                     if h == s_hash:                         files.add_hashes(hashes)                         return True         files.add_hashes(hashes)         return True if __name__ == "__main__":     mt = MainTask()     counter = 0     while True:         shash   = unicode(raw_input("[MainThread]: Enter file hash: "), "cp1251")         sfolder = raw_input("[MainThread]: Enter folder to search: ")         if shash == '':             raw_input("[MainThread]: Press Enter to exit...")             sys.exit(0)         if sfolder == '':             sfolder = '.'         sfolder = unicode(os.path.abspath(sfolder), "cp1251")         mt.runMainTask(shash, sfolder, str(counter))         counter += 1

Вывод потоков может быть перемешан, но это сделано для наглядности.

Стоит так же отметить, что преобразование пути в кодировку utf8 вызвано проблемами с SQLAlchemy, как то он странно обрабатывает кодировку cp1251.

Если кто-то не знает как получить md5-хеш файла, вот пример:

import md5 f = open("path_to_file", "rb") hash = md5.new(f.read()).hexdigest() f.close() print hash

Архив с исходниками тут. В архиве в корне — программа с использованием потоков, в папке line — без.

Ссылки к статье:
http://www.intuit.ru/department/pl/python/class/free/11/ — «Многопоточные вычисления».
http://keysolutions.ru/articles/osnovy-raboty-s-potokami-v-python — Сергей Шилов — «Основы работы с потоками в Python».

параллельное, Программирование , , ,

Пожалуйста, оцените полезность и качество данной статьи. Одна звезда - плохо, 5 - хорошо.
1 звезда2 звезды3 звезды4 звезды5 звёзд (2 голосов, средний: 5,00 из 5)
Loading ... Loading ...

  1. duh_seti
    16 Декабрь 2008 в 20:10 | #1

    Блин, чувак, долго не мог понять, как эти гребаные замки использовать, поглядел листинг- а оно проще пареной репы :)

  2. lizz
    16 Декабрь 2008 в 22:15 | #2

    Кстати, скоро выложу вариант по-круче ;). Задачку делал для универа, как оказалось, не совсем то, что надо %).

  1. 21 Декабрь 2008 в 01:15 | #1
*