Главная > параллельное, Программирование > Python: Работа с потоками.

Python: Работа с потоками.

В данной статье будет рассмотрена реализация многопоточного приложения на Python'е. Сама задачка выглядит примерно так:
1. Главный поток запрашивает у пользователя md5-сумму какого-либо файла и директорию для поиска файла с такой же суммой. Получив эти параметры первый поток запускает 2й и продолжает свою работу (опять ожидает данных от пользователя).
2. Запущенный поток проверяет есть ли в БД приложения файл с таким хешом. Если есть - выводит результат, иначе, запускает 3й поток с такими же параметрами (путь и хеш) и по его завершении заново проверяет БД. Выводит результат.
3. 3й поток хеширует все файлы с помощью md5 по указанному пути и результат записывает в базу данных.

Главный поток у нас всегда один, вторых потоков может быть сколько угодно, а третьих (вычисляющих) - только один запущенный.

Для наглядности реализуем всё сначала без потоков. Создадим класс MainTask со следующими методами:

  • runMainTask(self, sHash, sFolder)
  • searchFile(self, s_hash, s_folder)
  • runHashTask(self, s_folder, s_hash)

runMainTask - в общем то почти ничего не делает ;-). Запускает метод searchFile с соответствующими параметрами.

searchFile - проверяет БД на наличие искомого хеша, если записи нашлись - выводит их и завершается. Если записи не найдены - вызывает метод runHashTask, затем опять проверяет БД.

runHashTask - рекурсивно (т.е. включая подкаталоги) обходит переданный через параметры путь и хеширует файлы. Если во время обхода встречается искомый хеш - добавляет результат вычислений (все вычисленные хеши для файлов) в БД и завершается. Если хеш не был найден - делает то же самое (обновляет БД и завершает работу) ;-).

Для хранения результатов вычисления была выбрана база данных, что-бы не использовать какой-либо сервер был выбран SQLite (СУБД работает без сервера на основе файлов).

Для нашего приложения нужна всего одна табличка с двумя полями - "path" и "file_hash". Саму таблицу назовём "hashes". Поле "path" будет содержать полный путь к файлу включая его имя, а "file_hash" - хеш этого файла (неожиданный поворот событий? ;-) ).

Так же сразу я решил описать функцию add_hashes. Получает в качестве параметра словарь вида ["file0":"hash0", "file1":"hash1", ...] и пытается добавить его в БД.

Смотрим, что у нас получилось. Модуль для работы с базой данных (files.py):

import sqlalchemy as sa
from sqlalchemy.orm import mapper, sessionmaker

engine = sa.create_engine("sqlite:///hashes.db", echo=False)
metadata = sa.MetaData()

hashes_table = sa.Table("hashes", metadata,
sa.Column("path", sa.Text(convert_unicode=True), nullable=False, primary_key=True),
sa.Column("file_hash", sa.String(32, convert_unicode=True), nullable=False),
)

metadata.create_all(engine)

class Hash(object):
def __init__(self, path='', file_hash=''):
self.path = path
self.file_hash = file_hash
def __repr__(self):
return "" % (self.path, self.file_hash)

def add_hashes(files):
session = Session()
counter = 0
try:
for f in files:
session.save_or_update(Hash(f, files[f]))
counter += 1
session.commit()
except:
print "Error. Can't add hashes into database."
finally:
return counter

mapper(Hash, hashes_table)
Session = sessionmaker(bind = engine)

Основной модуль (task_1.py):

import md5
import os, sys
import files

class MainTask():
def __init__(self):
self.sHash = ""
self.sFolder = ""

def runMainTask(self, sHash, sFolder):
self.sHash = sHash
self.sFolder = sFolder
self.searchFile(self.sHash, self.sFolder)

def searchFile(self, s_hash, s_folder):
s = files.Session()
q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)
if q.count() > 0:
print "Found files in database:"
for f in q:
print f.path
return True
else:
print "Files in database not found, searching..."
self.runHashTask(s_folder, s_hash)
q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)
if q.count() > 0:
print "Total found %d files:" % q.count()
for f in q:
print f.path
else:
print "File with hash %s not found." % s_hash
return True

def runHashTask(self, s_folder, s_hash):
hashes = {}
s = files.Session()
for root, dirs, fnames in os.walk(s_folder):
for name in fnames:
q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name))
if q.count() == 0:
h = ''
try:
f = open(os.path.join(root, name), "rb")
except:
print "Error: Can't open file %s." % os.path.join(root, name)
else:
h = md5.new(f.read()).hexdigest()
hashes[os.path.join(root, name)] = h
try:
f.close()
except:
pass
if h == s_hash:
files.add_hashes(hashes)
return True
files.add_hashes(hashes)
return True

if __name__ == "__main__":
mt = MainTask()
while True:
shash = unicode(raw_input("Enter file hash: "), "cp1251")
sfolder = unicode(raw_input("Enter folder to search: "), "cp1251")
if shash == "":
raw_input("Press Enter to exit...")
sys.exit(0)
mt.runMainTask(shash, sfolder)

Теперь попробуем сделать всё то же самое, но используя потоки. Модуль для работы с БД менять не надо, все изменения коснуться только основного модуля.

runMainTask теперь должен вызывать исполнение метода searchFile в отдельном потоке (так же, как и searchFile должен вызывать runHashTask). Перед вызовом третьего потока мы используем блокировку (threading.Lock()), что не позволяет одновременно нескольким вторым потокам запустить третий.

Способ использования блокировок:

lock = threading.Lock()
# Перед участком кода, который нежелательно чтобы выполнялся
# несколькими потоками одновременно запираем замок:
lock.acquire()
# ... какой-то код
# затем отпираем замок:
lock.release()

Можно так же использовать один и тот же экземпляр замка в разных местах, если надо чтобы эти участки кода не выполнялись одновременно.

Порождение дочернего потока происходит с помощью класса Thread в модуле threading.

Описание использованных параметров класса Thread:

  • target - указывает на имя функции (в данном случае метод класса), которую стоит выполнять в новом потоке
  • name - имя порождаемого потока
  • args - список аргументов, которые мы передаём в функцию, указанную в параметре target

Поток создаётся так:

th = threading.Thread(...)

Запускается на исполнение поток путём выозова метода start:

th = threading.start()

Метод join() ожидает завершения потока.

Ну, вроде бы теперь должно быть всё понятно. Посмотрим что у нас должно получить в итоге:

# -*- coding: cp1251 -*-
import threading
import md5
import sqlalchemy
import os, sys
import files

class MainTask():
def __init__(self):
self.sHash = ""
self.sFolder = ""
self.__th2Lock = threading.Lock()

def runMainTask(self, sHash, sFolder, c=0):
c = str(c)
self.sHash = sHash
self.sFolder = sFolder
#raw_input("1")
th1 = threading.Thread(target=self.searchFile, name="th1_"+c, args=[self.sHash,self.sFolder,"th1_"+c])
#raw_input("1")
print "[MainThred]: *** starting th1_%s ***" % c
th1.start()

def searchFile(self, s_hash, s_folder, thName):
s = files.Session()
q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)
if q.count() > 0:
print "[%s]: Found %d files in database:" % (thName, q.count())
for f in q:
print "[%s]: %s" % (thName, f.path)
print "[%s]: *** thread finished ***" % thName
return True
print "[%s]: Files in database not found, searching..." % thName
### START LOCK
self.__th2Lock.acquire()
th2 = threading.Thread(target=self.runHashTask, name="th2", args=[s_folder,s_hash])
print "[%s]: *** starting th2 ***" % thName
th2.start()
th2.join()
print "[%s]: *** th2 finished ***" % thName
self.__th2Lock.release()
### END LOCK
q = s.query(files.Hash).filter(files.Hash.file_hash==s_hash)
if q.count() > 0:
print "[%s]: Total found %d files:" % (thName, q.count())
for f in q:
print f.path
else:
print "[%s]: File with hash %s not found." % (thName, s_hash)
print "[%s]: *** thread finished ***" % thName
return True

def runHashTask(self, s_folder, s_hash):
hashes = {}
s = files.Session()
for root, dirs, fnames in os.walk(s_folder):
for name in fnames:
q = s.query(files.Hash).filter(files.Hash.path==os.path.join(root, name))
if q.count() == 0:
h = ''
try:
f = open(os.path.join(root, name), "rb")
except:
print "[th2]: Error: Can't open file %s." % os.path.join(root, name)
else:
h = md5.new(f.read()).hexdigest()
hashes[os.path.join(root, name)] = h
try:
f.close()
except:
pass
if h == s_hash:
files.add_hashes(hashes)
return True
files.add_hashes(hashes)
return True

if __name__ == "__main__":
mt = MainTask()
counter = 0
while True:
shash = unicode(raw_input("[MainThread]: Enter file hash: "), "cp1251")
sfolder = raw_input("[MainThread]: Enter folder to search: ")
if shash == '':
raw_input("[MainThread]: Press Enter to exit...")
sys.exit(0)
if sfolder == '':
sfolder = '.'
sfolder = unicode(os.path.abspath(sfolder), "cp1251")
mt.runMainTask(shash, sfolder, str(counter))
counter += 1

Вывод потоков может быть перемешан, но это сделано для наглядности.

Стоит так же отметить, что преобразование пути в кодировку utf8 вызвано проблемами с SQLAlchemy, как то он странно обрабатывает кодировку cp1251.

Если кто-то не знает как получить md5-хеш файла, вот пример:

import md5

f = open("path_to_file", "rb")
hash = md5.new(f.read()).hexdigest()
f.close()
print hash

Архив с исходниками тут. В архиве в корне программа с использованием потоков, в папке line - без.

Пожалуйста, оцените полезность и качество данной статьи. Одна звезда - плохо, 5 - хорошо.
1/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.2/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.3/5. Мы будем признательны, если вы напишете комментарий с причиной низкой оценки.4/5.5/5. (Еще не оценили)
Загрузка...
  1. duh_seti
    16 декабря 2008 в 20:10 | #1

    Блин, чувак, долго не мог понять, как эти гребаные замки использовать, поглядел листинг- а оно проще пареной репы :)

  2. lizz
    16 декабря 2008 в 22:15 | #2

    Кстати, скоро выложу вариант по-круче ;). Задачку делал для универа, как оказалось, не совсем то, что надо %).

  1. 21 декабря 2008 в 01:15 | #1