Calcul parallèle en python avec dask
Il existe beaucoup de façon de faire du calcul parallèle en python. Dask est une solution agréable, testée et mise à profit par une communauté assez large d'utilisateur. Elle n'est pas conçue pour les très gros clusters (les communications n'y seraient pas optimisées), mais le notre (celui du LMO) n'entre pas dans cette catégorie :)
Vous trouverez une documentation complète sur dask.org. Dask construit de graphes de taches, qui peuvent ensuite automatiquement être répartis sur les cores et les machines disponible.
Avec une seule machine
Voici un exemple de programme avec exécution en local
import numpy as np
import dask.array as da
# on peut transformer une fonction en
# un version paresseuse qui génére
# un noeud dans le graphe de taches
@dask.delayed
def inc(x):
return x + 1
# données numpy standard (on n'est pas obligés
# de partir du données numpy, c'est juste pour
# montrer l'interopérabilité)
data = np.arange(100_000).reshape(200, 500)
# on découpe le tableau en sous-partie
# pour favoriser le calcul parallèle
a = da.from_array(data, chunks=(100, 100))
# on construit un graph avec les opérations
# à effectuer sur chaque partie
m = inc( np.sin( a ) ).mean()
# on demande l'exécution du graph
print( m.compute() )
Par défaut, Dask utilise tous les cœurs disponibles en local (indépendamment de slurm).
Avec plusieurs machines
Si la puissance d'une seule machine ne suffit pas vous pouvez demander à dask_jobqueue
de lancer plusieurs jobs pour vous.
La bonne nouvelle, c'est que dask_jobqueue
va lancer pour vous les srun
. La mauvaise nouvelle, c'est que dask_jobqueue
ne semble pas vraiment conçu pour les clusters hétérogènes : il prendra un nombre fixe de cores par job. Il y aura donc dans certains cas un coût de communication plus important... mais pas forcément rédhibitoire (il s'agira de communications locales).
Voici un exemple d'utilisation:
# pip install dask_jobqueue si ce n'est pas déjà installé
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(
cores=4, # il faut mettre grand pour favoriser le multithreading
# mais pas trop grand, pour ne pas restreindre les
# machines à utiliser (c'est le nombre de cores
# PAR JOB)
memory='1GB',
name="mon_proj",
asynchronous=True
)
cluster.scale( jobs=2 ) # le nombre de machines à utiliser
from dask.distributed import Client
client = Client(cluster)
import dask.array as da
... # le même programme que précédemment