Asyncio Python : Programmation asynchrone simplifiée

asyncio est un nouveau module inclus dans le cœur de Python (built-in module) depuis la version 3.4 du langage. Ce dernier permet de simplifier la programmation concurrente et asynchrone qui est bien plus complexe que la programmation séquentielle.

Il faut savoir qu'en terme de programmation il existe plusieurs types de paradigmes, et asyncio permet de faire de la programmation concurrente et séquentielle avec le même langage sans changer la syntaxe, il y a juste quelques subtilités que nous allons voir pour bien comprendre ce module qui a le vent en poupe.

Petite précision pour la suite

Ce qu'est un paradigme

Un paradigme de programmation est une façon d'approcher la programmation informatique et de traiter les solutions aux problèmes et leur formulation dans un langage de programmation approprié. Il s'oppose à la méthodologie, qui est une manière d'organiser la solution des problèmes spécifiques du génie logiciel.

Paradigme (programmation)

Concurrent, parallèle, asynchrone, quels sont les différences ?

Parfois, un beau dessin est mieux qu'un long discours

parallèle

concurrent

asynchrone

source : quora

Les conditions de tests

En gardant à l'esprit que tout sera fait en Python ainsi que pour valider et vous permettre de reproduire ces tests, je vais poser quelques hypothèses de départ :

  1. L'interpréteur Python utilisé sera Python 3.7.0
  2. Les scripts seront exécutés dans un container Docker.
  3. Le code sera le plus simple possible, donc améliorable ! Gardez à l'esprit que le but est juste de vous montrer les différences entre chaque cas et de ne pas avoir la meilleure réponse pour chaque cas.
  4. Le nombre de requêtes à exécuter NUM_REQUESTS est de 1000
  5. Le nombre de tâches concurrents NUM_WORKERS est de 100

Le cas étudié

Pour exposer mes dires, je vais partir d'un sujet très simple :

exécuter une requête HTTP de type GET

Le container Docker

FROM python:3.7-alpine
RUN apk update 
&& apk upgradeRUN apk add -f build-base python3-devRUN mkdir /app
WORKDIR /app
VOLUME /app
ADD ./requirements.txt /app/requirements.txtRUN pip install -r requirements.txtCMD ["sh"]

On part d'une base 3.7-alpine pour sa légèreté que l'on met à jour.

Nous rajouterons des paquets nécessaires à l'installation du module Python uvloop que nous verrons plus tard.

On crée un dossier de travail auquel on ajoute le fichier requirements.txt et on choisit d'avoir comme commande par défaut un Shell sh

Les modules Python supplémentaires

Vous trouverez ci-dessous le contenu du fichier requirements.txt

requests
uvloop

La fonction principale

def request_example(request_number):
    loop_start = time.time()
try:
    requests.get("http://example.com/")

except requests.exceptions.ConnectionError:
    pass

finally:
    execution_time = time.time() - loop_start
    print("request {}: {}s".format(request_number, execution_time))
    return execution_time

L'utilisation du domaine example.com permet d'une part de faire des tests et peut-être utilisé sans ce soucier d'éventuels effets indésirables. Ces spécifications viennent de la RFC 2606 Reserved Top Level DNS Names

Les différentes implémentations

L'exécution de fonction simple

Le code

# -*- coding: utf-8 -*-
import time
from . import NUM_REQUESTS, request_example

if name == "main":
    start = time.time()
    results = [request_example(_) for _ in range(NUM_REQUESTS)]
    end = time.time()
    
    print("-----")
    print("Min: {}s".format(min(results)))
    print("Max: {}s".format(max(results)))
    print("-----")
    print("Total: {}s".format(end - start))

L'explication

Comme on peut le voir, la fonction est dans une liste compréhension. Il n'y a rien de particulier à noter.

Le résultat

  • Min: 0,008s
  • Max: 2,735s
  • Total: 269,234s

L'exécution de coroutines avec asyncio

Le code

# -*- coding: utf-8 -*-
import asyncio
import time
from . import NUM_REQUESTS, request_example

async def main():
    loop = asyncio.get_event_loop()  # run_in_executor(executor, function, *args)
    futures = [
        loop.run_in_executor(
            None,
            request_example,
            _
        ) for _ in range(NUM_REQUESTS)
    ]

    responses = []
    for response in await asyncio.gather(*futures):
        responses.append(response)

    return responses

if name == 'main':
    event_loop = asyncio.get_event_loop()start = time.time()
    
    try:
        results = event_loop.run_until_complete(main())
    finally:
        event_loop.close()

    print("-----")
    print("Min: {}s".format(min(results)))
    print("Max: {}s".format(max(results)))
    print("-----")
    print("Total: {}s".format(time.time() - start))

L'explication

L'étape initiale

On remarque que le code devient tout d'un coup plus long. Pour commencer il faut déjà importer le module asyncio, ce qui est assez normal si l'on veut l'utiliser.

import asyncio

Puis il faut instancier une boucle d'événements (event_loop). C'est l'élément central fourni par asyncio pour :

  • Enregistrer, exécuter et annuler les tâches.
  • Créer différents protocoles de communication de type client/serveur.
  • De lancer des sous-tâches et d'y associer des protocoles pour communiquer avec des programmes externes.
  • Déléguer les appels à des fonctions coûteuses à un pool de threads.

Ensuite choisir le type d'exécution, on a le choix en run_forever et run_until_complete, pour le coup le nom des méthodes est transparent.

La première sera exécutée indéfiniment jusqu'à ce qu'une erreur se produise ou que l'on demande explicitement son arrêt.

La seconde sera exécutée jusqu'à ce que la tâche (Future) que l'on passe en paramètre, se termine ou échouée.

Et enfin, on ferme la boucle d'événements.

event_loop = asyncio.get_event_loop()
try:
    results = event_loop.run_until_complete(main())

finally:
    event_loop.close()

La tâche qui va être transmise à la boucle d'événements

On entre dans le cœur de cette nouvelle façon de programmer, on constate dans un premier temps que la fonction est préfixée par le mot clé async, cela permet de traiter cette fonction comme une coroutine, définie par la PEP 492 – Coroutines with async and await syntax.

async def main():

C'est de cette façon que l'interpréteur Python va savoir que cette fonction devra avoir un traitement différent.

On va tout d'abord se rattacher à la boucle d'événements

loop = asyncio.get_event_loop()

Puis définir les tâches qui devront être exécutées, nous choisirons de les transmettre à l'exécuteur (concurrent.futures.Executor) en utilisant la méthode suivante run_in_executor, nous utiliserons l'exécuteur par défaut pour le moment (le premier paramètre qui est à None). Cela pour exécuter la fonction request_example avec pour argument _.

futures = [
    loop.run_in_executor(
        None,
        request_example,
        _
    ) for _ in range(NUM_REQUESTS)
]

Pour le moment nous avons une liste de tâches prêtes à être exécutées que l'on va transmettre à la boucle d'événements en utilisant la méthode suivante asyncio.gather et que l'on va parcourir pour y lire le résultat :

responses = []
for response in await asyncio.gather(*futures):
    responses.append(response)

Le mot clé await qui permet de suspendre l'exécution de la coroutine (la tâche qui aura été piochée dans la liste de tâches).

Le résultat

  • Min: 0,166s
  • Max: 2,735s
  • Total: 12,048s

L'exécution de coroutines avec asyncio en précisant l'exécuteur (ThreadPoolExecutor)

Le code

# -*- coding: utf-8 -*-
import asyncio
import concurrent.futures
import time
from . import NUM_REQUESTS, NUM_WORKERS, request_example

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    loop = asyncio.get_event_loop()    # run_in_executor(executor, function, *args)
    futures = [
        loop.run_in_executor(
            executor,
            request_example,
            _
        ) for _ in range(NUM_REQUESTS)
    ]

    responses = []
    for response in await asyncio.gather(*futures):
        responses.append(response)

    return responses
    
if name == 'main':
    event_loop = asyncio.get_event_loop()start = time.time()
    try:
        results = event_loop.run_until_complete(main())
    
    finally:
        event_loop.close()

    print("-----")
    print("Min: {}s".format(min(results)))
    print("Max: {}s".format(max(results)))
    print("-----")
    print("Total: {}s".format(time.time() - start))

L'explication

Comme vous pouvez le constater, l'initialisation de la boucle d'événements reste la même, ainsi que la façon de construire les tâches à exécuter et le traitement des résultats.

La seule différence est que l'on ne va pas utiliser l'exécuteur par défaut, mais transmettre un que l'on aura construit nous même.

Dans un premier temps il faut importer le module qui va nous permettre de définir l'exécuteur :

import concurrent.futures

Puis on va définir le passer en argument lors de la construction de la liste des tâches


with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    loop = asyncio.get_event_loop()
    futures = [
        loop.run_in_executor(
            executor,
            request_example,
            _
        ) for _ in range(NUM_REQUESTS)
    ]

Le résultat

  • Min: 0,173s
  • Max: 2,736s
  • Total: 3,525s

L'exécution de coroutines avec asyncio en précisant l'exécuteur (ThreadPoolExecutor) et le moteur pour la boucle d'événements (uvloop)

Ça va loin non?

Nous avons vu que l'on pouvait construire de simple coroutines pour avoir des tâches asynchrones et que l'on pouvait choisir l'exécuteur de tâches pour gagner en performances mais on peut aller encore plus loin car il est possible de choisir aussi le type de boucle d'événements.

Pour illustrer cet exemple,  je vais vous présenter le module uvloop. Ce dernier est extrêmement simple d'utilisation et est surtout basé sur libuv. Oui, vous avez bien lu, la librairie C qui est au cœur de Node.js et qui permet de gérer les événements asynchrones !

Alors ... Oui, ça va loin !!!

Le code

# -*- coding: utf-8 -*-
import asyncio
import concurrent.futures
import timeimport uvloop
from . import NUM_REQUESTS, NUM_WORKERS, request_example


async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        loop = asyncio.get_event_loop()    # run_in_executor(executor, function, *args)
        futures = [
            loop.run_in_executor(
                executor,
                request_example,
                _
            ) for _ in range(NUM_REQUESTS)
        ]

        responses = []
        for response in await asyncio.gather(*futures):
            responses.append(response)

        return responses
        
if name == 'main':
    # Change event loop policy
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    event_loop = asyncio.get_event_loop()

    start = time.time()
    try:
        results = event_loop.run_until_complete(main())
    finally:
        event_loop.close()

    print("-----")
    print("Min: {}s".format(min(results)))
    print("Max: {}s".format(max(results)))
    print("-----")
    print("Total: {}s".format(time.time() - start))

L'explication

C'est quasiment la même chose que le cas précédents, la seule différence vient de ces 2 lignes

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

La première permet d'importer le module uvloop et la seconde permet de changer le moteur de boucle d'événements pour asyncio.

Le résultat

  • Min: 0,008s
  • Max: 2,682s
  • Total: 2,951s

Quels enseignements en tirer ?

La syntaxe

Ce que l'on constate dans un premier temps, c'est que la syntaxe reste très proche de ce que l'on connaît et que si l'on découpe bien son code, on peut utiliser la même fonction aussi bien dans du code simple que dans du code asynchrone utilisant asyncio. Il y a quelques subtilités, mais elles sont assez décoratives car elles concernent principalement l'initialisation de la boucle d'événements.

La personnalisation

J'entends par là que l'on peut partir sur une implémentation très simple fournie en standard par le langage mais on peut aussi choisir quand cela est nécessaire de personnalisation chacun des éléments pour avoir soit de meilleures performances soit une gestion plus adaptée à ces besoins.

Les performances

On y arrive, généralement lorsque l'on fait usage de la programmation concurrente, c'est que l'on peut rendre ces tâches indépendantes et que l'on a besoin qu'elles soient exécutées indépendamment. Soit pour de la rapidité, soit pour de la disponibilité. C'est à dire que si une tâche prend plus de temps qu'une autre, il ne faut pas qu'elle bloque les autres. Ce cas qui ne fut pas spécialement exposé ici, mais on peut facilement deviner que si une des requêtes mettaient plus de temps à répondre, les autres pourraient s'exécuter en même temps.

Si l'on résume les résultats mesurés dans un tableau comparatif

Mesure Simple Async Async + Executor Async + Executor + uvloop
Temps d'exécution total 269,234s 12,048s 3,525s 2,951s
Δ Référence Ø ×22,34 ×76,37 ×91,23
Δ implémentation précédente Ø ×22,34 ×3,41 ×1,19

La complexité

Quand s'intéresse à la programmation concurrente, c'est comme ouvrir une boîte de Pandore. C'est génial tant que l'on ne regarde pas ce qu'il y a sous le capot :

  • Des concepts très difficiles à prendre en main et qui sont souvent contre-intuitifs (voir la taille de la documentation Python pour asyncio).
  • L'étape de débogage qui est complexifiée du fait de la nature de l'exécution.
  • Tout les travers de la programmation concurrentes (gestion des blocages, difficulté à suivre le schéma d'exécution, ...)

Mais il y a une belle page dans la documentation qui vous permettra d'appréhender sereinement le sujet.


Sources :