Чтобы понять многопоточность, сначала вникнем, что такое процесс. Процесс – это часть виртуальной памяти и ресурсов, которую ОС выделяет для выполнения программы. Если открыть несколько экземпляров одного приложения, под каждый система выделит по процессу. В современных браузерах за каждую вкладку может отвечать отдельный процесс.
Вы наверняка сталкивались с «Диспетчером задач» в Windows (в Linux — «Системный монитор») и знаете, что лишние запущенные процессы грузят систему, а самые «тяжёлые» из них часто зависают, так что их приходится завершать принудительно.
Но пользователи любят многозадачность: хлебом не корми — дай открыть с десяток окон и попрыгать туда-сюда. Налицо дилемма: нужно обеспечить одновременную работу приложений и при этом снизить нагрузку на систему, чтобы она не тормозила. Допустим, «железу» не угнаться за потребностями владельцев — нужно решать вопрос на программном уровне.
Мы хотим, чтобы в единицу времени процессор успевал выполнить больше команд и обработать больше данных. То есть нам надо уместить в каждом кванте времени больше выполненного кода. Представьте единицу выполнения кода в виде объекта — это и есть поток.
К сложному делу легче подступиться, если разбить его на несколько простых. Так и при работе с памятью: «тяжёлый» процесс делят на потоки, которые занимают меньше ресурсов и скорее доносят код до вычислителя (как именно — см. ниже).
У каждого приложения есть как минимум один процесс, а у каждого процесса — минимум один поток, который называют главным, и из которого при необходимости запускают новые.
-
Потоки используют память, выделенную под процесс, а процессы требуют себе отдельное место в памяти. Поэтому потоки создаются и завершаются быстрее: системе не нужно каждый раз выделять им новое адресное пространство, а потом высвобождать его.
-
Процессы работают каждый со своими данными — обмениваться чем-то они могут только через механизм межпроцессного взаимодействия. Потоки обращаются к данным и ресурсам друг друга напрямую: что изменил один — сразу доступно всем. Поток может контролировать «собратьев» по процессу, в то время как процесс контролирует исключительно своих «дочек». Поэтому переключаться между потоками быстрее и коммуникация между ними организована проще.
Какой отсюда вывод? Если вам нужно как можно быстрее обработать большой объём данных, разбейте его на куски, которые можно обрабатывать отдельными потоками, а затем соберите результат воедино. Это лучше, чем плодить жадные до ресурсов процессы.
Но почему такое популярное приложение как Firefox идёт по пути создания нескольких процессов? Потому что именно для браузера изолированная работа вкладок — это надёжно и гибко. Если с одним процессом что-то не так, необязательно завершать программу целиком — есть возможность сохранить хотя бы часть данных.
Что такое многопоточность? Вот мы и подошли к главному. Многопоточность — это когда процесс приложения разбит на потоки, которые параллельно — в одну единицу времени — обрабатываются процессором.
Вычислительная нагрузка распределяется между двумя или более ядрами, так что интерфейс и другие компоненты программы не замедляют работу друг друга.
Многопоточные приложения можно запускать и на одноядерных процессорах, но тогда потоки выполняются по очереди: первый поработал, его состояние сохранили — дали поработать второму, сохранили — вернулись к первому или запустили третий, и т. д.
В информатике поток — это минимальная единица работы, запланированная для выполнения операционной системой.
О потоках нужно знать следующее:
- Они существуют внутри процесса;
- В одном процессе может быть несколько потоков;
- Потоки в одном процессе разделяют состояние и память родительского процесса.
- Потоки работают параллельно.
В Python существует встроенный модуль threading
, самым простым примером использования будет следующий код:
import time
from threading import Thread
def sleep_me(i):
print("Поток %i засыпает на 5 секунд.\n" % i)
time.sleep(5)
print("Поток %i сейчас проснулся.\n" % i)
for i in range(10):
th = Thread(target=sleep_me, args=(i,))
th.start()
Вывод будет примерно следующим:
Поток 0 засыпает на 5 секунд.
Поток 3 засыпает на 5 секунд.
Поток 1 засыпает на 5 секунд.
Поток 4 засыпает на 5 секунд.
Поток 2 засыпает на 5 секунд.
Поток 5 засыпает на 5 секунд.
Поток 6 засыпает на 5 секунд.
Поток 7 засыпает на 5 секунд.
Поток 8 засыпает на 5 секунд.
Поток 9 засыпает на 5 секунд.
Поток 0 сейчас проснулся.
Поток 3 сейчас проснулся.
Поток 1 сейчас проснулся.
Поток 4 сейчас проснулся.
Поток 2 сейчас проснулся.
Поток 5 сейчас проснулся.
Поток 6 сейчас проснулся.
Поток 7 сейчас проснулся.
Поток 8 сейчас проснулся.
Поток 9 сейчас проснулся.
Порядок может быть вообще любым, и мы этот порядок не контролируем!
Эта функция возвращает количество исполняемых на текущий момент потоков. Изменим последнюю программу, чтобы она выглядела вот так:
import time
import threading
from threading import Thread
def sleep_me(i):
print("Поток %i засыпает на 5 секунд." % i)
time.sleep(5)
print("Поток %i сейчас проснулся." % i)
for i in range(10):
th = Thread(target=sleep_me, args=(i,))
th.start()
print("Запущено потоков: %i." % threading.active_count())
Результат будет примерно такой:
Поток 0 засыпает на 5 секунд.
Запущено потоков: 2.
Поток 1 засыпает на 5 секунд.
Запущено потоков: 3.
Поток 2 засыпает на 5 секунд.
Запущено потоков: 4.
Поток 3 засыпает на 5 секунд.
Запущено потоков: 5.
Поток 4 засыпает на 5 секунд.
Запущено потоков: 6.
Поток 5 засыпает на 5 секунд.
Запущено потоков: 7.
Поток 6 засыпает на 5 секунд.
Запущено потоков: 8.
Поток 7 засыпает на 5 секунд.
Запущено потоков: 9.
Поток 8 засыпает на 5 секунд.
Запущено потоков: 10.
Поток 9 засыпает на 5 секунд.
Запущено потоков: 11.
Поток 0 сейчас проснулся.
Поток 5 сейчас проснулся.
Поток 2 сейчас проснулся.
Поток 9 сейчас проснулся.
Поток 3 сейчас проснулся.
Поток 7 сейчас проснулся.
Поток 1 сейчас проснулся.
Поток 8 сейчас проснулся.
Поток 6 сейчас проснулся.
Поток 4 сейчас проснулся.
Также обратите внимание, что после запуска всех потоков счетчик показывает число 11, а не 10. Причина в том, что основной поток также учитывается наравне с 10 остальными.
Имеется ряд проблем, возникающих при использовании многопоточности – попытка множества потоков получить доступ к одному
и тому же фрагменту данных может привести к проблемам несовместимости или получению искаженной информации (например,
фраза HWeol,rldo
вместо Hello, World
на консоли). Подобные проблемы возникают, когда компьютеру не указан способ
организации потоков.
Как правильно приказать компьютеру синхронизировать потоки? Для этого используются примитивы синхронизации — простые программные механизмы, обеспечивающие гармоничное взаимодействие потоков друг с другом.
В этом посте представлены некоторые популярные примитивы синхронизации в Python, определенные в стандартном модуле
threading.py
.
Изучим Locks
, RLocks
, Semaphores
, Events
, Conditions
и Barriers
. Разумеется, можно создавать собственные
примитивы пользовательской синхронизации. Начнем с Locks
как с простейшего из примитивов и постепенно перейдем к более
сложным.
Примитив Lock
, простейший примитив в Python. Для Lock
возможны только два состояния ‑ заблокирован и разблокирован.
Примитив создается в разблокированном состоянии и содержит два метода – acquire()
и release()
. Метод acquire()
блокирует Lock
и выполнение блока до тех пор, пока метод release()
из другой сопрограммы не разблокирует его. Затем
он снова блокирует Lock
и возвращает значение True
. Метод release()
вызывается только в заблокированном состоянии
– устанавливает состояние разблокировки и немедленно возвращает управление. Вызов release()
в разблокированном
состоянии приводит к RunTimeError
.
from threading import Lock, Thread
lock = Lock()
def add_one(li):
lock.acquire()
try:
li.append(1)
finally:
lock.release()
def add_two(li):
lock.acquire()
try:
li.append(2)
finally:
lock.release()
# то же самое, что и конструкция:
# with lock:
# li.append(2)
threads = []
list_to_append = []
for func in [add_one, add_two]:
threads.append(Thread(target=func, args=(list_to_append,)))
threads[-1].start()
for thread in threads:
"""
Waits for threads to complete before moving on with the main script.
"""
thread.join()
print(list_to_append)
Если не использовать Lock
, то мы не можем быть уверены, что в конце получится [1, 2]
, могло бы получится и [2, 1]
Стандартный Lock
не знает, какой поток блокируется в данный момент. Если блокировка сохраняется, блокируется любой из
потоков, пытающихся получить доступ, даже если этот тот же самый поток, который уже удерживает блокировку. Именно для
таких случаев и используется RLock
— блокировка повторного входа. Вы можете расширить код в следующем фрагменте,
добавив выходные инструкции для демонстрации возможностей RLock
предотвращать нежелательную блокировку.
import threading
num = 0
lock = threading.Lock()
lock.acquire()
num += 1
lock.acquire() # This will block.
num += 2
lock.release()
# With RLock, that problem doesn't happen.
lock = threading.RLock()
lock.acquire()
num += 3
lock.acquire() # This won’t block.
num += 4
lock.release()
lock.release() # You need to call release once for each call to acquire.
Возможно рекурсивное использование RLock
— когда родительский вызов функции блокирует вложенный вызов. Таким образом,
RLock
используются для вложенного доступа к общим ресурсам.
Семафоры – это просто дополнительные счетчики. Вызов acquire()
будет блокироваться семафором только после превышения
определенного количества запущенных потоков acquire()
. Значение соответствующего счетчика уменьшается на каждый вызов
acquire()
и увеличивается на каждый вызов release()
. Значение ValueError
будет возникать, если вызовы release()
будут пытаться увеличивать значение счетчика после достижения заданного максимального значения (количества потоков,
которые допустимы семафором acquire()
до применения блокировки). Следующий код демонстрирует использование семафоров
для простой задачи производитель-потребитель.
from threading import Thread, BoundedSemaphore
from time import sleep, time
ticket_office = BoundedSemaphore(value=3)
def ticket_buyer(number):
start_service = time()
with ticket_office:
sleep(1)
print(f"client {number}, service time: {time() - start_service}")
buyer = [Thread(target=ticket_buyer, args=(i,)) for i in range(5)]
for b in buyer:
b.start()
Примерный вывод:
client 0, service time: 1.0011110305786133
client 2, service time: 1.0013604164123535
client 1, service time: 1.001556158065796
client 3, service time: 2.002437114715576
client 4, service time: 2.0027763843536377
Как только первые потоки освободились, работу начали следующие.
События по своему назначению и алгоритму работы похожи на рассмотренные ранее условные переменные. Основная задача,
которую они решают – это взаимодействие между потоками через механизм оповещения. Объект класса Event
управляет
внутренним флагом, который сбрасывается с помощью метода clear()
и устанавливается методом set()
. Потоки, которые
используют объект Event
для синхронизации блокируются при вызове метода wait()
, если флаг сброшен.
Методы класса Event
:
-
is_set()
возвращает True, если флаг находится во взведенном состоянии. -
set()
переводит флаг во взведенное состояние. -
clear()
переводит флаг в сброшенное состояние. -
wait(timeout=None)
блокирует вызвавший данный метод поток, если флаг соответствующего Event-объекта находится в сброшенном состоянии. Время нахождения в состоянии блокировки можно задать через параметрtimeout
.
from threading import Thread, Event
event = Event()
def worker(name: str):
event.wait() # ждём, пока флаг не изменится
print(f"Worker: {name}")
# Clear event
event.clear()
# Create and start workers
workers = [Thread(target=worker, args=(f"wrk {i}",)) for i in range(5)]
for w in workers:
w.start()
print("Main thread")
event.set() # Взводим флаг, чем и запускам функции сверху
Main thread
Worker: wrk 1
Worker: wrk 2
Worker: wrk 3
Worker: wrk 4
Worker: wrk 0
Их порядок мы не контролируем, только событие, по которому они срабатывают.
События на стероидах.
При создании объекта Condition
вы можете передать в конструктор объект Lock
или RLock
, с которым хотите работать.
Перечислим методы объекта Condition
с кратким описанием:
-acquire(*args)
- захват объекта-блокировки.
-release()
- освобождение объекта-блокировки.
-wait(timeout=None)
- блокировка выполнения потока до оповещения о снятии блокировки. Через параметр timeout
можно
задать время ожидания оповещения о снятии блокировки. Если вызвать wait()
на условной переменной, у которой
предварительно не был вызван acquire()
, то будет выброшено исключение RuntimeError
.
-notify(n=1)
снимает блокировку с остановленного методом wait()
потока. Если необходимо разблокировать несколько
потоков, то для этого следует передать их количество через аргумент n
.
-notify_all()
снимает блокировку со всех остановленных методом wait()
потоков.
from threading import Thread, Condition
condition = Condition()
def worker_wait(name: str):
condition.acquire()
print(f"Worker: {name} after ac")
condition.wait()
print(f"Worker: {name} after w")
condition.release()
def worker_notify(name: str):
condition.acquire()
print(f"Worker: {name} after ac")
condition.notify()
print(f"Worker: {name} after n")
condition.release()
# Create and start workers
workers_wait = [Thread(target=worker_wait, args=(f"wrk {i}",)) for i in range(5)]
workers_notify = [Thread(target=worker_notify, args=(f"wrk {i + 5}",)) for i in range(5)]
for w in workers_wait:
w.start()
for w in workers_notify:
w.start()
Результат:
Worker: wrk 0 after ac
Worker: wrk 1 after ac
Worker: wrk 2 after ac
Worker: wrk 3 after ac
Worker: wrk 4 after ac
Worker: wrk 5 after ac
Worker: wrk 5 after n
Worker: wrk 0 after w
Worker: wrk 6 after ac
Worker: wrk 6 after n
Worker: wrk 1 after w
Worker: wrk 7 after ac
Worker: wrk 7 after n
Worker: wrk 8 after ac
Worker: wrk 8 after n
Worker: wrk 2 after w
Worker: wrk 9 after ac
Worker: wrk 9 after n
Worker: wrk 3 after w
Worker: wrk 4 after w
Process finished with exit code 0
Первые 5 запускаются и останавливаются на команде wait()
.
6-ой (с номером 5) тоже блокируется и вызывает notify()
, чем и "отпускает" поток с номером 0.
Модуль threading
предоставляет удобный инструмент для запуска задач по таймеру – класс Timer
. При создании таймера
указывается функция, которая будет выполнена, когда он сработает. Timer
реализован как поток, является наследником от
Thread
, поэтому для его запуска необходимо вызвать start()
, если необходимо остановить работу таймера, то вызовите
cancel()
.
Конструктор класса Timer
:
Timer(interval, function, args=None, kwargs=None)
Параметры:
-
interval
- количество секунд, по истечении которых будет вызвана функцияfunction
. -
function
- функция, вызов которой нужно осуществить по таймеру. -
args
,kwargs
- аргументы функцииfunction
.
Методы класса Timer
:
cancel()
останавливает выполнение таймера.
from threading import Timer
timer = Timer(interval=3, function=lambda: print("Message from Timer!"))
timer.start()
Программа пойдёт дальше, а функция будет выполнена через 3 секунды.
Еще бывает барьер, но его мы рассматривать не будем. Он позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.
Шикарная статья на тему
Python Global Interpreter Lock (GIL) — это своеобразная блокировка, позволяющая только одному потоку управлять интерпретатором Python. Это означает, что в любой момент времени будет выполняться только один конкретный поток.
Работа GIL может казаться несущественной для разработчиков, создающих однопоточные программы. Но во многопоточных программах отсутствие GIL может негативно сказываться на производительности процессоро-зависимых программ.
Поскольку GIL позволяет работать только одному потоку даже в многопоточном приложении, он заработал репутацию «печально известной» функции.
Фактически Python не вызывает много потоков одновременно, а только очень быстро их переключает, что делает все многопоточные вычисления по факту однопоточными.
Python подсчитывает количество ссылок для корректного управления памятью. Это означает, что созданные в Python объекты имеют переменную подсчёта ссылок, в которой хранится количество всех ссылок на этот объект. Как только эта переменная становится равной нулю, память, выделенная под этот объект, освобождается.
Вот небольшой пример кода, демонстрирующий работу переменных подсчёта ссылок:
import sys
a = []
b = a
sys.getrefcount(a)
3
В этом примере количество ссылок на пустой массив равно 3. На этот массив ссылаются: переменная a
, переменная b
и
аргумент, переданный функции sys.getrefcount()
.
Проблема, которую решает GIL, связана с тем, что в многопоточном приложении сразу несколько потоков могут увеличивать или уменьшать значения этого счётчика ссылок. Это может привести к тому, что память очистится неправильно и удалится тот объект, на который ещё существует ссылка.
Счётчик ссылок можно защитить, добавив блокираторы на все структуры данных, которые распространяются по нескольким потокам. В таком случае счётчик будет изменяться исключительно последовательно.
Но добавление блокировки к нескольким объектам может привести к появлению другой проблемы — взаимоблокировки (англ. deadlocks), которая получается только если блокировка есть более чем на одном объекте. К тому же эта проблема тоже снижала бы производительность из-за многократной установки блокираторов.
GIL
— это одиночный блокиратор самого интерпретатора Python. Он добавляет правило: любое выполнение байт-кода в Python
требует блокировки интерпретатора. В таком случае можно исключить взаимоблокировку, т. к. GIL
будет единственной
блокировкой в приложении. К тому же его влияние на производительность процессора совсем не критично. Однако стоит
помнить, что GIL уверенно делает любую программу однопоточной.
Несмотря на то, что GIL
используется и в других интерпретаторах, например в Ruby, он не является единственным решением
этой проблемы. Некоторые языки решают проблему потокобезопасного освобождения памяти с помощью сборки мусора.
Если GIL
у вас вызывает проблемы, вот несколько решений, которые вы можете попробовать:
Многопроцессорность против многопоточности. Довольно популярное решение, поскольку у каждого Python-процесса есть собственный интерпретатор с выделенной под него памятью, поэтому с GIL проблем не будет.
Корутины. О них на следующем занятии.
Сначала поговорим о параллельной обработке. Это способ одновременно разбивать и запускать программные задачи на нескольких микропроцессорах. По сути, это попытка сократить время обработки и это то, чего мы можем достичь с помощью компьютера с двумя или более процессорами, или с использованием компьютерной сети. Мы также называем это параллельными вычислениями.
Итак, теперь перейдем к Python Multiprocessing, это способ повысить производительность путем создания параллельного кода. Производители процессоров делают это возможным, добавляя больше ядер к своим процессорам. В многопроцессорной системе приложения разбиваются на более мелкие подпрограммы для самостоятельной работы. Взгляните на однопроцессорную систему. Учитывая несколько процессов одновременно, он пытается прерывать и переключаться между задачами. Как бы вы себя чувствовали, будучи единственным шеф-поваром на кухне с сотнями клиентов? Вы должны были бы выполнять все обычные задачи от выпечки до замеса теста.
-
Мультипроцессор – компьютер с несколькими центральными процессорами.
-
Многоядерный процессор – один вычислительный компонент с более чем одной независимой фактической единицей обработки/ядрами.
В любом случае процессор может выполнять несколько задач одновременно, назначая процессор для каждой задачи.
Пример:
from multiprocessing import Process
def square(n):
print("Число в квадрате ", n ** 2)
def cube(n):
print("Число в кубе", n ** 3)
if __name__ == "__main__":
p1 = Process(target=square, args=(7,))
p2 = Process(target=cube, args=(7,))
p1.start()
p2.start()
p1.join()
p2.join()
print("Мы закончили")
Отличие от многопоточности в том, что в этом случае каждый отдельный процесс будет выполняться отдельным ядром или процессором, и никак не блокируется GIL.
Но процедура создания нового процесса достаточно дорогостоящая, и нет никакого смысла создавать новый процесс для простых действий.
У каждого процесса есть id, название и т. д. эти данные всегда можно извлечь.
Так же как и с потоками у нас может быть ситуация, когда разные процессы обрабатывают одни и те же данные, и чтобы быть уверенным, что действия не происходят одновременно, мы можем заблокировать процесс, синтаксис идентичен.
from multiprocessing import Process, Lock
lock = Lock()
def printer(item):
lock.acquire()
try:
print(item)
finally:
lock.release()
if __name__ == "__main__":
items = ['nacho', 'salsa', 7]
for item in items:
p = Process(target=printer, args=(item,))
p.start()
Для многопроцессорности работают ровно те же самые блокировки, как и для многопоточности.
Пул - это возможность создать сразу необходимое количество процессов, а не делать это по одному. В данном примере мы сразу создаём 3 процесса для трех параллельных вычислений.
from multiprocessing import Pool
def double(n):
return n * 2
if __name__ == '__main__':
nums = [2, 3, 6]
pool = Pool(processes=3)
print(pool.map(double, nums))
Если нам необходимо вычислять одно действие на трёх процессорах, нам поможет функция apply_async()
:
from multiprocessing import Pool
def double(n):
return n * 2
if __name__ == '__main__':
pool = Pool(processes=3)
result = pool.apply_async(double, (7,))
print(result.get())
Практика/Домашка:
- Написать функцию, которая будет делать запросы (import requests) на
https://google.com
,https://amazon.com
,https://microsoft.com
. Синхронно (обычным способом), многопоточно, многопроцессорно, сравнить время выполнения, сделать выводы.
- 1.1 сделать по 5 запросов на каждый сайт, получить время.
- Написать функцию, которая возводит числа 2, 3 и 5, в 1000000 степень. Синхронно (обычным способом), многопоточно, многопроцессорно, сравнить время выполнения, сделать выводы.
Ответить, какой способ для каких задач годится лучше, и почему.