Ressources PYA
TP : Comparaison de deux listes
set1 = {1, 2, 3, 4}
set2 = {3, 4, 5, 6}
# Intersection (elements commun)
intersection = set1 & set2 # Output: {3, 4}
# Union (tout les élements uniques)
union = set1 | set2 # Output: {1, 2, 3, 4, 5, 6}
# Difference (différences dans le set1 mais pas dans le set2)
difference = set1 - set2 # Output: {1, 2}
# Difference Symétrique (soit dans le set1 soit dans le set2)
symmetric_difference = set1 ^ set2 # Output: {1, 2, 5, 6}
TP : Ensemble d'entrée
Créer un programme qui demande à l'utilisateur d'entrer des noms et de les ajouter à une liste. Implémentez des fonctionnalités pour :
Fonction : input()
- Afficher tous les noms dans la liste.
- Supprimer un nom donné par l'utilisateur.
- Trier la liste par ordre alphabétique.
- Compter le nombre d'occurrences d'un nom spécifique.
- Rechercher si un nom donné est présent dans la liste.
TP : Mise en place d'une pile d'évènements
Sujet 1 : Créer un programme simulant une file d'attente pour un service client :
- Utiliser une
deque
pour ajouter les clients à la file d'attente. - Offrir la possibilité de traiter le premier client dans la file (supprimer de la tête).
- Ajouter une fonctionnalité pour ajouter un client prioritaire (ajouter à la tête de la deque).
- Afficher la file d'attente actuelle après chaque opération.
Sujet 2 : Implémentez un système de gestion de navigation (comme un navigateur web simple) avec la possibilité d'aller en avant et en arrière dans l'historique :
- Utilisez une
deque
pour stocker les pages visitées. - Permettez à l'utilisateur d'aller à une nouvelle page (ajouter à la deque).
- Gérer la navigation arrière et avant avec des méthodes spécifiques de la deque.
- Revenir a un moment spécifique de la navigation (par exemple 4 page en arrière)
TP : Mise en pratique dict
- Créer un programme qui analyse un texte et calcule la fréquence d'apparition de chaque mot :
- Lire un texte fourni par l'utilisateur.
- Stocker chaque mot comme clé d'un dictionnaire et le nombre d'apparitions comme valeur.
- Afficher les mots les plus fréquents.
- Implémentez un carnet d'adresses utilisant un
dict
:- Chaque contact a un nom comme clé et un numéro de téléphone comme valeur.
- Permettre l'ajout, la modification, et la suppression de contacts.
- Rechercher un contact par son nom.
- Afficher tous les contacts dans l'ordre alphabétique.
- Supression des contacts en double
TP : Mise en pratique globale
Créer une application de gestion de bibliothèque :
- Utiliser un
list
pour stocker les livres disponibles. - Un
set
pour suivre les genres de livres uniques dans la bibliothèque. - Un
deque
pour gérer les emprunts et les retours de livres (les premiers empruntés doivent être les premiers rendus). - Un
dict
pour gérer les informations des livres (titre, auteur, année, genre, nombre de livres). - Un
defaultdict(list)
pour regrouper les livres par auteur.
Règles à implémenter :
- L'ajout d'un livre contenant un genre non existant est interdit
- Un livre peut avoir plusieurs occurrences (e.g. deux livre avec le même titre)
- Les livres sont référencés par leur titre
- Un livre ne peut pas être emprunté plus de fois que d'exemplaire disponible (à vous de trouver le format de donnée pour l'implémentation)
- Un livre peut être rendu
- On doit pouvoir retrouver les livres par leur auteur
Optionnel
- On peut rendre les livres dans le désordre
- Si on supprime un genre tous les livres du genre sont supprimés
- Si on supprime un auteur tous les livres de l'auteur sont supprimés
TP Finaux
TP 1 : Manipulation de fichier
Partie 1 : Manipulation d’un Fichier CSV
1. Création d’un fichier CSV
Crée un fichier produits.csv contenant les données suivantes (avec un script python) :
ID | Nom du Produit | Catégorie | Prix (€) | Stock |
---|
1 | Ordinateur | Informatique | 1000 | 10 |
2 | Souris | Informatique | 20 | 50 |
3 | Clavier | Informatique | 30 | 40 |
4 | Téléphone | Téléphonie | 500 | 25 |
5 | Casque Audio | Audio | 100 | 15 |
Solution
import csv
# Création du fichier CSV
with open('produits.csv', 'w', newline='', encoding='utf-8') as fichier_csv:
writer = csv.writer(fichier_csv)
# Écriture de l'en-tête
writer.writerow(['ID', 'Nom du Produit', 'Catégorie', 'Prix (€)', 'Stock'])
# Écriture des données
writer.writerows([
[1, 'Ordinateur', 'Informatique', 1000, 10],
[2, 'Souris', 'Informatique', 20, 50],
[3, 'Clavier', 'Informatique', 30, 40],
[4, 'Téléphone', 'Téléphonie', 500, 25],
[5, 'Casque Audio', 'Audio', 100, 15],
])
- Lire et afficher le fichier en python
Solution
# Lecture du fichier CSV
with open('produits.csv', 'r', encoding='utf-8') as fichier_csv:
reader = csv.reader(fichier_csv)
for ligne in reader:
print(ligne)
Partie 2 : Manipulation d’un Fichier JSON
1. Conversion du CSV en JSON
Écrire un script qui lit le fichier produits.csv et le convertit en fichier JSON nommé produits.json.
Solution
import json
# Lecture du fichier CSV et conversion en liste de dictionnaires
produits = []
with open('produits.csv', 'r', encoding='utf-8') as fichier_csv:
reader = csv.DictReader(fichier_csv)
for ligne in reader:
produits.append(ligne)
# Écriture dans un fichier JSON
with open('produits.json', 'w', encoding='utf-8') as fichier_json:
json.dump(produits, fichier_json, indent=4, ensure_ascii=False)
2. Lecture et Modification du fichier JSON
Modifier le prix de tous les produits en appliquant une réduction de 10 %.
Solution
# Lecture du fichier JSON
with open('produits.json', 'r', encoding='utf-8') as fichier_json:
produits = json.load(fichier_json)
# Application de la réduction de 10 % sur les prix
for produit in produits:
produit['Prix (€)'] = round(float(produit['Prix (€)']) * 0.9, 2)
# Écriture des modifications dans le fichier JSON
with open('produits.json', 'w', encoding='utf-8') as fichier_json:
json.dump(produits, fichier_json, indent=4, ensure_ascii=False)
Partie 3 : Extraction et Analyse des Données
Extraction : Créer une liste des produits dont le stock est inférieur à 20.
Analyse : Afficher la somme totale des stocks.
Solution
# Extraction des produits avec un stock inférieur à 20
produits_stock_limite = [produit for produit in produits if int(produit['Stock']) < 20]
print("Produits avec stock inférieur à 20 :")
for produit in produits_stock_limite:
print(produit['Nom du Produit'], "-", produit['Stock'])
# Calcul de la somme totale des stocks
stock_total = sum(int(produit['Stock']) for produit in produits)
print(f"Stock total : {stock_total}")
Bonus : Exportation des Données Modifiées en CSV
Recréer un fichier CSV à partir des données modifiées du JSON.
Solution
# Écriture du fichier CSV à partir des données JSON
with open('produits_modifies.csv', 'w', newline='', encoding='utf-8') as fichier_csv:
writer = csv.DictWriter(fichier_csv, fieldnames=produits[0].keys())
writer.writeheader()
writer.writerows(produits)
TP 2 : Programmation Réseau avec Python
Partie 1 : Création d’un Serveur TCP
Le serveur doit écouter sur une adresse IP locale et un port spécifique, recevoir des messages d’un client, et répondre avec une confirmation.
Solution
import socket
# Création du serveur TCP
serveur = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serveur.bind(('127.0.0.1', 12345))
serveur.listen(1)
print("Serveur en attente de connexion...")
connexion, adresse = serveur.accept()
print(f"Connecté à {adresse}")
# Boucle pour recevoir et répondre aux messages
while True:
message = connexion.recv(1024).decode('utf-8')
if not message or message.lower() == 'quit':
print("Fermeture de la connexion.")
break
print(f"Message reçu : {message}")
connexion.sendall("Message reçu".encode('utf-8'))
connexion.close()
serveur.close()
Partie 2 : Création du client TCP
Le client doit se connecter au serveur, envoyer un message, puis afficher la réponse du serveur.
Solution
import socket
# Création du client TCP
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 12345))
# Envoi d'un message
message = input("Entrez un message à envoyer au serveur : ")
client.sendall(message.encode('utf-8'))
# Réception de la réponse
reponse = client.recv(1024).decode('utf-8')
print(f"Réponse du serveur : {reponse}")
client.close()
Partie 3 : Communication UDP
- Le serveur UDP
Solution
Le serveur UDP doit écouter sur un port et répondre à chaque message reçu.
import socket
# Création du serveur UDP
serveur = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serveur.bind(('127.0.0.1', 12345))
print("Serveur UDP en attente de messages...")
while True:
message, adresse = serveur.recvfrom(1024)
print(f"Message reçu de {adresse} : {message.decode('utf-8')}")
serveur.sendto("Message bien reçu".encode('utf-8'), adresse)
- Le client UDP
Le client UDP envoie un message et attend la réponse du serveur.
Solution
import socket
# Création du client UDP
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
message = input("Entrez un message à envoyer au serveur : ")
client.sendto(message.encode('utf-8'), ('127.0.0.1', 12345))
reponse, _ = client.recvfrom(1024)
print(f"Réponse du serveur : {reponse.decode('utf-8')}")
client.close()
Partie 4 : Analyse Simple des Réseaux
- Récupération d’une Adresse IP depuis un Nom de Domaine
Écrire un script qui demande à l’utilisateur un nom de domaine et affiche son adresse IP.
Solution
import socket
nom_domaine = input("Entrez un nom de domaine : ")
adresse_ip = socket.gethostbyname(nom_domaine)
print(f"L'adresse IP de {nom_domaine} est : {adresse_ip}")
- Vérification de Ports Ouverts
Écrire un script qui scanne les ports d’une machine locale pour vérifier s’ils sont ouverts.
Solution
import socket
hote = '127.0.0.1'
ports_a_verifier = [22, 80, 443, 12345]
for port in ports_a_verifier:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
resultat = s.connect_ex((hote, port))
if resultat == 0:
print(f"Port {port} : OUVERT")
else:
print(f"Port {port} : FERMÉ")
Partie 5 : Serveur Multi-Clients (Bonus)
Modifier le serveur TCP pour gérer plusieurs connexions en parallèle avec des threads.
Solution
import socket
import threading
def gerer_client(connexion, adresse):
print(f"Nouvelle connexion : {adresse}")
while True:
message = connexion.recv(1024).decode('utf-8')
if not message or message.lower() == 'quit':
break
print(f"Message de {adresse} : {message}")
connexion.sendall("Message reçu".encode('utf-8'))
connexion.close()
print(f"Connexion fermée : {adresse}")
serveur = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serveur.bind(('127.0.0.1', 12345))
serveur.listen(5)
print("Serveur en attente de connexions...")
while True:
connexion, adresse = serveur.accept()
thread = threading.Thread(target=gerer_client, args=(connexion, adresse))
thread.start()
TP 3 : Introduction IA avec tensorflow
Étape 1 : Chargement des données
- TensorFlow propose des datasets intégrés. Utilisez le dataset MNIST (chiffres manuscrits).
# Install: pip install tensorflow-datasets
import tensorflow as tf
import tensorflow_datasets as tfds
mnist_data = tfds.load("mnist")
# Charger les données et les séparer en ensembles d'entraînement et de test
mnist_train, mnist_test = mnist_data["train"], mnist_data["test"]
batch_size = 32
train_size = mnist_train.cardinality().numpy() * batch_size
test_size = mnist_test.cardinality().numpy() * batch_size
print(f"Taille du jeu d'entraînement : {train_size} échantillons")
print(f"Taille du jeu de test : {test_size} échantillons")
Étape 2 : Prétraitement des données
- Normalisez les images pour que les valeurs des pixels soient comprises entre 0 et 1.
- Transformez les labels (y) en vecteurs one-hot encodés.
def preprocess(features):
image = features["image"]
label = features["label"]
image = tf.cast(image, tf.float32) / 255.0 # Normalize to [0, 1]
return image, label
# Prepare the training and test datasets
batch_size = 32
mnist_train = mnist_train.map(preprocess).shuffle(10000).batch(batch_size)
mnist_test = mnist_test.map(preprocess).batch(batch_size)
Étape 3 : Création du modèle
- Créez un modèle simple avec une couche d'entrée flatten, une couche cachée dense avec activation ReLU, et une couche de sortie softmax.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense
model = Sequential([
Flatten(input_shape=(28, 28, 1)),
Dense(128, activation='relu'),
Dense(10, activation='softmax')
])
Étape 4 : Compilation et entraînement
- Compilez le modèle avec une fonction de perte
categorical_crossentropy
, un optimiseuradam
, et mesurez l’accuracy
. - Entraînez le modèle sur les données d’entraînement avec validation sur les données de test.
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.fit(mnist_train, epochs=5)
Étape 5 : Évaluation du modèle
- Évaluez la performance du modèle sur le jeu de test.
- Affichez les résultats et tracez les courbes de perte et d’accuracy.
test_loss, test_acc = model.evaluate(mnist_test)
print(f"Précision sur le jeu de test : {test_acc:.2f}")
import matplotlib.pyplot as plt
# Evaluate the model
test_loss, test_acc = model.evaluate(mnist_test)
# Plot the results
metrics = ['Test Loss', 'Test Accuracy']
values = [test_loss, test_acc]
# Create a bar chart
plt.figure(figsize=(8, 5))
plt.bar(metrics, values, color=['skyblue', 'lightgreen'])
plt.ylim(0, 1) # Accuracy is between 0 and 1, so we set limits for better visualization
plt.title('Model Evaluation Metrics')
plt.ylabel('Value')
plt.text(0, values[0] + 0.02, f'{values[0]:.4f}', ha='center')
plt.text(1, values[1] + 0.02, f'{values[1]:.4f}', ha='center')
# Display the plot
plt.show()
TP 4 : Utilisation de Docker + Python (automatisation)
pip install docker
Partie 1 : Automatisation du Démarrage et de l’Arrêt des Conteneurs
1. Script Python pour Démarrer un Conteneur Docker
Le script suivant démarre un conteneur à partir d’une image donnée (par exemple, nginx
).
Solution
import docker
# Connexion au daemon Docker
client = docker.from_env()
def demarrer_conteneur(image_name):
try:
conteneur = client.containers.run(image_name, detach=True, name="mon_conteneur")
print(f"Conteneur démarré : {conteneur.short_id}")
except docker.errors.APIError as e:
print(f"Erreur lors du démarrage : {e}")
# Démarrer un conteneur nginx
demarrer_conteneur("nginx")
2. Script Python pour Arrêter et Supprimer un Conteneur
Le script suivant arrête et supprime le conteneur démarré précédemment.
Solution
def arreter_et_supprimer_conteneur(container_name):
try:
conteneur = client.containers.get(container_name)
conteneur.stop()
conteneur.remove()
print(f"Conteneur arrêté et supprimé : {container_name}")
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
except docker.errors.APIError as e:
print(f"Erreur : {e}")
# Arrêter et supprimer le conteneur nommé "mon_conteneur"
arreter_et_supprimer_conteneur("mon_conteneur")
Partie 2 : Gestion des Logs des Conteneurs
1. Affichage des Logs d’un Conteneur en Temps Réel
Ce script affiche les logs en continu pour un conteneur spécifique.
Solution
def afficher_logs(container_name):
try:
conteneur = client.containers.get(container_name)
for ligne in conteneur.logs(stream=True):
print(ligne.decode('utf-8').strip())
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
# Afficher les logs du conteneur nommé "mon_conteneur"
afficher_logs("mon_conteneur")
2. Sauvegarde des Logs dans un Fichier
Le script suivant enregistre les logs d’un conteneur dans un fichier.
Solution
def sauvegarder_logs(container_name, fichier_log):
try:
conteneur = client.containers.get(container_name)
with open(fichier_log, 'w') as fichier:
fichier.write(conteneur.logs().decode('utf-8'))
print(f"Logs sauvegardés dans {fichier_log}.")
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
# Sauvegarder les logs du conteneur "mon_conteneur" dans un fichier
sauvegarder_logs("mon_conteneur", "logs_conteneur.txt")
Partie 3 : Automatisation Avancée
1. Automatiser le Démarrage de Plusieurs Conteneurs
Démarre plusieurs conteneurs à partir d’une liste d’images.
def demarrer_conteneurs(images):
for image in images:
demarrer_conteneur(image)
# Démarrer des conteneurs pour plusieurs images
images = ["nginx", "redis", "alpine"]
demarrer_conteneurs(images)
- Nettoyage Automatique des Conteneurs Arrêtés
Ce script supprime tous les conteneurs arrêtés.
def nettoyage_conteneurs_arretes():
conteneurs = client.containers.list(all=True)
for conteneur in conteneurs:
if conteneur.status == 'exited':
print(f"Suppression du conteneur : {conteneur.name}")
conteneur.remove()
# Nettoyage des conteneurs arrêtés
nettoyage_conteneurs_arretes()
Partie 4 : Créer un Service Automatisé avec systemd
1. Fichier de Service systemd
Crée un fichier /etc/systemd/system/gestion_docker.service :
[Unit]
Description=Service d'automatisation Docker
After=docker.service
[Service]
ExecStart=/usr/bin/python3 /chemin/vers/script_docker.py
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable gestion_docker.service
sudo systemctl start gestion_docker.service
Calcul Scientifique avec Python
pip install numpy scipy matplotlib
Partie 1 : Manipulation des Matrices avec NumPy
1. Création et Opérations sur les Matrices
Crée deux matrices et effectue des opérations basiques comme l'addition, la multiplication, et le calcul du déterminant.
import numpy as np
# Création de matrices
A = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
B = np.array([[9, 8, 7], [6, 5, 4], [3, 2, 1]])
# Addition
C = A + B
print("Addition des matrices :\n", C)
# Multiplication élément par élément
D = A * B
print("Multiplication élément par élément :\n", D)
# Produit matriciel
E = np.dot(A, B)
print("Produit matriciel :\n", E)
# Calcul du déterminant
det_A = np.linalg.det(A)
print("Déterminant de A :", det_A)
2. Résolution d’un Système Linéaire
Résolvons le système d’équations suivant :
2x+y−z=8
−3x−y+2z=−11
−2x+y+2z=−3
# Matrice des coefficients
coefficients = np.array([[2, 1, -1], [-3, -1, 2], [-2, 1, 2]])
# Matrice des constantes
constantes = np.array([8, -11, -3])
# Résolution du système
solution = np.linalg.solve(coefficients, constantes)
print("Solution du système : x = {}, y = {}, z = {}".format(*solution))
Partie 2 : Calculs Avancés avec SciPy
1. Calcul d’Intégrale
Calculons l’intégrale de la fonction f(x) = x^2 entre 0 et 4.
from scipy.integrate import quad
# Définition de la fonction
def f(x):
return x**2
# Calcul de l'intégrale
resultat, erreur = quad(f, 0, 4)
print("Résultat de l'intégrale :", resultat)
2. Résolution d’une Équation Différentielle
Résolvons l’équation différentielle suivante :
dx / dy=−2y ,y(0)=1
from scipy.integrate import solve_ivp
# Définition de l'équation différentielle
def equation(t, y):
return -2 * y
# Résolution de l'équation
solution = solve_ivp(equation, [0, 5], [1], t_eval=np.linspace(0, 5, 100))
print("Solution :\n", solution.y[0])
Partie 3 : Visualisation des Données avec Matplotlib
1. Tracé d’une Fonction
Traçons la fonction f(x)=sin(x)
import matplotlib.pyplot as plt
# Définition des données
x = np.linspace(0, 2 * np.pi, 100)
y = np.sin(x)
# Tracé
plt.plot(x, y, label='sin(x)')
plt.title('Tracé de sin(x)')
plt.xlabel('x')
plt.ylabel('sin(x)')
plt.legend()
plt.grid(True)
plt.show()
2. Visualisation des Solutions de l’Équation Différentielle
Représentons graphiquement la solution obtenue précédemment.
# Tracé de la solution
plt.plot(solution.t, solution.y[0], label='Solution de dy/dx = -2y')
plt.title('Solution de l\'équation différentielle')
plt.xlabel('Temps (t)')
plt.ylabel('y(t)')
plt.legend()
plt.grid(True)
plt.show()
Partie 4 : Analyse Statistique
1. Calcul de Statistiques de Base
Analyse des données suivantes : [10, 20, 30, 40, 50].
# Données
donnees = np.array([10, 20, 30, 40, 50])
# Calcul des statistiques
moyenne = np.mean(donnees)
mediane = np.median(donnees)
ecart_type = np.std(donnees)
print(f"Moyenne : {moyenne}, Médiane : {mediane}, Écart-type : {ecart_type}")
TP : Connexion avec la base de donnée MYSQL
pip install mysql-connector-python
Partie 1 : Connexion à une Base de Données MySQL
Établissons une connexion à une base de données MySQL locale.
import mysql.connector
# Connexion à MySQL
conn = mysql.connector.connect(
host="localhost",
user="root", # Remplacez par votre nom d'utilisateur
password="password" # Remplacez par votre mot de passe
)
if conn.is_connected():
print("Connexion réussie à MySQL")
else:
print("Échec de la connexion")
Partie 2 : Création d'une Base de Données et d'une Table
Créons une base de données nommée tp_mysql et une table etudiants.
# Création d'un curseur
cursor = conn.cursor()
# Création de la base de données
cursor.execute("CREATE DATABASE IF NOT EXISTS tp_mysql")
# Connexion à la base de données créée
conn.database = "tp_mysql"
# Création de la table
cursor.execute("""
CREATE TABLE IF NOT EXISTS etudiants (
id INT AUTO_INCREMENT PRIMARY KEY,
nom VARCHAR(255),
age INT,
filiere VARCHAR(255)
)
""")
print("Table 'etudiants' créée.")
Partie 3 : Insertion de Données
Ajoutons des enregistrements dans la table etudiants.
# Requête d'insertion
sql = "INSERT INTO etudiants (nom, age, filiere) VALUES (%s, %s, %s)"
valeurs = [
("Alice", 20, "Informatique"),
("Bob", 22, "Mathématiques"),
("Charlie", 21, "Physique")
]
# Exécution de la requête
cursor.executemany(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrements insérés.")
Partie 4 : Lecture des Données
Récupérons les données de la table etudiants.
# Requête de sélection
cursor.execute("SELECT * FROM etudiants")
resultats = cursor.fetchall()
# Affichage des résultats
for etudiant in resultats:
print(etudiant)
Partie 5 : Mise à Jour des Données
Modifions l'age d'un étudiant
# Requête de mise à jour
sql = "UPDATE etudiants SET age = %s WHERE nom = %s"
valeurs = (23, "Alice")
# Exécution de la requête
cursor.execute(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrement(s) mis à jour.")
Partie 6 : Suppression de Données
Supprimons un étudiant de la table.
# Requête de suppression
sql = "DELETE FROM etudiants WHERE nom = %s"
valeurs = ("Bob",)
# Exécution de la requête
cursor.execute(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrement(s) supprimé(s).")
Partie 7 : Gestion des Erreurs et Fermeture de la Connexion
Assurons-nous de gérer les erreurs et de fermer proprement la connexion.
# Gestion des erreurs
try:
cursor.execute("SELECT * FROM etudiants")
except mysql.connector.Error as err:
print(f"Erreur : {err}")
# Fermeture des connexions
cursor.close()
conn.close()
print("Connexion fermée.")
Exercices Complémentaires :
Ajoutez de nouvelles colonnes dans la table etudiants pour stocker des informations supplémentaires, comme l'adresse ou le numéro de téléphone.
Effectuez des requêtes filtrées pour récupérer uniquement les étudiants d’une certaine filière.
Ajoutez des contraintes comme des clés étrangères en créant une nouvelle table.
Map Reduce
# Sum of Square
# tasks.py
from celery import Celery
app = Celery('mapreduce', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def square(n):
return n * n
@app.task
def reduce_sum(values):
return sum(values)
# main.py
from tasks import square, reduce_sum
from celery.result import GroupResult
from celery import group
def map_reduce(data):
# Step 1: Map Phase (square each number)
map_jobs = group(square.s(n) for n in data)()
map_results = map_jobs.get()
print("Mapped results:", map_results)
# Step 2: Reduce Phase (sum the results)
reduce_job = reduce_sum.delay(map_results)
final_result = reduce_job.get()
print("Reduced result:", final_result)
return final_result
if __name__ == '__main__':
numbers = list(range(1, 6)) # [1, 2, 3, 4, 5]
result = map_reduce(numbers)
print("Sum of squares:", result)
python -m celery -A tasks worker --pool=solo -l info
TP : gRPC python
Partie 1
Dans cette étape, vous concevez l’interface publique de votre microservice en utilisant le langage de description d’interface de Protocol Buffers (proto3). Vous définissez :
- Les messages échangés :
NoteRequest
,StudentRequest
, etc. - Les méthodes du service :
AddNote
,GetNotes
,GetAverage
.
Cela permet de générer automatiquement les classes nécessaires pour que le client et le serveur puissent communiquer.
grpc-notes/
├── client/
│ └── client.py
├── generated/
│ ├── notes_pb2.py
│ └── notes_pb2_grpc.py
├── proto/
│ └── notes.proto
├── server/
│ ├── notes_service.py
│ └── server.py
├── requirements.txt
└── README.md
syntax = "proto3";
package notes;
service NotesService {
rpc AddNote (NoteRequest) returns (NoteResponse);
rpc GetNotes (StudentRequest) returns (NotesList);
rpc GetAverage (StudentRequest) returns (AverageResponse);
}
message NoteRequest {
string student_id = 1;
float note = 2;
}
message StudentRequest {
string student_id = 1;
}
message NoteResponse {
string message = 1;
}
message NotesList {
repeated float notes = 1;
}
message AverageResponse {
float average = 1;
}
proto/notes.proto
Vous installez les dépendances Python (grpcio
, grpcio-tools
) nécessaires au fonctionnement de gRPC en Python.
Ensuite, vous utilisez la commande protoc
pour générer deux fichiers Python à partir de notes.proto
:
notes_pb2.py
: contient les définitions de messages.notes_pb2_grpc.py
: contient les stubs du service (interface client et base du serveur).
Ces fichiers sont utilisés pour écrire le serveur et le client.
grpcio
grpcio-tools
requirements.txt
python -m grpc_tools.protoc -Iproto --python_out=generated --grpc_python_out=generated proto/notes.proto
Partie 2
Vous implémentez une classe NotesService
qui hérite du service gRPC généré (NotesServiceServicer
). Cette classe contient :
- une méthode pour ajouter une note,
- une pour récupérer toutes les notes d’un étudiant,
- une pour calculer la moyenne.
Les données sont stockées en mémoire dans un dictionnaire Python ({student_id: [notes]}
).
Ensuite, vous configurez et démarrez le serveur avec grpc.server()
, sur le port 50051
, dans server.py
.
from generated import notes_pb2, notes_pb2_grpc
class NotesService(notes_pb2_grpc.NotesServiceServicer):
def __init__(self):
self.notes = {}
def AddNote(self, request, context):
student_id = request.student_id
note = request.note
self.notes.setdefault(student_id, []).append(note)
return notes_pb2.NoteResponse(message=f"Note {note} ajoutée pour {student_id}.")
def GetNotes(self, request, context):
student_id = request.student_id
notes = self.notes.get(student_id, [])
return notes_pb2.NotesList(notes=notes)
def GetAverage(self, request, context):
student_id = request.student_id
notes = self.notes.get(student_id, [])
if not notes:
return notes_pb2.AverageResponse(average=0.0)
avg = sum(notes) / len(notes)
return notes_pb2.AverageResponse(average=avg)
server/notes_service.py
from concurrent import futures
import grpc
import time
from generated import notes_pb2_grpc
from server.notes_service import NotesService
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
notes_pb2_grpc.add_NotesServiceServicer_to_server(NotesService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Serveur gRPC lancé sur le port 50051.")
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
print("Arrêt du serveur.")
server.stop(0)
if __name__ == '__main__':
serve()
server/server.py
Partie 3
Le client gRPC :
- Établit une connexion avec le serveur (
localhost:50051
), - Utilise le stub généré pour envoyer des requêtes au serveur,
- Affiche les réponses dans un menu interactif.
Trois fonctionnalités sont disponibles :
- Ajouter une note à un étudiant.
- Afficher toutes les notes d’un étudiant.
- Calculer la moyenne des notes d’un étudiant.
Le client permet de tester l’ensemble des fonctionnalités exposées par le serveur.
import grpc
from generated import notes_pb2, notes_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = notes_pb2_grpc.NotesServiceStub(channel)
while True:
print("\n--- Menu ---")
print("1. Ajouter une note")
print("2. Voir les notes d’un étudiant")
print("3. Calculer la moyenne")
print("4. Quitter")
choice = input("Votre choix : ")
if choice == "1":
sid = input("ID étudiant : ")
note = float(input("Note : "))
resp = stub.AddNote(notes_pb2.NoteRequest(student_id=sid, note=note))
print(resp.message)
elif choice == "2":
sid = input("ID étudiant : ")
resp = stub.GetNotes(notes_pb2.StudentRequest(student_id=sid))
print("Notes :", resp.notes)
elif choice == "3":
sid = input("ID étudiant : ")
resp = stub.GetAverage(notes_pb2.StudentRequest(student_id=sid))
print("Moyenne :", resp.average)
elif choice == "4":
break
else:
print("Option invalide.")
if __name__ == '__main__':
run()
client/client.py
Partie 4
Vous commencez par générer le code gRPC avec protoc
.
Puis vous lancez le serveur (server.py
) et, dans une autre console, le client (client.py
).
Vous pouvez ensuite interagir avec le service via le menu du client. Chaque action déclenche une requête gRPC.
## Démarer le serveur
python server/server.py
## Démarer le client
python client/client.py
TP : Numba
Dans ce TP, vous découvrirez comment optimiser les performances d'un programme Python à l'aide de la bibliothèque Numba, un compilateur JIT (Just-In-Time) qui permet d’accélérer considérablement l'exécution de certaines fonctions, notamment celles impliquant des calculs intensifs ou des boucles lourdes.
À travers plusieurs exercices progressifs, vous comparerez le temps d'exécution de fonctions classiques en Python avec leurs équivalents optimisés par Numba. Vous utiliserez à la fois des décorateurs simples comme @njit
et des fonctions traitant des tableaux NumPy. Le TP se termine par une mise en œuvre graphique d’un ensemble de Mandelbrot, montrant visuellement les avantages de Numba.
- nopython=True
- Active le mode "nopython", dans lequel tout le code est compilé en machine, sans tomber en mode Python. C’est le mode le plus rapide.
- parallel=True
- Active l'exécution parallèle automatique (multi-cœurs) lorsque c’est possible. Nécessite prange() dans les boucles.
- cache=True
- Active la mise en cache du code compilé pour réutilisation sans recompilation lors des prochains lancements.
- nogil=True
- Permet l’exécution sans verrou Python (GIL), utile dans un contexte multi-thread.
pip install numba numpy matplotlib
Structure attendue
numba-tp/
├── exercice_1_basique.py
├── exercice_2_numpy.py
├── exercice_3_fractale_mandelbrot.py
└── requirements.txt
Partie 1 :
Vous allez comparer deux fonctions qui calculent une factorielle de manière répétée :
- Une fonction Python classique, volontairement lente.
- Une version compilée avec Numba (
@njit
).
Vous mesurerez le temps d’exécution dans chaque cas afin de quantifier le gain de performance.
import time
from numba import jit
def slow_factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
@jit(nopython=True)
def fast_factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
N = 10_000
start = time.time()
for _ in range(1000):
slow_factorial(500)
print("Sans Numba :", time.time() - start, "secondes")
start = time.time()
for _ in range(1000):
fast_factorial(500)
print("Avec Numba :", time.time() - start, "secondes")
Partie 2 :
Dans cette partie, vous allez appliquer une opération mathématique complexe (combinaison de puissance, sinus, logarithme) à un très grand tableau NumPy (10 millions d’éléments). Vous comparerez trois versions :
- Traitement NumPy vectorisé seul,
- Traitement Python avec une boucle,
- Traitement Python + boucle optimisée avec Numba.
import numpy as np
import time
from numba import njit
@njit
def compute(data):
result = np.empty_like(data)
for i in range(data.shape[0]):
result[i] = data[i] ** 1.5 + np.sin(data[i]) - np.log1p(data[i])
return result
data = np.linspace(0.01, 100, 10_000_000)
# Version NumPy seule
start = time.time()
result_numpy = data ** 1.5 + np.sin(data) - np.log1p(data)
print("NumPy :", time.time() - start, "s")
# Version Numba
start = time.time()
result_numba = compute(data)
print("Numba :", time.time() - start, "s")
Partie 3 :
Vous implémentez une version simple du générateur de l’ensemble de Mandelbrot. Ce type de fractale nécessite de nombreuses itérations sur chaque pixel de l’image. Sans Numba, l’affichage est lent. Avec Numba, vous observerez une nette amélioration des performances, rendant la génération interactive.
import numpy as np
import matplotlib.pyplot as plt
from numba import njit
import time
@njit
def mandelbrot(width, height, max_iter):
result = np.zeros((height, width), dtype=np.uint8)
for x in range(width):
for y in range(height):
zx, zy = 0.0, 0.0
cx = (x - width / 2) * 4.0 / width
cy = (y - height / 2) * 4.0 / width
i = 0
while zx * zx + zy * zy < 4 and i < max_iter:
tmp = zx * zx - zy * zy + cx
zy, zx = 2.0 * zx * zy + cy, tmp
i += 1
result[y, x] = i
return result
WIDTH, HEIGHT = 800, 600
MAX_ITER = 100
start = time.time()
image = mandelbrot(WIDTH, HEIGHT, MAX_ITER)
print("Temps de calcul :", time.time() - start, "secondes")
plt.imshow(image, cmap='inferno')
plt.title("Fractale Mandelbrot accélérée avec Numba")
plt.axis('off')
plt.show()
TP : Corruption de python
En CPython (l'implémentation standard de Python), les objets int
sont représentés par la structure C PyLongObject
. Les petits entiers (entre -5 et 256) sont internés — c’est-à-dire partagés globalement pour des raisons de performance.
Ce TP manipule directement la mémoire de l’objet 1
, pour altérer sa valeur de manière globale dans l’interpréteur.
- Importation de
ctypes
: pour accéder bas-niveau à la mémoire. - Obtention de l’adresse mémoire de l’objet
1
viaid(one)
. - Décalage mémoire de 24 octets pour accéder au champ
ob_digit[0]
(la "valeur" de l'entier, codée en base 2³⁰ sur les systèmes 64 bits). - Modification de la valeur de l'entier
1
en mémoire :digit_ptr[0] = 2
. - Observation des effets de la corruption : par exemple,
1 + 1
ne vaut plus 2.
import ctypes
# ⚠️ WARNING: This will break Python
# All uses of 1 will now act weird
one = 1
print("Before corruption: 1 + 1 =", 1 + 1)
print("Before corruption: one + one =", one + one)
# Get memory address of `1`
addr = id(one)
# PyLongObject internals:
# [ ob_refcnt | ob_type | ob_size | ob_digit[0] ... ]
# On 64-bit systems, that's 24 bytes offset to the digits
digit_ptr = ctypes.cast(addr + 24, ctypes.POINTER(ctypes.c_uint32))
# Modify the digit
digit_ptr[0] = 2 # Now, `1` becomes 2 everywhere
a = 1
b = 1
print("Var after corruption a + b = ", a + b)
print("After corruption: 1 + 1 =", 1 + 1)
print("Before corruption: one + one =", one + one)
print("True == 1:", True == 1)
TP : Lire / Écrire un fichier binaire via memoryview
Comprendre comment manipuler un fichier binaire contenant des structures typées (int, float) sans recopier la mémoire. Cela est utile en data science, réseaux, ou systèmes embarqués pour accéder à des données binaires efficacement.
Étape 1 – Générer un fichier binaire contenant des paires (int, float)
On va utiliser le module struct
pour transformer des données Python en bytes de manière structurée (comme en C). Ici, chaque structure contiendra un int
suivi d’un float
.
import struct
with open("data.bin", "wb") as f:
for i in range(10):
data = struct.pack("if", i, i * 1.5) # 'i' = int32, 'f' = float32
f.write(data)
Ce que fait ce code :
struct.pack("if", i, f)
convertit deux valeurs en 8 octets binaires.- On écrit 10 blocs de 8 octets dans un fichier.
Étape 2 – Lire le fichier et créer un memoryview
Plutôt que de parser les bytes un à un ou de faire des copies, on crée une vue mémoire sur le contenu lu, ce qui permet une lecture ultra efficace sans recopier les données.
with open("data.bin", "rb") as f:
content = f.read() # contenu binaire brut
view = memoryview(content)
Ce que fait ce code :
f.read()
lit tous les octets du fichier.memoryview(content)
permet d’accéder aux données en lecture seule sans les copier.
Étape 3 – Parcourir les structures via memoryview
On lit les données par blocs de 8 octets (4 pour l’int, 4 pour le float), puis on les décode avec struct.unpack()
.
for i in range(0, len(view), 8):
int_part, float_part = struct.unpack("if", view[i:i+8])
print(int_part, float_part)
Ce que fait ce code :
view[i:i+8]
sélectionne une tranche de 8 octets.struct.unpack("if", ...)
décode ces octets en deux valeurs Python.
TP : Injecter une DLL/SO via ctypes.CDLL
Apprendre à utiliser ctypes
pour charger dynamiquement une bibliothèque partagée (.so ou .dll) et appeler ses fonctions depuis Python.
Étape 1 – Créer une bibliothèque partagée en C
On crée une fonction C (triple
) simple, puis on la compile pour qu'elle soit accessible à Python.
// libmath.c
int triple(int x) {
return x * 3;
}
gcc -shared -o libmath.so -fPIC libmath.c # Linux
La fonction triple
prend un entier et renvoie sa valeur triplée. -fPIC
génère du code position-independent, requis pour les bibliothèques partagées.
Étape 2 – Charger la bibliothèque avec ctypes.CDLL
import ctypes
lib = ctypes.CDLL("./libmath.so")
CDLL
charge la bibliothèque dynamique pour pouvoir utiliser ses fonctions.
On charge le fichier compilé libmath.so
et on le lie à l’objet lib
.
Étape 3 – Appeler une fonction depuis Python
lib.triple.argtypes = [ctypes.c_int]
lib.triple.restype = ctypes.c_int
res = lib.triple(7)
print("Résultat :", res) # 21
.argtypes
définit les types d'arguments attendus..restype
définit le type de retour.- On déclare que la fonction
triple
prend unint
et retourne unint
. Ensuite, on l’appelle avec l’argument7
.
TP : Microbench entre +
, +=
, sum()
, np.sum()
, numba
On compare les performances de différentes méthodes de somme (+
, +=
, sum()
, np.sum()
, numba
) sur un grand tableau.
Étape 1 – Générer les données
import numpy as np
arr = np.arange(10_000_000, dtype=np.float32)
On prépare un grand vecteur pour simuler une charge réaliste de calcul en générant un tableau de 10 millions de nombres flottants consécutifs.
Étape 2 – Benchmark +
(for classique)
import time
t0 = time.time()
s = 0.0
for x in arr:
s = s + x
print("for + :", time.time() - t0)
Étape 3 – Benchmark +=
t0 = time.time()
s = 0.0
for x in arr:
s += x
print("for += :", time.time() - t0)
Étape 4 – Benchmark sum()
t0 = time.time()
s = sum(arr)
print("sum() :", time.time() - t0)
Étape 5 – Benchmark np.sum
t0 = time.time()
s = np.sum(arr)
print("np.sum() :", time.time() - t0)
Étape 6 – Benchmark avec numba
from numba import njit
@njit
def fast_sum(arr):
total = 0.0
for x in arr:
total += x
return total
t0 = time.time()
s = fast_sum(arr)
print("numba sum :", time.time() - t0)
Explications
+
,+=
sont lents car non vectorisés.sum()
est en pur Python.np.sum()
est optimisé C.numba
compile en code natif avec LLVM.
TP : Espionner une fonction avec sys.settrace
Utiliser sys.settrace
pour suivre l’exécution ligne par ligne d’un script.
Étape 1 – Définir une fonction trace
import sys
def tracer(frame, event, arg):
if event == "line":
lineno = frame.f_lineno
print(f"Exécution de la ligne {lineno}")
return tracer
On utilise le système de hook interne pour capturer l'exécution des lignes.
Étape 2 – Définir un code simple à tracer
def foo():
a = 1
b = 2
c = a + b
print(c)
Étape 3 – Activer le tracer et appeler la fonction
sys.settrace(tracer)
foo()
sys.settrace(None)
settrace()
active le hook, puis on exécute foo()
, ligne par ligne.
TP : Générer du code Python à l’exécution (exec, compile)
Utiliser compile()
pour générer dynamiquement du code et l’exécuter avec exec
.
Étape 1 – Générer dynamiquement une fonction
code_str = """
def dynamic_func(x):
return x * 2
"""
compiled = compile(code_str, "<string>", "exec")
exec(compiled)
print(dynamic_func(5)) # 10
On transforme une chaîne de texte en fonction Python exécutable.
Étape 2 – Compiler une expression
expr = compile("3 * 7 + 1", "<expr>", "eval")
print(eval(expr)) # 22
compile(..., "eval")
est utilisé pour les expressions simples (contrairement à "exec"
).
TP : Manipulation avancée de threads et GIL
Comprendre le GIL (Global Interpreter Lock) et ses effets sur les threads.
Étape 1 – Code multithread simple
import threading
counter = 0
def f():
global counter
for i in range(10**6):
a = counter
counter = a + 1
pass
threads = [threading.Thread(target=f)) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)
On observe que les threads ne sont pas si parallèles à cause du GIL.
Étape 2 – Comparer avec multiprocessing
from multiprocessing import Process
counter = 0
processes = [Process(target=f) for _ in range(4)]
[p.start() for p in processes]
[p.join() for p in processes]
print(counter)
On contourne le GIL en créant des processus séparés.
Étape 3 – Utiliser threading.Lock
lock = threading.Lock()
counter = 0
def g():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = [threading.Thread(target=g) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)
Lock explicite avec threads
TP : Créer une base mémoire + gRPC
Créer un microservice gRPC qui stocke des données en mémoire (clé/valeur).
Étape 1 – Définir le .proto
syntax = "proto3";
service MemoryDB {
rpc Put(Pair) returns (Empty);
rpc Get(Key) returns (Value);
}
message Pair {
string key = 1;
string value = 2;
}
message Key {
string key = 1;
}
message Value {
string value = 1;
}
message Empty {}
On définit les messages et services gRPC. Cela sert d’interface réseau entre client et serveur.
Étape 2 – Générer le code gRPC
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. memory.proto
Étape 3 – Implémenter le serveur
import grpc
from concurrent import futures
import memory_pb2
import memory_pb2_grpc
class Memory(memory_pb2_grpc.MemoryDBServicer):
def __init__(self):
self.store = {}
def Put(self, request, context):
self.store[request.key] = request.value
return memory_pb2.Empty()
def Get(self, request, context):
value = self.store.get(request.key, "")
return memory_pb2.Value(value=value)
server = grpc.server(futures.ThreadPoolExecutor())
memory_pb2_grpc.add_MemoryDBServicer_to_server(Memory(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
Le serveur maintient un dictionnaire Python en RAM et expose les méthodes via gRPC.
Étape 4 – Client
import grpc
import memory_pb2
import memory_pb2_grpc
channel = grpc.insecure_channel("localhost:50051")
stub = memory_pb2_grpc.MemoryDBStub(channel)
stub.Put(memory_pb2.Pair(key="name", value="Alice"))
resp = stub.Get(memory_pb2.Key(key="name"))
print("Value:", resp.value)