Как использовать concurrent.futures для пула потоков?python-42

Модуль concurrent.futures предоставляет высокоуровневый интерфейс для асинхронного выполнения кода с использованием пулов потоков или процессов. Рассмотрим подробно работу с ThreadPoolExecutor.

Основные концепции

  1. Executor - абстракция для пула потоков/процессов
  2. Future - объект, представляющий отложенное вычисление
  3. ThreadPoolExecutor - конкретная реализация для потоков

Базовый пример

from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    print(f"Задача {name} запущена")
    time.sleep(2)  # Имитация I/O операции
    return f"Результат {name}"

# Создаем пул из 3 потоков
with ThreadPoolExecutor(max_workers=3) as executor:
    # Запускаем задачи
    future1 = executor.submit(task, "A")
    future2 = executor.submit(task, "B")

    # Получаем результаты
    print(future1.result())  # Блокирует пока задача не завершится
    print(future2.result())

Параллельная обработка множества задач

urls = ["url1", "url2", "url3", "url4", "url5"]

def download(url):
    # Имитация загрузки
    time.sleep(1)
    return f"Данные с {url}"

with ThreadPoolExecutor(max_workers=5) as executor:
    # Массовая отправка задач
    futures = [executor.submit(download, url) for url in urls]

    # Асинхронное получение результатов по готовности
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Использование map

numbers = [1, 2, 3, 4, 5]

def square(x):
    return x * x

with ThreadPoolExecutor() as executor:
    results = executor.map(square, numbers)  # Сохраняет порядок результатов

    for result in results:
        print(result)  # 1, 4, 9, 16, 25

Обработка исключений

def might_fail(n):
    if n == 3:
        raise ValueError("Ошибка на 3")
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(might_fail, i) for i in range(5)]

    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except ValueError as e:
            print(f"Ошибка: {e}")

Настройка пула

  1. max_workers - количество потоков (по умолчанию = 5 * число ядер)
  2. thread_name_prefix - префикс имен потоков для отладки
executor = ThreadPoolExecutor(
    max_workers=10,
    thread_name_prefix="DownloadThread_"
)

Продвинутые техники

Ограничение времени выполнения

future = executor.submit(long_running_task)
try:
    result = future.result(timeout=5)  # Ждем максимум 5 секунд
except concurrent.futures.TimeoutError:
    print("Задача не завершилась вовремя")
    future.cancel()  # Пытаемся отменить

Callback-функции

def callback(future):
    if future.exception():
        print(f"Ошибка: {future.exception()}")
    else:
        print(f"Результат: {future.result()}")

future = executor.submit(task, "A")
future.add_done_callback(callback)

Практический пример: веб-скрапинг

import requests
from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    response = requests.get(url)
    return response.text[:100]  # Возвращаем первые 100 символов

urls = [
    "https://python.org",
    "https://google.com",
    "https://github.com"
]

with ThreadPoolExecutor(max_workers=len(urls)) as executor:
    results = list(executor.map(fetch_url, urls))

for url, content in zip(urls, results):
    print(f"{url}: {content}")

Ограничения

  1. GIL все еще ограничивает CPU-bound задачи
  2. Не подходит для задач, требующих совместного состояния
  3. Избыточное количество потоков может ухудшить производительность

Резюмируем

ThreadPoolExecutor из concurrent.futures предоставляет удобный высокоуровневый API для работы с пулами потоков, идеально подходящий для I/O-bound задач. Он абстрагирует низкоуровневые детали работы с потоками и предлагает удобные интерфейсы для управления асинхронными операциями.