Lancement de jobs en parallèle en Perl : multithreading
Multiprocess (threads, fork, job parallel...)

Le , par djibril, Responsable Perl et Outils
Salut,

Compte tenu du nombre important de threads à ce sujet, j'ouvre un thread concacré à ce souci de parallelisation de job en perl.
Ce serait bien que chacun participe en donnant des exemples de scripts propres et commentés avec l'utilisation des threads, des fork et autres modules pour le lancement de jobs en paralelle via du multithreading ou de la programmation parallèle.
Ce serait bien aussi d'avoir une idée des modules les plus utilisés et surtout les plus fiables avec leur compatibilité Linux, Windows, Mac OS...

N'ayant jamais été amené à utiliser cette technique ou très peu, je suis bien évidemment intéressé et avec les petites recherches à ce sujet, j'ai déjà listé quelques modules intéressant que voici :
  • module thread : thread, thread::Shared (ici)
  • module threads : threads, threads::Shared (ici)
  • module parallel : Parallel::Simple, Parallel::Jobs, Parallel::Forker (ici)
  • module MPI : Parallel::MPI, Parallel::MPI::Simple, Parallel::Mpich::MPD ... (ici)


Y en a surement d'autres.

A vos claviers et exemples pour ceux qui s'y connaissent déjà bien.

Merci de votre participation

N.B. Dans l'idéal, un tutoriel verrait le jour.


Vous avez aimé cette actualité ? Alors partagez-la avec vos amis en cliquant sur les boutons ci-dessous :


 Poster une réponse

Avatar de Gardyen Gardyen - Membre éclairé https://www.developpez.com
le 22/11/2007 à 14:29
Mon petit laïus sur threads

en premier compatibilité linux/windows:
les threads fonctionnent sous les 2 OS, mais il faut avoir installer la version multithread, qui n'est pas forcement installée par défaut (ce qui a été mon cas sous linux). En cas de réinstallation il est à prévoir de recompiler/réinstaller certains modules (de mémoire j'ai du réinstaller DBI entre autres).
Sous windows, avec ActivePerl 5.8.8 build 819, les threads sont installés par défaut.

petit exemple de threading:
Voila un petit script que j'utilise pour tester les fonctionnalités. Il est largement améliorable !!

Code : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/perl 
use strict; 
use warnings; 
 
use Data::Dumper; 
 
use threads; 
use threads::shared; 
use Thread::Semaphore; 
 
my @list = ("command1", "command2", "command3", "command4", "command5", "command6", "command7", "command8", "command9", "command10"); 
 
my $nombre_de_jobs_en_parallele = 4; 
 
# le sémaphore permet de réguler le nombre de jobs en simultané, bloquant le processus principal tant qu'il n'y a pas de place de libre 
my $semaphore = Thread::Semaphore->new($nombre_de_jobs_en_parallele); 
 
# quelques variables partagées 
my $cpt:shared; 
my $failed:shared; 
 
$cpt = 0; 
$failed = 0; 
 
my $started = 0; 
 
# le point d'entrée du job en parallèle 
sub forked_job($$$$$){ 
	my $command = shift; 
	my $index = shift; 
	my $cpt_ref = shift; 
	my $failed_ref = shift; 
	my $sema_ref = shift; 
	 
	# simule un travail de durée indéterminée entre 0 et 10 sec 
	my $int = int 10 * rand(); 
	print "Le job $index va dormir $int sec au lieu d'exécuter la commande $command\n"; 
	sleep($int); 
	 
	# simule une erreur 
	my $str = ""; 
	if ($int == 4){ 
		$str = " avec une erreur"; 
		$$failed_ref++; 
	} 
	print "Le job $index vient de se terminer$str\n"; 
	 
	# incrémente le nombre de jobs finis 
	$$cpt_ref++; 
	 
	# on a une place de libre. Ne pas oublier de libérer le sémaphore même en cas d'erreur 
	$$sema_ref->up(); 
	 
	return; 
} 
 
# démarre tous les jobs, sauf erreur 
while ($started < scalar @list && !$failed){ 
	my $command = $list[$started]; 
	 
	# incrémente le compteur 
	$started++; 
	 
	# avons nous une place de libre ? 
	$semaphore->down(); 
	 
	# si le sémaphore est a 0, le processus principal va se bloquer en attendant une nouvelle place 
	print "Creation du job $started\n"; 
	my $thr = threads->create("forked_job", ( 
		$command, 
		$started, 
		\$cpt, 
		\$failed, 
		\$semaphore 
		) 
	); 
	# détache le job du thread principal, rend la main au thread principal 
	$thr->detach(); 
	 
	# si on veut attendre la fin du job pour redonner la main, on utilise 
	# $thr->join(); 
} 
 
# attend les derniers jobs 
while ($cpt < $started){ 
	print "Seul $cpt jobs finis sur $started, sleeping\n"; 
	sleep(3); 
} 
print "$cpt jobs lances, $failed échoués, sur les ".scalar @list." prévus\n"; 
print "The End.\n";
en attendant les experts
Avatar de Jedai Jedai - Expert éminent https://www.developpez.com
le 29/11/2007 à 13:15
Citation Envoyé par djibril  Voir le message
- module thread : Thread

Attention Thread est le vieux (5.6) module pour les threads, le module actuel est "threads" qui n'utilise plus le même modèle, il est bien plus fiable. threads::shared permet de partager certaines variables et de poser des verrous, attention, tous les types de variables ne peuvent pas être partagés entre threads, en particulier un grand nombre de modules souffrent de ce fait (et d'autres problèmes plus répandus et liés au parallélisme lui-même) et ne sont pas adaptés à un usage multi-thread (n'oubliez pas que ceci est vrai dans beaucoup d'autres langages).
Un point qui diffère entre les threads de Perl et ceux d'autres langages est que par défaut rien n'est partagé entre les threads.

Thread::Queue est sans doute l'un des modules les plus utiles pour véritablement travailler avec des threads, il permet d'envoyer des valeurs d'un thread à l'autre dans une file robuste. Thread::Semaphore fournit l'autre classique du parallélisme, c'est à dire le sémaphore. Ces deux modules sont dans le CORE.

Thread::Appartment et PSiCHE semblent proposer une solution ambitieuse, mais qui requiert sans doute un temps d'adaptation.

En attendant et avec un peu d'imagination, il n'est pas trop difficile de restreindre les modules qui ne sont pas thread-friendly à un seul thread et d'utiliser un système simple comme exposé dans ce post.

--
Jedaï
Avatar de SPKlls SPKlls - Membre régulier https://www.developpez.com
le 20/10/2008 à 21:55
Venant de poster concernant justement le fork()
J'ajoute à mon tour ma contribution.
Voici un code qui illustre fork() et l'utilisation de pipe() pour communiquer des enfants au père.

Code perl : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/perl 
  
use strict; 
use warnings; 
use constant { 
    MAX_PROCESS   => 10, 
}; 
  
my ($child, $pid, @childs, @process_list, $child_pid); 
  
  
for (1...MAX_PROCESS) { 
    pipe(FROM_CHILD, TO_PARENT); 
    $pid = fork(); 
    if ($pid) { # Parent code 
        push(@childs, $pid);         
        close TO_PARENT; 
        $child_pid = <FROM_CHILD>; #On lit la réponse du fils 
        chomp($child_pid); 
        close FROM_CHILD; 
        print "My child's pid : $child_pid \n"; 
        push(@process_list, $child_pid); 
    } 
    else {  #child 
        close FROM_CHILD; 
        print TO_PARENT $$,"\n";  #On écrit au parent le processus courant (du fils) 
        close TO_PARENT; 
        print "New process launched (",$_,"/",MAX_PROCESS,"): [$$]\n"; 
        sleep 5; 
        exit(0); 
    } 
} 
  
print "-> @process_list \n"; #resultat les pids de chaque processus enfants 
  
foreach (@childs) { 
    waitpid($_,0); 
}
Avatar de Super_carotte Super_carotte - Membre régulier https://www.developpez.com
le 02/12/2011 à 11:04
Voici un exemple avec Parralel::ForkManager. CF ici

Code : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/opt/perl/bin/perl 
 
use strict; 
use warnings; 
 
use ForkManager; 
 
my $pm = Parallel::ForkManager->new(10);        #nombre de processus max 
$pm->run_on_finish( sub { 
    printf "%s : Process completed: @_\n", scalar localtime 
}); 
 
for my $i (1..15) {     #combien d'action a faire 
    $pm->start($i) and next; 
    sleep rand 9; 
    $pm->finish; 
} 
 
printf "%s: Waiting for some child to finish\n", scalar localtime; 
$pm->wait_all_children; 
 
printf "%s: All processes finished.\n", scalar localtime;
Resultat affiché:
Code : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
perl ./script.pl 
Fri Dec  2 11:07:05 2011 : Process completed: 11995 0 7 0 0 
Fri Dec  2 11:07:05 2011 : Process completed: 11998 0 10 0 0 
Fri Dec  2 11:07:06 2011 : Process completed: 11992 0 4 0 0 
Fri Dec  2 11:07:07 2011 : Process completed: 11996 0 8 0 0 
Fri Dec  2 11:07:07 2011 : Process completed: 11999 0 11 0 0 
Fri Dec  2 11:07:07 2011: Waiting for some child to finish 
Fri Dec  2 11:07:07 2011 : Process completed: 12041 0 15 0 0 
Fri Dec  2 11:07:08 2011 : Process completed: 11990 0 2 0 0 
Fri Dec  2 11:07:08 2011 : Process completed: 12013 0 13 0 0 
Fri Dec  2 11:07:08 2011 : Process completed: 11997 0 9 0 0 
Fri Dec  2 11:07:09 2011 : Process completed: 11991 0 3 0 0 
Fri Dec  2 11:07:09 2011 : Process completed: 11993 0 5 0 0 
Fri Dec  2 11:07:09 2011 : Process completed: 11994 0 6 0 0 
Fri Dec  2 11:07:09 2011 : Process completed: 12040 0 14 0 0 
Fri Dec  2 11:07:10 2011 : Process completed: 11989 0 1 0 0 
Fri Dec  2 11:07:12 2011 : Process completed: 12000 0 12 0 0 
Fri Dec  2 11:07:12 2011: All processes finished.
Et résultat du ps -ef | grep perl:
Code : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
user12  11968 16676  2 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11989 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11990 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11991 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11993 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11994 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11996 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11997 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  11999 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  12000 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  12013 11968  0 11:07 pts/3    00:00:00 perl ./script.pl 
user12  12016  4528  0 11:07 pts/5    00:00:00 grep perl
Cet exemple permet de voir la simplicité d'utilisation de ce module que je pense utiliser pour faire un démarrage en parallèle de différents jobs.

(Je suis un tout débutant en perl, si vous estimez que ce post ne sert pas ou n'est pas suffisamment pertinent, supprimez le, je ne vous en voudrai pas )
Avatar de djibril djibril - Responsable Perl et Outils https://www.developpez.com
le 02/12/2011 à 12:17
pour ce programme, toute participation est toujours appréciée .
Avatar de djibril djibril - Responsable Perl et Outils https://www.developpez.com
le 29/12/2013 à 23:33
Je relance le débat !
Avatar de Lolo78 Lolo78 - Rédacteur/Modérateur https://www.developpez.com
le 30/12/2013 à 14:12
J'ai utilisé MPI dans le passé, mais ce n'était pas en Perl (C et Fortran). MPI est surtout utile pour construire des systèmes distribués (processus tournant sur des bécanes différentes, distantes). On peut faire du parallélisme sur une même machine avec MPI, mais ce n'est pas vraiment le but.

Sinon, les seules fois où j'ai fait du parallélisme en Perl, c'était en lançant des processus en tâches de fond depuis le shell sous Unix ou l'équivalent du shell (DCL) sous VMS.
Avatar de JUSTIN Loïc JUSTIN Loïc - Membre habitué https://www.developpez.com
le 31/12/2013 à 11:21
Voici un exemple de script qui permet de lancer X traitements simultanés.
Pour information, ce script fonctionne depuis plusieurs années

Code : Sélectionner tout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
   
  $fichiers="$path  -iname '*nom_du_fichiert_*.$suffix[$k]*'"; 
  # on traite les noms de fichiers suffixés <jour> 
  open ( Filelist ,"/usr/bin/find $fichiers -ls |") 
    or die "Problème sur les noms de fichiers nom_du_fichier_<...>"; 
 #-------------------------------------------------- 
  # traitement des fichiers 
  #-------------------------------------------------- 
  while(<Filelist>) { 
    # pour chaque fichier nom_du_fichier_<cata>.<day> 
    $count_files++; 
    my @name=split(' '); 
    my $filename = $name[10]; 
    my @info_path = split('/',$filename); 
    my @status = stat ( $filename ); 
    my $cata_name = `basename $filename`; 
    chomp $cata_name; 
    $cata_name=~s/.*bpimagelist_//g; 
    $cata_name=~s/\.$suffix[$k].*//g; 
    my $other_cata_name = $info_path[6] ? $info_path[6] : ''; 
       if (( $status[9] > time - $file_max_time ) and ( $cata_name eq $other_cata_name )) { 
      # NOTA : pour eviter d'intégrer des fichiers obsolètes 
      print " jobs_vol_nb - $info_path[4] . $info_path[5] . $other_cata_name . $suffix[$k] : => traitement...\n"; 
      my $catalogue = $other_cata_name; 
      if ($pid = fork) { 
        # dans le processus pere 
        push ( @children, $pid); 
        $nb_process++; 
        if ( $nb_process >= $MAX_PROCESS ) { 
          # on ne lance pas plus de $MAX_PROCESS traitements simultanés 
          # donc on attend la terminaison d'un des fils... 
          $waitedpid = wait; 
          my $children_not_terminated = 1; 
          while ($children_not_terminated) { 
            # la terminaison des fils <<open ( Filelist ,"/usr/bin/find $fichiers -ls |")>> 
            # ne doit pas générer un nouveau fils import_job_vol_nb.pl 
            FOUND : { 
              for (my $i=0; $i < @children; $i++) { 
                if ( $children[$i] == $waitedpid ) { 
                  splice(@children, $i,1); 
                  $children_not_terminated = 0; 
                  last FOUND; 
                } 
              } 
            } 
            if ($children_not_terminated) { 
              $waitedpid = wait; 
            } 
          } 
          $nb_process--; 
        } 
      } 
      else { 
        # dans les processus fils 
        die "cannot fork: $!" unless defined $pid; 
        parse_jobs_vol_nb($filename, $catalogue); 
        exit; 
      } 
    } 
    else { 
      if ( $cata_name eq $other_cata_name ) { 
        print " jobs_vol_nb - $info_path[4] . $info_path[5] . $other_cata_name . $suffix[$k] : OBSOLETE : => ALARME !\n"; 
      } 
    } 
  } 
  close Filelist; 
  # on passe à la liste de fichiers suivante 
}
Ceci étant que la boucle qui gère la liste des fichiers à traiter
Avatar de djibril djibril - Responsable Perl et Outils https://www.developpez.com
le 31/12/2013 à 11:48
Dès que possible, je me lance dans un tutoriel.
Avatar de Lolo78 Lolo78 - Rédacteur/Modérateur https://www.developpez.com
le 31/12/2013 à 19:40
Un autre système distribué que j'ai utilisé et bien aimé est PVM (Parallel Virtual Machine). Ce n'était pas en Perl à l'époque (C et Fortran), mais il y a un module Perl qui a l'air assez complet pour l'utiliser.
Offres d'emploi IT
Chef projet big data - pse flotte H/F
Safran - Ile de France - Évry (91090)
Architecte électronique de puissance expérimenté H/F
Safran - Ile de France - Villaroche - Réau
Responsable transverse - engagement métiers H/F
Safran - Ile de France - Corbeil-Essonnes (91100)

Voir plus d'offres Voir la carte des offres IT
Responsable bénévole de la rubrique Perl : djibril -