Ressources PYA
TP : Comparaison de deux listes
set1 = {1, 2, 3, 4}
set2 = {3, 4, 5, 6}
# Intersection (elements commun)
intersection = set1 & set2 # Output: {3, 4}
# Union (tout les élements uniques)
union = set1 | set2 # Output: {1, 2, 3, 4, 5, 6}
# Difference (différences dans le set1 mais pas dans le set2)
difference = set1 - set2 # Output: {1, 2}
# Difference Symétrique (soit dans le set1 soit dans le set2)
symmetric_difference = set1 ^ set2 # Output: {1, 2, 5, 6}TP : Ensemble d'entrée
Créer un programme qui demande à l'utilisateur d'entrer des noms et de les ajouter à une liste. Implémentez des fonctionnalités pour :
Fonction : input()
- Afficher tous les noms dans la liste.
- Supprimer un nom donné par l'utilisateur.
- Trier la liste par ordre alphabétique.
- Compter le nombre d'occurrences d'un nom spécifique.
- Rechercher si un nom donné est présent dans la liste.
TP : Mise en place d'une pile d'évènements
Sujet 1 : Créer un programme simulant une file d'attente pour un service client :
- Utiliser une
dequepour ajouter les clients à la file d'attente. - Offrir la possibilité de traiter le premier client dans la file (supprimer de la tête).
- Ajouter une fonctionnalité pour ajouter un client prioritaire (ajouter à la tête de la deque).
- Afficher la file d'attente actuelle après chaque opération.
Sujet 2 : Implémentez un système de gestion de navigation (comme un navigateur web simple) avec la possibilité d'aller en avant et en arrière dans l'historique :
- Utilisez une
dequepour stocker les pages visitées. - Permettez à l'utilisateur d'aller à une nouvelle page (ajouter à la deque).
- Gérer la navigation arrière et avant avec des méthodes spécifiques de la deque.
- Revenir a un moment spécifique de la navigation (par exemple 4 page en arrière)
TP : Mise en pratique dict
- Créer un programme qui analyse un texte et calcule la fréquence d'apparition de chaque mot :
- Lire un texte fourni par l'utilisateur.
- Stocker chaque mot comme clé d'un dictionnaire et le nombre d'apparitions comme valeur.
- Afficher les mots les plus fréquents.
- Implémentez un carnet d'adresses utilisant un
dict:- Chaque contact a un nom comme clé et un numéro de téléphone comme valeur.
- Permettre l'ajout, la modification, et la suppression de contacts.
- Rechercher un contact par son nom.
- Afficher tous les contacts dans l'ordre alphabétique.
- Supression des contacts en double
TP : Mise en pratique globale
Créer une application de gestion de bibliothèque :
- Utiliser un
listpour stocker les livres disponibles. - Un
setpour suivre les genres de livres uniques dans la bibliothèque. - Un
dequepour gérer les emprunts et les retours de livres (les premiers empruntés doivent être les premiers rendus). - Un
dictpour gérer les informations des livres (titre, auteur, année, genre, nombre de livres). - Un
defaultdict(list)pour regrouper les livres par auteur.
Règles à implémenter :
- L'ajout d'un livre contenant un genre non existant est interdit
- Un livre peut avoir plusieurs occurrences (e.g. deux livre avec le même titre)
- Les livres sont référencés par leur titre
- Un livre ne peut pas être emprunté plus de fois que d'exemplaire disponible (à vous de trouver le format de donnée pour l'implémentation)
- Un livre peut être rendu
- On doit pouvoir retrouver les livres par leur auteur
Optionnel
- On peut rendre les livres dans le désordre
- Si on supprime un genre tous les livres du genre sont supprimés
- Si on supprime un auteur tous les livres de l'auteur sont supprimés
TP Finaux
TP 1 : Manipulation de fichier
Partie 1 : Manipulation d’un Fichier CSV
1. Création d’un fichier CSV
Crée un fichier produits.csv contenant les données suivantes (avec un script python) :
| ID | Nom du Produit | Catégorie | Prix (€) | Stock |
|---|
| 1 | Ordinateur | Informatique | 1000 | 10 |
| 2 | Souris | Informatique | 20 | 50 |
| 3 | Clavier | Informatique | 30 | 40 |
| 4 | Téléphone | Téléphonie | 500 | 25 |
| 5 | Casque Audio | Audio | 100 | 15 |
Solution
import csv
ID = "ID"
NOM = "Nom du Produit"
CATEGORIE = "Catégorie"
PRIX = "Prix (€)"
STOCK = "Stock"
# Création du fichier CSV
with open('produits.csv', 'w', newline='', encoding='utf-8') as fichier_csv:
writer = csv.writer(fichier_csv)
# Écriture de l'en-tête
writer.writerow(['ID', 'Nom du Produit', 'Catégorie', 'Prix (€)', 'Stock'])
# Écriture des données
writer.writerows([
[1, 'Ordinateur', 'Informatique', 1000, 10],
[2, 'Souris', 'Informatique', 20, 50],
[3, 'Clavier', 'Informatique', 30, 40],
[4, 'Téléphone', 'Téléphonie', 500, 25],
[5, 'Casque Audio', 'Audio', 100, 15],
])
- Lire et afficher le fichier en python
Solution
# Lecture du fichier CSV
with open('produits.csv', 'r', encoding='utf-8') as fichier_csv:
reader = csv.reader(fichier_csv)
for ligne in reader:
print(ligne)
Partie 2 : Manipulation d’un Fichier JSON
1. Conversion du CSV en JSON
Écrire un script qui lit le fichier produits.csv et le convertit en fichier JSON nommé produits.json.
Solution
import json
# Lecture du fichier CSV et conversion en liste de dictionnaires
produits = []
with open('produits.csv', 'r', encoding='utf-8') as fichier_csv:
reader = csv.DictReader(fichier_csv)
for ligne in reader:
produits.append(ligne)
# Écriture dans un fichier JSON
with open('produits.json', 'w', encoding='utf-8') as fichier_json:
json.dump(produits, fichier_json, indent=4, ensure_ascii=False)
2. Lecture et Modification du fichier JSON
Modifier le prix de tous les produits en appliquant une réduction de 10 %.
Solution
# Lecture du fichier JSON
with open('produits.json', 'r', encoding='utf-8') as fichier_json:
produits = json.load(fichier_json)
# Application de la réduction de 10 % sur les prix
for produit in produits:
produit[PRIX] = round(float(produit[PRIX]) * 0.9, 2) # Prix est sur la position 3
# Écriture des modifications dans le fichier JSON
with open('produits.json', 'w', encoding='utf-8') as fichier_json:
json.dump(produits, fichier_json, indent=4, ensure_ascii=False)
Partie 3 : Extraction et Analyse des Données
Extraction : Créer une liste des produits dont le stock est inférieur à 20.
Analyse : Afficher la somme totale des stocks.
Solution
# Extraction des produits avec un stock inférieur à 20
produits_stock_limite = [produit for produit in produits if int(produit[STOCK]) < 20]
print("Produits avec stock inférieur à 20 :")
for produit in produits_stock_limite:
print(produit[NOM], "-", produit[STOCK])
# Calcul de la somme totale des stocks
stock_total = sum(int(produit[STOCK]) for produit in produits)
print(f"Stock total : {stock_total}")
Bonus : Exportation des Données Modifiées en CSV
Recréer un fichier CSV à partir des données modifiées du JSON.
Solution
# Écriture du fichier CSV à partir des données JSON
with open('produits_modifies.csv', 'w', newline='', encoding='utf-8') as fichier_csv:
writer = csv.DictWriter(fichier_csv, fieldnames=produits[0].keys())
writer.writeheader()
writer.writerows(produits)
TP 2 : Programmation Réseau avec Python
Partie 1 : Création d’un Serveur TCP
Le serveur doit écouter sur une adresse IP locale et un port spécifique, recevoir des messages d’un client, et répondre avec une confirmation.
Solution
import socket
# Création du serveur TCP
serveur = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serveur.bind(('127.0.0.1', 12345))
serveur.listen(1)
print("Serveur en attente de connexion...")
connexion, adresse = serveur.accept()
print(f"Connecté à {adresse}")
# Boucle pour recevoir et répondre aux messages
while True:
message = connexion.recv(1024).decode('utf-8')
if not message or message.lower() == 'quit':
print("Fermeture de la connexion.")
break
print(f"Message reçu : {message}")
connexion.sendall("Message reçu".encode('utf-8'))
connexion.close()
serveur.close()
Partie 2 : Création du client TCP
Le client doit se connecter au serveur, envoyer un message, puis afficher la réponse du serveur.
Solution
import socket
# Création du client TCP
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 12345))
# Envoi d'un message
message = input("Entrez un message à envoyer au serveur : ")
client.sendall(message.encode('utf-8'))
# Réception de la réponse
reponse = client.recv(1024).decode('utf-8')
print(f"Réponse du serveur : {reponse}")
client.close()
Partie 3 : Communication UDP
- Le serveur UDP
Solution
Le serveur UDP doit écouter sur un port et répondre à chaque message reçu.
import socket
# Création du serveur UDP
serveur = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serveur.bind(('127.0.0.1', 12345))
print("Serveur UDP en attente de messages...")
while True:
message, adresse = serveur.recvfrom(1024)
print(f"Message reçu de {adresse} : {message.decode('utf-8')}")
serveur.sendto("Message bien reçu".encode('utf-8'), adresse)
- Le client UDP
Le client UDP envoie un message et attend la réponse du serveur.
Solution
import socket
# Création du client UDP
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
message = input("Entrez un message à envoyer au serveur : ")
client.sendto(message.encode('utf-8'), ('127.0.0.1', 12345))
reponse, _ = client.recvfrom(1024)
print(f"Réponse du serveur : {reponse.decode('utf-8')}")
client.close()
Partie 4 : Analyse Simple des Réseaux
- Récupération d’une Adresse IP depuis un Nom de Domaine
Écrire un script qui demande à l’utilisateur un nom de domaine et affiche son adresse IP.
Solution
import socket
nom_domaine = input("Entrez un nom de domaine : ")
adresse_ip = socket.gethostbyname(nom_domaine)
print(f"L'adresse IP de {nom_domaine} est : {adresse_ip}")
- Vérification de Ports Ouverts
Écrire un script qui scanne les ports d’une machine locale pour vérifier s’ils sont ouverts.
Solution
import socket
hote = '127.0.0.1'
ports_a_verifier = [22, 80, 443, 12345]
for port in ports_a_verifier:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
resultat = s.connect_ex((hote, port))
if resultat == 0:
print(f"Port {port} : OUVERT")
else:
print(f"Port {port} : FERMÉ")
Partie 5 : Serveur Multi-Clients (Bonus)
Modifier le serveur TCP pour gérer plusieurs connexions en parallèle avec des threads.
Solution
import socket
import threading
def gerer_client(connexion, adresse):
print(f"Nouvelle connexion : {adresse}")
while True:
message = connexion.recv(1024).decode('utf-8')
if not message or message.lower() == 'quit':
break
print(f"Message de {adresse} : {message}")
connexion.sendall("Message reçu".encode('utf-8'))
connexion.close()
print(f"Connexion fermée : {adresse}")
serveur = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serveur.bind(('127.0.0.1', 12345))
serveur.listen(5)
print("Serveur en attente de connexions...")
while True:
connexion, adresse = serveur.accept()
thread = threading.Thread(target=gerer_client, args=(connexion, adresse))
thread.start()
TP 3 : Introduction IA avec tensorflow
Étape 1 : Chargement des données
- TensorFlow propose des datasets intégrés. Utilisez le dataset MNIST (chiffres manuscrits).
# Install: pip install tensorflow-datasets
import tensorflow as tf
import tensorflow_datasets as tfds
mnist_data = tfds.load("mnist")
# Charger les données et les séparer en ensembles d'entraînement et de test
mnist_train, mnist_test = mnist_data["train"], mnist_data["test"]
batch_size = 32
train_size = mnist_train.cardinality().numpy() * batch_size
test_size = mnist_test.cardinality().numpy() * batch_size
print(f"Taille du jeu d'entraînement : {train_size} échantillons")
print(f"Taille du jeu de test : {test_size} échantillons")
Étape 2 : Prétraitement des données
- Normalisez les images pour que les valeurs des pixels soient comprises entre 0 et 1.
- Transformez les labels (y) en vecteurs one-hot encodés.
def preprocess(features):
image = features["image"]
label = features["label"]
image = tf.cast(image, tf.float32) / 255.0 # Normalize to [0, 1]
return image, label
# Prepare the training and test datasets
batch_size = 32
mnist_train = mnist_train.map(preprocess).shuffle(10000).batch(batch_size)
mnist_test = mnist_test.map(preprocess).batch(batch_size)
Étape 3 : Création du modèle
- Créez un modèle simple avec une couche d'entrée flatten, une couche cachée dense avec activation ReLU, et une couche de sortie softmax.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense
model = Sequential([
Flatten(input_shape=(28, 28, 1)),
Dense(128, activation='relu'),
Dense(10, activation='softmax')
])
Étape 4 : Compilation et entraînement
- Compilez le modèle avec une fonction de perte
categorical_crossentropy, un optimiseuradam, et mesurez l’accuracy. - Entraînez le modèle sur les données d’entraînement avec validation sur les données de test.
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.fit(mnist_train, epochs=5)
Étape 5 : Évaluation du modèle
- Évaluez la performance du modèle sur le jeu de test.
- Affichez les résultats et tracez les courbes de perte et d’accuracy.
test_loss, test_acc = model.evaluate(mnist_test)
print(f"Précision sur le jeu de test : {test_acc:.2f}")
import matplotlib.pyplot as plt
# Evaluate the model
test_loss, test_acc = model.evaluate(mnist_test)
# Plot the results
metrics = ['Test Loss', 'Test Accuracy']
values = [test_loss, test_acc]
# Create a bar chart
plt.figure(figsize=(8, 5))
plt.bar(metrics, values, color=['skyblue', 'lightgreen'])
plt.ylim(0, 1) # Accuracy is between 0 and 1, so we set limits for better visualization
plt.title('Model Evaluation Metrics')
plt.ylabel('Value')
plt.text(0, values[0] + 0.02, f'{values[0]:.4f}', ha='center')
plt.text(1, values[1] + 0.02, f'{values[1]:.4f}', ha='center')
# Display the plot
plt.show()
TP 4 : Utilisation de Docker + Python (automatisation)
pip install dockerPartie 1 : Automatisation du Démarrage et de l’Arrêt des Conteneurs
1. Script Python pour Démarrer un Conteneur Docker
Le script suivant démarre un conteneur à partir d’une image donnée (par exemple, nginx).
Solution
import docker
# Connexion au daemon Docker
client = docker.from_env()
def demarrer_conteneur(image_name):
try:
conteneur = client.containers.run(image_name, detach=True, name="mon_conteneur")
print(f"Conteneur démarré : {conteneur.short_id}")
except docker.errors.APIError as e:
print(f"Erreur lors du démarrage : {e}")
# Démarrer un conteneur nginx
demarrer_conteneur("nginx")
2. Script Python pour Arrêter et Supprimer un Conteneur
Le script suivant arrête et supprime le conteneur démarré précédemment.
Solution
def arreter_et_supprimer_conteneur(container_name):
try:
conteneur = client.containers.get(container_name)
conteneur.stop()
conteneur.remove()
print(f"Conteneur arrêté et supprimé : {container_name}")
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
except docker.errors.APIError as e:
print(f"Erreur : {e}")
# Arrêter et supprimer le conteneur nommé "mon_conteneur"
arreter_et_supprimer_conteneur("mon_conteneur")
Partie 2 : Gestion des Logs des Conteneurs
1. Affichage des Logs d’un Conteneur en Temps Réel
Ce script affiche les logs en continu pour un conteneur spécifique.
Solution
def afficher_logs(container_name):
try:
conteneur = client.containers.get(container_name)
for ligne in conteneur.logs(stream=True):
print(ligne.decode('utf-8').strip())
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
# Afficher les logs du conteneur nommé "mon_conteneur"
afficher_logs("mon_conteneur")
2. Sauvegarde des Logs dans un Fichier
Le script suivant enregistre les logs d’un conteneur dans un fichier.
Solution
def sauvegarder_logs(container_name, fichier_log):
try:
conteneur = client.containers.get(container_name)
with open(fichier_log, 'w') as fichier:
fichier.write(conteneur.logs().decode('utf-8'))
print(f"Logs sauvegardés dans {fichier_log}.")
except docker.errors.NotFound:
print(f"Conteneur {container_name} introuvable.")
# Sauvegarder les logs du conteneur "mon_conteneur" dans un fichier
sauvegarder_logs("mon_conteneur", "logs_conteneur.txt")
Partie 3 : Automatisation Avancée
1. Automatiser le Démarrage de Plusieurs Conteneurs
Démarre plusieurs conteneurs à partir d’une liste d’images.
def demarrer_conteneurs(images):
for image in images:
demarrer_conteneur(image)
# Démarrer des conteneurs pour plusieurs images
images = ["nginx", "redis", "alpine"]
demarrer_conteneurs(images)
- Nettoyage Automatique des Conteneurs Arrêtés
Ce script supprime tous les conteneurs arrêtés.
def nettoyage_conteneurs_arretes():
conteneurs = client.containers.list(all=True)
for conteneur in conteneurs:
if conteneur.status == 'exited':
print(f"Suppression du conteneur : {conteneur.name}")
conteneur.remove()
# Nettoyage des conteneurs arrêtés
nettoyage_conteneurs_arretes()
Partie 4 : Créer un Service Automatisé avec systemd
1. Fichier de Service systemd
Crée un fichier /etc/systemd/system/gestion_docker.service :
[Unit]
Description=Service d'automatisation Docker
After=docker.service
[Service]
ExecStart=/usr/bin/python3 /chemin/vers/script_docker.py
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable gestion_docker.service
sudo systemctl start gestion_docker.service
Calcul Scientifique avec Python
pip install numpy scipy matplotlib
Partie 1 : Manipulation des Matrices avec NumPy
1. Création et Opérations sur les Matrices
Crée deux matrices et effectue des opérations basiques comme l'addition, la multiplication, et le calcul du déterminant.
import numpy as np
# Création de matrices
A = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
B = np.array([[9, 8, 7], [6, 5, 4], [3, 2, 1]])
# Addition
C = A + B
print("Addition des matrices :\n", C)
# Multiplication élément par élément
D = A * B
print("Multiplication élément par élément :\n", D)
# Produit matriciel
E = np.dot(A, B)
print("Produit matriciel :\n", E)
# Calcul du déterminant
det_A = np.linalg.det(A)
print("Déterminant de A :", det_A)
2. Résolution d’un Système Linéaire
Résolvons le système d’équations suivant :
2x+y−z=8
−3x−y+2z=−11
−2x+y+2z=−3
# Matrice des coefficients
coefficients = np.array([[2, 1, -1], [-3, -1, 2], [-2, 1, 2]])
# Matrice des constantes
constantes = np.array([8, -11, -3])
# Résolution du système
solution = np.linalg.solve(coefficients, constantes)
print("Solution du système : x = {}, y = {}, z = {}".format(*solution))
Partie 2 : Calculs Avancés avec SciPy
1. Calcul d’Intégrale
Calculons l’intégrale de la fonction f(x) = x^2 entre 0 et 4.
from scipy.integrate import quad
# Définition de la fonction
def f(x):
return x**2
# Calcul de l'intégrale
resultat, erreur = quad(f, 0, 4)
print("Résultat de l'intégrale :", resultat)
2. Résolution d’une Équation Différentielle
Résolvons l’équation différentielle suivante :
dy / dt =−2y ,y(0)=1
from scipy.integrate import solve_ivp
# Définition de l'équation différentielle
def equation(t, y):
return -2 * y
# Résolution de l'équation
solution = solve_ivp(equation, [0, 5], [1], t_eval=np.linspace(0, 5, 100))
print("Solution :\n", solution.y[0])Partie 3 : Visualisation des Données avec Matplotlib
1. Tracé d’une Fonction
Traçons la fonction f(x)=sin(x)
import matplotlib.pyplot as plt
# Définition des données
x = np.linspace(0, 2 * np.pi, 100)
y = np.sin(x)
# Tracé
plt.plot(x, y, label='sin(x)')
plt.title('Tracé de sin(x)')
plt.xlabel('x')
plt.ylabel('sin(x)')
plt.legend()
plt.grid(True)
plt.show()
2. Visualisation des Solutions de l’Équation Différentielle
Représentons graphiquement la solution obtenue précédemment.
# Tracé de la solution
plt.plot(solution.t, solution.y[0], label='Solution de dy/dt = -2y')
plt.title('Solution de l\'équation différentielle')
plt.xlabel('Temps (t)')
plt.ylabel('y(t)')
plt.legend()
plt.grid(True)
plt.show()
Partie 4 : Analyse Statistique
1. Calcul de Statistiques de Base
Analyse des données suivantes : [10, 20, 30, 40, 50].
# Données
donnees = np.array([10, 20, 30, 40, 50])
# Calcul des statistiques
moyenne = np.mean(donnees)
mediane = np.median(donnees)
ecart_type = np.std(donnees)
print(f"Moyenne : {moyenne}, Médiane : {mediane}, Écart-type : {ecart_type}")
TP : Connexion avec la base de donnée MYSQL
pip install mysql-connector-python
Partie 1 : Connexion à une Base de Données MySQL
Établissons une connexion à une base de données MySQL locale.
import mysql.connector
# Connexion à MySQL
conn = mysql.connector.connect(
host="localhost",
user="root", # Remplacez par votre nom d'utilisateur
password="password" # Remplacez par votre mot de passe
)
if conn.is_connected():
print("Connexion réussie à MySQL")
else:
print("Échec de la connexion")
Partie 2 : Création d'une Base de Données et d'une Table
Créons une base de données nommée tp_mysql et une table etudiants.
# Création d'un curseur
cursor = conn.cursor()
# Création de la base de données
cursor.execute("CREATE DATABASE IF NOT EXISTS tp_mysql")
# Connexion à la base de données créée
conn.database = "tp_mysql"
# Création de la table
cursor.execute("""
CREATE TABLE IF NOT EXISTS etudiants (
id INT AUTO_INCREMENT PRIMARY KEY,
nom VARCHAR(255),
age INT,
filiere VARCHAR(255)
)
""")
print("Table 'etudiants' créée.")
Partie 3 : Insertion de Données
Ajoutons des enregistrements dans la table etudiants.
# Requête d'insertion
sql = "INSERT INTO etudiants (nom, age, filiere) VALUES (%s, %s, %s)"
valeurs = [
("Alice", 20, "Informatique"),
("Bob", 22, "Mathématiques"),
("Charlie", 21, "Physique")
]
# Exécution de la requête
cursor.executemany(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrements insérés.")
Partie 4 : Lecture des Données
Récupérons les données de la table etudiants.
# Requête de sélection
cursor.execute("SELECT * FROM etudiants")
resultats = cursor.fetchall()
# Affichage des résultats
for etudiant in resultats:
print(etudiant)
Partie 5 : Mise à Jour des Données
Modifions l'age d'un étudiant
# Requête de mise à jour
sql = "UPDATE etudiants SET age = %s WHERE nom = %s"
valeurs = (23, "Alice")
# Exécution de la requête
cursor.execute(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrement(s) mis à jour.")
Partie 6 : Suppression de Données
Supprimons un étudiant de la table.
# Requête de suppression
sql = "DELETE FROM etudiants WHERE nom = %s"
valeurs = ("Bob",)
# Exécution de la requête
cursor.execute(sql, valeurs)
conn.commit()
print(f"{cursor.rowcount} enregistrement(s) supprimé(s).")
Partie 7 : Gestion des Erreurs et Fermeture de la Connexion
Assurons-nous de gérer les erreurs et de fermer proprement la connexion.
# Gestion des erreurs
try:
cursor.execute("SELECT * FROM etudiants")
except mysql.connector.Error as err:
print(f"Erreur : {err}")
# Fermeture des connexions
cursor.close()
conn.close()
print("Connexion fermée.")
Exercices Complémentaires :
Ajoutez de nouvelles colonnes dans la table etudiants pour stocker des informations supplémentaires, comme l'adresse ou le numéro de téléphone.
Effectuez des requêtes filtrées pour récupérer uniquement les étudiants d’une certaine filière.
Ajoutez des contraintes comme des clés étrangères en créant une nouvelle table.
Map Reduce
# Sum of Square
# tasks.py
from celery import Celery
app = Celery('mapreduce', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def square(n):
return n * n
@app.task
def reduce_sum(values):
return sum(values)
# main.py
from tasks import square, reduce_sum
from celery.result import GroupResult
from celery import group
def map_reduce(data):
# Step 1: Map Phase (square each number)
map_jobs = group(square.s(n) for n in data)()
map_results = map_jobs.get()
print("Mapped results:", map_results)
# Step 2: Reduce Phase (sum the results)
reduce_job = reduce_sum.delay(map_results)
final_result = reduce_job.get()
print("Reduced result:", final_result)
return final_result
if __name__ == '__main__':
numbers = list(range(1, 6)) # [1, 2, 3, 4, 5]
result = map_reduce(numbers)
print("Sum of squares:", result)
python -m celery -A tasks worker --pool=solo -l infoTP : gRPC python
Partie 1
Dans cette étape, vous concevez l’interface publique de votre microservice en utilisant le langage de description d’interface de Protocol Buffers (proto3). Vous définissez :
- Les messages échangés :
NoteRequest,StudentRequest, etc. - Les méthodes du service :
AddNote,GetNotes,GetAverage.
Cela permet de générer automatiquement les classes nécessaires pour que le client et le serveur puissent communiquer.
grpc-notes/
├── client/
│ └── client.py
├── generated/
│ ├── notes_pb2.py
│ └── notes_pb2_grpc.py
├── proto/
│ └── notes.proto
├── server/
│ ├── notes_service.py
│ └── server.py
├── requirements.txt
└── README.mdsyntax = "proto3";
package notes;
service NotesService {
rpc AddNote (NoteRequest) returns (NoteResponse);
rpc GetNotes (StudentRequest) returns (NotesList);
rpc GetAverage (StudentRequest) returns (AverageResponse);
}
message NoteRequest {
string student_id = 1;
float note = 2;
}
message StudentRequest {
string student_id = 1;
}
message NoteResponse {
string message = 1;
}
message NotesList {
repeated float notes = 1;
}
message AverageResponse {
float average = 1;
}
proto/notes.proto
Vous installez les dépendances Python (grpcio, grpcio-tools) nécessaires au fonctionnement de gRPC en Python.
Ensuite, vous utilisez la commande protoc pour générer deux fichiers Python à partir de notes.proto :
notes_pb2.py: contient les définitions de messages.notes_pb2_grpc.py: contient les stubs du service (interface client et base du serveur).
Ces fichiers sont utilisés pour écrire le serveur et le client.
grpcio
grpcio-tools
requirements.txt
python -m grpc_tools.protoc -Iproto --python_out=generated --grpc_python_out=generated proto/notes.proto
Partie 2
Vous implémentez une classe NotesService qui hérite du service gRPC généré (NotesServiceServicer). Cette classe contient :
- une méthode pour ajouter une note,
- une pour récupérer toutes les notes d’un étudiant,
- une pour calculer la moyenne.
Les données sont stockées en mémoire dans un dictionnaire Python ({student_id: [notes]}).
Ensuite, vous configurez et démarrez le serveur avec grpc.server(), sur le port 50051, dans server.py.
from generated import notes_pb2, notes_pb2_grpc
class NotesService(notes_pb2_grpc.NotesServiceServicer):
def __init__(self):
self.notes = {}
def AddNote(self, request, context):
student_id = request.student_id
note = request.note
self.notes.setdefault(student_id, []).append(note)
return notes_pb2.NoteResponse(message=f"Note {note} ajoutée pour {student_id}.")
def GetNotes(self, request, context):
student_id = request.student_id
notes = self.notes.get(student_id, [])
return notes_pb2.NotesList(notes=notes)
def GetAverage(self, request, context):
student_id = request.student_id
notes = self.notes.get(student_id, [])
if not notes:
return notes_pb2.AverageResponse(average=0.0)
avg = sum(notes) / len(notes)
return notes_pb2.AverageResponse(average=avg)
server/notes_service.py
from concurrent import futures
import grpc
import time
from generated import notes_pb2_grpc
from server.notes_service import NotesService
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
notes_pb2_grpc.add_NotesServiceServicer_to_server(NotesService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Serveur gRPC lancé sur le port 50051.")
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
print("Arrêt du serveur.")
server.stop(0)
if __name__ == '__main__':
serve()
server/server.py
Partie 3
Le client gRPC :
- Établit une connexion avec le serveur (
localhost:50051), - Utilise le stub généré pour envoyer des requêtes au serveur,
- Affiche les réponses dans un menu interactif.
Trois fonctionnalités sont disponibles :
- Ajouter une note à un étudiant.
- Afficher toutes les notes d’un étudiant.
- Calculer la moyenne des notes d’un étudiant.
Le client permet de tester l’ensemble des fonctionnalités exposées par le serveur.
import grpc
from generated import notes_pb2, notes_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = notes_pb2_grpc.NotesServiceStub(channel)
while True:
print("\n--- Menu ---")
print("1. Ajouter une note")
print("2. Voir les notes d’un étudiant")
print("3. Calculer la moyenne")
print("4. Quitter")
choice = input("Votre choix : ")
if choice == "1":
sid = input("ID étudiant : ")
note = float(input("Note : "))
resp = stub.AddNote(notes_pb2.NoteRequest(student_id=sid, note=note))
print(resp.message)
elif choice == "2":
sid = input("ID étudiant : ")
resp = stub.GetNotes(notes_pb2.StudentRequest(student_id=sid))
print("Notes :", resp.notes)
elif choice == "3":
sid = input("ID étudiant : ")
resp = stub.GetAverage(notes_pb2.StudentRequest(student_id=sid))
print("Moyenne :", resp.average)
elif choice == "4":
break
else:
print("Option invalide.")
if __name__ == '__main__':
run()
client/client.py
Partie 4
Vous commencez par générer le code gRPC avec protoc.
Puis vous lancez le serveur (server.py) et, dans une autre console, le client (client.py).
Vous pouvez ensuite interagir avec le service via le menu du client. Chaque action déclenche une requête gRPC.
## Démarer le serveur
python server/server.py
## Démarer le client
python client/client.pyTP : Numba
Dans ce TP, vous découvrirez comment optimiser les performances d'un programme Python à l'aide de la bibliothèque Numba, un compilateur JIT (Just-In-Time) qui permet d’accélérer considérablement l'exécution de certaines fonctions, notamment celles impliquant des calculs intensifs ou des boucles lourdes.
À travers plusieurs exercices progressifs, vous comparerez le temps d'exécution de fonctions classiques en Python avec leurs équivalents optimisés par Numba. Vous utiliserez à la fois des décorateurs simples comme @njit et des fonctions traitant des tableaux NumPy. Le TP se termine par une mise en œuvre graphique d’un ensemble de Mandelbrot, montrant visuellement les avantages de Numba.
- nopython=True
- Active le mode "nopython", dans lequel tout le code est compilé en machine, sans tomber en mode Python. C’est le mode le plus rapide.
- parallel=True
- Active l'exécution parallèle automatique (multi-cœurs) lorsque c’est possible. Nécessite prange() dans les boucles.
- cache=True
- Active la mise en cache du code compilé pour réutilisation sans recompilation lors des prochains lancements.
- nogil=True
- Permet l’exécution sans verrou Python (GIL), utile dans un contexte multi-thread.
pip install numba numpy matplotlibStructure attendue
numba-tp/
├── exercice_1_basique.py
├── exercice_2_numpy.py
├── exercice_3_fractale_mandelbrot.py
└── requirements.txt
Partie 1 :
Vous allez comparer deux fonctions qui calculent une factorielle de manière répétée :
- Une fonction Python classique, volontairement lente.
- Une version compilée avec Numba (
@njit).
Vous mesurerez le temps d’exécution dans chaque cas afin de quantifier le gain de performance.
import time
from numba import jit
def slow_factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
@jit(nopython=True)
def fast_factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
N = 10_000
start = time.time()
for _ in range(1000):
slow_factorial(500)
print("Sans Numba :", time.time() - start, "secondes")
start = time.time()
for _ in range(1000):
fast_factorial(500)
print("Avec Numba :", time.time() - start, "secondes")
Partie 2 :
Dans cette partie, vous allez appliquer une opération mathématique complexe (combinaison de puissance, sinus, logarithme) à un très grand tableau NumPy (10 millions d’éléments). Vous comparerez trois versions :
- Traitement NumPy vectorisé seul,
- Traitement Python avec une boucle,
- Traitement Python + boucle optimisée avec Numba.
import numpy as np
import time
from numba import njit
@njit
def compute(data):
result = np.empty_like(data)
for i in range(data.shape[0]):
result[i] = data[i] ** 1.5 + np.sin(data[i]) - np.log1p(data[i])
return result
data = np.linspace(0.01, 100, 10_000_000)
# Version NumPy seule
start = time.time()
result_numpy = data ** 1.5 + np.sin(data) - np.log1p(data)
print("NumPy :", time.time() - start, "s")
# Version Numba
start = time.time()
result_numba = compute(data)
print("Numba :", time.time() - start, "s")
Partie 3 :
Vous implémentez une version simple du générateur de l’ensemble de Mandelbrot. Ce type de fractale nécessite de nombreuses itérations sur chaque pixel de l’image. Sans Numba, l’affichage est lent. Avec Numba, vous observerez une nette amélioration des performances, rendant la génération interactive.
import numpy as np
import matplotlib.pyplot as plt
from numba import njit
import time
@njit
def mandelbrot(width, height, max_iter):
result = np.zeros((height, width), dtype=np.uint8)
for x in range(width):
for y in range(height):
zx, zy = 0.0, 0.0
cx = (x - width / 2) * 4.0 / width
cy = (y - height / 2) * 4.0 / width
i = 0
while zx * zx + zy * zy < 4 and i < max_iter:
tmp = zx * zx - zy * zy + cx
zy, zx = 2.0 * zx * zy + cy, tmp
i += 1
result[y, x] = i
return result
WIDTH, HEIGHT = 800, 600
MAX_ITER = 100
start = time.time()
image = mandelbrot(WIDTH, HEIGHT, MAX_ITER)
print("Temps de calcul :", time.time() - start, "secondes")
plt.imshow(image, cmap='inferno')
plt.title("Fractale Mandelbrot accélérée avec Numba")
plt.axis('off')
plt.show()
TP : Corruption de python
En CPython (l'implémentation standard de Python), les objets int sont représentés par la structure C PyLongObject. Les petits entiers (entre -5 et 256) sont internés — c’est-à-dire partagés globalement pour des raisons de performance.
Ce TP manipule directement la mémoire de l’objet 1, pour altérer sa valeur de manière globale dans l’interpréteur.
- Importation de
ctypes: pour accéder bas-niveau à la mémoire. - Obtention de l’adresse mémoire de l’objet
1viaid(one). - Décalage mémoire de 24 octets pour accéder au champ
ob_digit[0](la "valeur" de l'entier, codée en base 2³⁰ sur les systèmes 64 bits). - Modification de la valeur de l'entier
1en mémoire :digit_ptr[0] = 2. - Observation des effets de la corruption : par exemple,
1 + 1ne vaut plus 2.
import ctypes
# ⚠️ WARNING: This will break Python
# All uses of 1 will now act weird
one = 1
print("Before corruption: 1 + 1 =", 1 + 1)
print("Before corruption: one + one =", one + one)
# Get memory address of `1`
addr = id(one)
# PyLongObject internals:
# [ ob_refcnt | ob_type | ob_size | ob_digit[0] ... ]
# On 64-bit systems, that's 24 bytes offset to the digits
digit_ptr = ctypes.cast(addr + 24, ctypes.POINTER(ctypes.c_uint32))
# Modify the digit
digit_ptr[0] = 2 # Now, `1` becomes 2 everywhere
a = 1
b = 1
print("Var after corruption a + b = ", a + b)
print("After corruption: 1 + 1 =", 1 + 1)
print("Before corruption: one + one =", one + one)
print("True == 1:", True == 1)TP : Lire / Écrire un fichier binaire via memoryview
Comprendre comment manipuler un fichier binaire contenant des structures typées (int, float) sans recopier la mémoire. Cela est utile en data science, réseaux, ou systèmes embarqués pour accéder à des données binaires efficacement.
Étape 1 – Générer un fichier binaire contenant des paires (int, float)
On va utiliser le module struct pour transformer des données Python en bytes de manière structurée (comme en C). Ici, chaque structure contiendra un int suivi d’un float.
import struct
with open("data.bin", "wb") as f:
for i in range(10):
data = struct.pack("if", i, i * 1.5) # 'i' = int32, 'f' = float32
f.write(data)Ce que fait ce code :
struct.pack("if", i, f)convertit deux valeurs en 8 octets binaires.- On écrit 10 blocs de 8 octets dans un fichier.
Étape 2 – Lire le fichier et créer un memoryview
Plutôt que de parser les bytes un à un ou de faire des copies, on crée une vue mémoire sur le contenu lu, ce qui permet une lecture ultra efficace sans recopier les données.
with open("data.bin", "rb") as f:
content = f.read() # contenu binaire brut
view = memoryview(content)Ce que fait ce code :
f.read()lit tous les octets du fichier.memoryview(content)permet d’accéder aux données en lecture seule sans les copier.
Étape 3 – Parcourir les structures via memoryview
On lit les données par blocs de 8 octets (4 pour l’int, 4 pour le float), puis on les décode avec struct.unpack().
for i in range(0, len(view), 8):
int_part, float_part = struct.unpack("if", view[i:i+8])
print(int_part, float_part)
Ce que fait ce code :
view[i:i+8]sélectionne une tranche de 8 octets.struct.unpack("if", ...)décode ces octets en deux valeurs Python.
TP : Injecter une DLL/SO via ctypes.CDLL
Apprendre à utiliser ctypes pour charger dynamiquement une bibliothèque partagée (.so ou .dll) et appeler ses fonctions depuis Python.
Étape 1 – Créer une bibliothèque partagée en C
On crée une fonction C (triple) simple, puis on la compile pour qu'elle soit accessible à Python.
// libmath.c
int triple(int x) {
return x * 3;
}gcc -shared -o libmath.so -fPIC libmath.c # LinuxLa fonction triple prend un entier et renvoie sa valeur triplée. -fPIC génère du code position-independent, requis pour les bibliothèques partagées.
Étape 2 – Charger la bibliothèque avec ctypes.CDLL
import ctypes
lib = ctypes.CDLL("./libmath.so")CDLL charge la bibliothèque dynamique pour pouvoir utiliser ses fonctions.
On charge le fichier compilé libmath.so et on le lie à l’objet lib.
Étape 3 – Appeler une fonction depuis Python
lib.triple.argtypes = [ctypes.c_int]
lib.triple.restype = ctypes.c_int
res = lib.triple(7)
print("Résultat :", res) # 21
.argtypesdéfinit les types d'arguments attendus..restypedéfinit le type de retour.- On déclare que la fonction
tripleprend unintet retourne unint. Ensuite, on l’appelle avec l’argument7.
TP : Microbench entre +, +=, sum(), np.sum(), numba
On compare les performances de différentes méthodes de somme (+, +=, sum(), np.sum(), numba) sur un grand tableau.
Étape 1 – Générer les données
import numpy as np
arr = np.arange(10_000_000, dtype=np.float64)
On prépare un grand vecteur pour simuler une charge réaliste de calcul en générant un tableau de 10 millions de nombres flottants consécutifs.
Étape 2 – Benchmark + (for classique)
import time
t0 = time.time()
s = 0.0
for x in arr:
s = s + x
print("for + :", time.time() - t0)
Étape 3 – Benchmark +=
t0 = time.time()
s = 0.0
for x in arr:
s += x
print("for += :", time.time() - t0)
Étape 4 – Benchmark sum()
t0 = time.time()
s = sum(arr)
print("sum() :", time.time() - t0)Étape 5 – Benchmark np.sum
t0 = time.time()
s = np.sum(arr)
print("np.sum() :", time.time() - t0)
Étape 6 – Benchmark avec numba
from numba import njit
@njit
def fast_sum(arr):
total = 0.0
for x in arr:
total += x
return total
t0 = time.time()
s = fast_sum(arr)
print("numba sum :", time.time() - t0)
Explications
+,+=sont lents car non vectorisés.sum()est en pur Python.np.sum()est optimisé C.numbacompile en code natif avec LLVM.
TP : Espionner une fonction avec sys.settrace
Utiliser sys.settrace pour suivre l’exécution ligne par ligne d’un script.
Étape 1 – Définir une fonction trace
import sys
def tracer(frame, event, arg):
if event == "line":
lineno = frame.f_lineno
print(f"Exécution de la ligne {lineno}")
return tracer
On utilise le système de hook interne pour capturer l'exécution des lignes.
Étape 2 – Définir un code simple à tracer
def foo():
a = 1
b = 2
c = a + b
print(c)
Étape 3 – Activer le tracer et appeler la fonction
sys.settrace(tracer)
foo()
sys.settrace(None)
settrace() active le hook, puis on exécute foo(), ligne par ligne.
TP : Générer du code Python à l’exécution (exec, compile)
Utiliser compile() pour générer dynamiquement du code et l’exécuter avec exec.
Étape 1 – Générer dynamiquement une fonction
code_str = """
def dynamic_func(x):
return x * 2
"""
compiled = compile(code_str, "<string>", "exec")
exec(compiled)
print(dynamic_func(5)) # 10
On transforme une chaîne de texte en fonction Python exécutable.
Étape 2 – Compiler une expression
expr = compile("3 * 7 + 1", "<expr>", "eval")
print(eval(expr)) # 22
compile(..., "eval") est utilisé pour les expressions simples (contrairement à "exec").
TP : Manipulation avancée de threads et GIL
Comprendre le GIL (Global Interpreter Lock) et ses effets sur les threads.
Étape 1 – Code multithread simple
import threading
counter = 0
def f():
global counter
for i in range(10**6):
a = counter
counter = a + 1
pass
threads = [threading.Thread(target=f)) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)On observe que les threads ne sont pas si parallèles à cause du GIL.
Étape 2 – Comparer avec multiprocessing
from multiprocessing import Process
counter = 0
processes = [Process(target=f) for _ in range(4)]
[p.start() for p in processes]
[p.join() for p in processes]
print(counter)On contourne le GIL en créant des processus séparés.
Étape 3 – Utiliser threading.Lock
lock = threading.Lock()
counter = 0
def g():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = [threading.Thread(target=g) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)
Lock explicite avec threads
TP : Créer une base mémoire + gRPC
Créer un microservice gRPC qui stocke des données en mémoire (clé/valeur).
Étape 1 – Définir le .proto
syntax = "proto3";
service MemoryDB {
rpc Put(Pair) returns (Empty);
rpc Get(Key) returns (Value);
}
message Pair {
string key = 1;
string value = 2;
}
message Key {
string key = 1;
}
message Value {
string value = 1;
}
message Empty {}
On définit les messages et services gRPC. Cela sert d’interface réseau entre client et serveur.
Étape 2 – Générer le code gRPC
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. memory.protoÉtape 3 – Implémenter le serveur
import grpc
from concurrent import futures
import memory_pb2
import memory_pb2_grpc
class Memory(memory_pb2_grpc.MemoryDBServicer):
def __init__(self):
self.store = {}
def Put(self, request, context):
self.store[request.key] = request.value
return memory_pb2.Empty()
def Get(self, request, context):
value = self.store.get(request.key, "")
return memory_pb2.Value(value=value)
server = grpc.server(futures.ThreadPoolExecutor())
memory_pb2_grpc.add_MemoryDBServicer_to_server(Memory(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
Le serveur maintient un dictionnaire Python en RAM et expose les méthodes via gRPC.
Étape 4 – Client
import grpc
import memory_pb2
import memory_pb2_grpc
channel = grpc.insecure_channel("localhost:50051")
stub = memory_pb2_grpc.MemoryDBStub(channel)
stub.Put(memory_pb2.Pair(key="name", value="Alice"))
resp = stub.Get(memory_pb2.Key(key="name"))
print("Value:", resp.value)
TP Python : Manipuler des fichiers Excel avec openpyxl
Objectifs pédagogiques
- Apprendre à lire, écrire et modifier un fichier Excel (
.xlsx) en Python. - Savoir créer des feuilles, ajouter des données, formater des cellules.
- Comprendre comment automatiser des traitements de données dans Excel.
Prérequis
Installer la bibliothèque openpyxl :
pip install openpyxlEtape 1 : Création d’un fichier Excel
- Crée un fichier Excel
rapport.xlsx. - Crée une feuille appelée
"Données".
Solution
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
ws.title = "Données"
wb.save("rapport.xlsx")
Étape 2 : Remplir des données de ventes
- Ajouter une ligne d’en-tête :
"Mois"et"Ventes".
Ajouter les données de ventes suivantes :
| Mois | Ventes |
|---|---|
| Janvier | 1200 |
| Février | 1450 |
| Mars | 1600 |
| Avril | 1300 |
| Mai | 1550 |
| Juin | 1700 |
Solution
from openpyxl import load_workbook
# Charger le fichier
wb = load_workbook("rapport.xlsx")
ws = wb["Données"]
# Ajouter l’en-tête
ws.append(["Mois", "Ventes"])
# Ajouter les données
donnees = [
["Janvier", 1200],
["Février", 1450],
["Mars", 1600],
["Avril", 1300],
["Mai", 1550],
["Juin", 1700]
]
for ligne in donnees:
ws.append(ligne)
wb.save("rapport.xlsx")
Etape 3: Ajouter un total automatique
- Ajouter une ligne
"Total"en dessous du tableau. - Utiliser une formule Excel pour calculer la somme de la colonne des ventes.
Solution
wb = load_workbook("rapport.xlsx")
ws = wb["Données"]
# Ajouter la ligne Total
ws["A8"] = "Total"
ws["B8"] = "=SUM(B2:B7)"
wb.save("rapport.xlsx")
Etape 4: Mise en forme
- Mettre l’en-tête (ligne 1) en gras et fond gris clair.
- Mettre la ligne de total (ligne 8) en gras.
Solution
from openpyxl.styles import Font, PatternFill
wb = load_workbook("rapport.xlsx")
ws = wb["Données"]
# Style de l’en-tête
header_font = Font(bold=True)
header_fill = PatternFill(start_color="DDDDDD", fill_type="solid")
for cell in ws[1]: # Ligne 1
cell.font = header_font
cell.fill = header_fill
# Style de la ligne Total
for cell in ws[8]: # Ligne 8
cell.font = Font(bold=True)
wb.save("rapport.xlsx")
Etape 5 : Ajouter une colonne conditionnelle "Niveau"
- Ajouter une colonne
"Niveau"à droite des ventes. - Si la vente > 1300, écrire
"Haute", sinon"Basse"via une formule Excel.
Solution
wb = load_workbook("rapport.xlsx")
ws = wb["Données"]
# Ajouter le titre de la colonne
ws["C1"] = "Niveau"
# Ajouter les formules conditionnelles
for i in range(2, 8): # lignes 2 à 7
ws[f"C{i}"] = f'=IF(B{i}>1300, "Haute", "Basse")'
wb.save("rapport.xlsx")
Etape 6 : Créer un graphique
- Créer un graphique en barres affichant les ventes mensuelles.
- Positionner le graphique à droite du tableau (ex. colonne E).
Solution
from openpyxl.chart import BarChart, Reference
wb = load_workbook("rapport.xlsx")
ws = wb["Données"]
# Créer le graphique
chart = BarChart()
chart.title = "Ventes mensuelles"
# Définir les données du graphique
data = Reference(ws, min_col=2, min_row=1, max_row=7) # B1:B7
cats = Reference(ws, min_col=1, min_row=2, max_row=7) # A2:A7
chart.add_data(data, titles_from_data=True)
chart.set_categories(cats)
# Ajouter le graphique
ws.add_chart(chart, "E2")
wb.save("rapport.xlsx")
Etape 7 : Lire et filtrer les données Excel
- Charger les données du fichier.
- Afficher uniquement les mois dont les ventes > 1300€.
Solution
wb = load_workbook("rapport.xlsx", data_only=True)
ws = wb["Données"]
print("Mois avec ventes > 1300 € :")
for row in ws.iter_rows(min_row=2, max_row=7, min_col=1, max_col=2, values_only=True):
mois, vente = row
if vente > 1300:
print(f"{mois} : {vente} €")
TP : Création de graphes en Python avec Matplotlib
Étape 1 : Tracer une courbe simple
🎯 Objectif pédagogique
Découvrir la création d’une figure, tracer une liste de valeurs, comprendre la base de Matplotlib.
📌 Consigne
- Crée une liste contenant les valeurs de 0 à 9.
- Calcule pour chaque valeur son carré.
- Trace la courbe
y = x². - Ajoute un titre au graphique.
Solution :
import matplotlib.pyplot as plt
# 1. Liste de valeurs
x = list(range(10))
# 2. Carré des valeurs
y = [i**2 for i in x]
# 3. Tracé
plt.plot(x, y)
# 4. Titre
plt.title("Courbe de y = x²")
# Affichage
plt.show()
Étape 2 : Ajouter des labels et une grille
🎯 Objectif pédagogique
Apprendre à annoter un graphique : axes, grille, personnalisation simple.
📌 Consigne
Reprends le graphique précédent et ajoute :
- un label pour l’axe X,
- un label pour l’axe Y,
- une grille.
Solution
import matplotlib.pyplot as plt
x = list(range(10))
y = [i**2 for i in x]
plt.plot(x, y)
plt.title("Courbe de y = x²")
# Labels
plt.xlabel("Valeur de x")
plt.ylabel("Valeur de y")
# Grille
plt.grid(True)
plt.show()
Étape 3 : Tracer plusieurs courbes dans la même figure
🎯 Objectif pédagogique
Comprendre comment superposer plusieurs séries de données.
📌 Consigne
- Dans une même figure, trace :
y1 = x²y2 = x³
- Ajoute une légende pour identifier chaque courbe.
Solution
import matplotlib.pyplot as plt
x = list(range(10))
y1 = [i**2 for i in x]
y2 = [i**3 for i in x]
plt.plot(x, y1, label="x²")
plt.plot(x, y2, label="x³")
plt.title("Comparaison entre x² et x³")
plt.xlabel("x")
plt.ylabel("Valeurs")
plt.grid(True)
# Ajout de la légende
plt.legend()
plt.show()
Étape 4 : Barres : créer un graphique à barres
🎯 Objectif pédagogique
Découvrir un autre type de graphique : histogramme / bar chart.
📌 Consigne
- Crée une liste de catégories :
["A", "B", "C", "D"]. - Associe une liste de valeurs au choix.
- Tracer un graphique en barres.
Solution
import matplotlib.pyplot as plt
categories = ["A", "B", "C", "D"]
valeurs = [5, 7, 3, 9]
plt.bar(categories, valeurs)
plt.title("Graphique à barres simple")
plt.xlabel("Catégories")
plt.ylabel("Valeurs")
plt.show()
Étape 5 : Sous-graphes (subplot)
🎯 Objectif pédagogique
Créer plusieurs graphiques dans une seule figure.
📌 Consigne
- Crée une figure contenant 2 graphiques côte à côte.
- À gauche :
y = x² - À droite :
y = x³
Solution
import matplotlib.pyplot as plt
x = list(range(10))
# Création d'une figure avec 1 ligne, 2 colonnes
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
# Graphique de gauche
axes[0].plot(x, [i**2 for i in x])
axes[0].set_title("y = x²")
# Graphique de droite
axes[1].plot(x, [i**3 for i in x])
axes[1].set_title("y = x³")
plt.show()
Étape 6 : Courbes stylisées
🎯 Objectif pédagogique
Apprendre à changer :
- couleur,
- style de ligne,
- style des points.
📌 Consigne
Trace y = sqrt(x) avec :
- une ligne pointillée,
- des points cerclés,
- en rouge,
- un titre.
import matplotlib.pyplot as plt
import math
x = [i for i in range(1, 20)]
y = [math.sqrt(i) for i in x]
plt.plot(x, y, "ro--") # r = red, o = cercle, -- = pointillé
plt.title("y = √x (courbe stylisée)")
plt.xlabel("x")
plt.ylabel("√x")
plt.grid(True)
plt.show()
Étape 7 – Importer des données et les tracer
🎯 Objectif pédagogique
Travailler avec de vraies données.
📌 Consigne
- Crée un fichier
data.txtcontenant une valeur par ligne, par exemple :
3
8
5
12
7
- Charge ces valeurs depuis le fichier.
- Trace un graphique en ligne.
import matplotlib.pyplot as plt
# 1. Lecture du fichier
with open("data.txt", "r") as f:
valeurs = [int(ligne.strip()) for ligne in f]
# 2. Tracé
plt.plot(valeurs, marker="o")
plt.title("Données importées depuis data.txt")
plt.xlabel("Index")
plt.ylabel("Valeur")
plt.grid(True)
plt.show()
🐳 TP : Créer un Dockerfile pour une application Python
Ce TP vous guide pas à pas pour créer une application Python, l’encapsuler dans un conteneur Docker et l’optimiser.
Étape 1 – Créer une application Python simple
🎯 Objectif
Créer un script Python minimal qui servira de base.
📌 Consigne
Créer un fichier app.py affichant "Hello Docker".
✅ Solution
# app.py
print("Hello Docker")
Étape 2 – Ajouter un requirements.txt (optionnel)
🎯 Objectif
Gérer les dépendances Python dans l’image Docker.
📌 Consigne
- Créer
requirements.txt. - Ajouter
requests. - Modifier
app.pypour l’importer.
✅ Solution
requirements.txt
requests==2.32.0
app.py
import requests
print("Hello Docker + Requests")
Étape 3 – Écrire un Dockerfile
🎯 Objectif
Créer un Dockerfile pour construire une image contenant l'application Python.
📌 Consigne
Créer un fichier Dockerfile contenant :
- une image Python de base
- les fichiers copiés
- l'installation des dépendances
- la commande de lancement
✅ Solution
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
COPY app.py .
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "app.py"]
Étape 4 – Construire l’image Docker
🎯 Objectif
Construire une image locale basée sur le Dockerfile.
📌 Consigne
Créer une image nommée python-app:1.0.
✅ Solution
docker build -t python-app:1.0 .
Étape 5 – Exécuter le conteneur
🎯 Objectif
Vérifier que l’image fonctionne.
📌 Consigne
Lancer l’image créée.
✅ Solution
docker run --rm python-app:1.0
Étape 6 – Ajouter un serveur web Flask
🎯 Objectif
Apprendre à containeriser une vraie application web Python.
📌 Consigne
- Ajouter Flask dans
requirements.txt - Modifier
app.pypour créer un mini serveur - Exposer le port 5000
- Builder puis lancer l’image
✅ Solution
requirements.txt
flask==3.0.0
app.py
from flask import Flask
app = Flask(__name__)
@app.route("/")
def home():
return "Hello from Flask inside Docker"
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 5000
CMD ["python", "app.py"]
Construire l’image
docker build -t flask-app:1.0 .
Exécuter le serveur
docker run -p 5000:5000 flask-app:1.0
Étape 7 – Optimiser l’image (bonus)
🎯 Objectif
Découvrir les bonnes pratiques pour réduire la taille de l'image.
📌 Consigne
Optimiser le Dockerfile en utilisant une image slim.
✅ Solution
FROM python:3.11-slim AS base
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
CMD ["python", "app.py"]
🧪 TP : Bonnes pratiques de conception de classes en Python
Ce TP vous guide pour apprendre à structurer des classes propres, lisibles, maintenables et conformes aux bonnes pratiques (PEP8, encapsulation, méthodes, héritage, dataclasses, etc.).
Étape 1 - Créer une classe simple bien structurée
🎯 Objectif
Comprendre la structure minimale d’une classe propre : attributs, constructeur, méthode.
📌 Consigne
Créer une classe Personne avec :
- un attribut
nom - un attribut
age - une méthode
se_presenter()qui affiche :"Je m'appelle X et j'ai Y ans"
Respecter la convention PEP8 :
- noms en
snake_case - classes en
CamelCase
✅ Solution
class Personne:
def __init__(self, nom: str, age: int):
self.nom = nom
self.age = age
def se_presenter(self):
print(f"Je m'appelle {self.nom} et j'ai {self.age} ans.")
Étape 2 - Encapsulation et propriétés (getter/setter)
🎯 Objectif
Apprendre à protéger les attributs internes grâce aux propriétés Python.
📌 Consigne
Dans la classe Personne :
- Rendre l’attribut
ageprivé (self._age) - Empêcher qu’un âge négatif soit défini
- Exposer un
@property age+ un setter
✅ Solution
class Personne:
def __init__(self, nom: str, age: int):
self.nom = nom
self._age = None
self.age = age # passe par le setter
@property
def age(self):
return self._age
@age.setter
def age(self, valeur):
if valeur < 0:
raise ValueError("L'âge ne peut pas être négatif.")
self._age = valeur
Étape 3 - Méthodes de classes, statiques et d’instances
🎯 Objectif
Comprendre la différence entre :
- méthode d’instance (
self) - méthode de classe (
cls) - méthode statique (pas d'argument automatique)
📌 Consigne
Dans Personne :
- Ajouter une méthode de classe
depuis_chaine("Nom,Age") - Ajouter une méthode statique
est_majeur(age) - Utiliser ces méthodes pour créer un objet et tester si la personne est majeure.
✅ Solution
class Personne:
def __init__(self, nom: str, age: int):
self.nom = nom
self.age = age
@classmethod
def depuis_chaine(cls, chaine: str):
nom, age = chaine.split(",")
return cls(nom, int(age))
@staticmethod
def est_majeur(age: int):
return age >= 18
Étape 4 - Bonnes pratiques avec l’héritage
🎯 Objectif
Découvrir un héritage simple, propre et maîtrisé.
📌 Consigne
Créer une classe Employe qui hérite de Personne, avec :
- un attribut supplémentaire :
salaire - une méthode
afficher_salaire()
✅ Solution
class Employe(Personne):
def __init__(self, nom: str, age: int, salaire: float):
super().__init__(nom, age)
self.salaire = salaire
def afficher_salaire(self):
print(f"Salaire : {self.salaire} €")
Étape 5 - Utiliser les dataclasses
🎯 Objectif
Apprendre une alternative propre et moderne pour définir des classes simples.
📌 Consigne
Réécrire la classe Personne en utilisant @dataclass.
⚠️ Rappel
Les dataclasses :
- génèrent automatiquement
__init__,__repr__,__eq__… - sont idéales pour des objets “données”
✅ Solution
from dataclasses import dataclass
@dataclass
class Personne:
nom: str
age: int
Étape 6 - Ajouter des validations dans une dataclass
🎯 Objectif
Comprendre comment valider les données dans une dataclass via __post_init__.
📌 Consigne
Dans une dataclass :
- vérifier que l’âge n’est pas négatif
- lever une exception sinon
✅ Solution
from dataclasses import dataclass
@dataclass
class Personne:
nom: str
age: int
def __post_init__(self):
if self.age < 0:
raise ValueError("L'âge ne peut pas être négatif.")
Étape 7 - Créer une classe propre avec logique métier
🎯 Objectif
Assembler toutes les bonnes pratiques en un seul exercice.
📌 Consigne
Créer une classe CompteBancaire propre, contenant :
- attribut privé
_solde - méthode
deposer(montant) - méthode
retirer(montant)qui empêche le solde négatif - propriété
soldeen lecture seule - méthode
__repr__lisible
✅ Solution
class CompteBancaire:
def __init__(self, titulaire: str, solde_initial: float = 0.0):
self.titulaire = titulaire
self._solde = solde_initial
@property
def solde(self):
return self._solde
def deposer(self, montant: float):
if montant <= 0:
raise ValueError("Le montant doit être positif.")
self._solde += montant
def retirer(self, montant: float):
if montant <= 0:
raise ValueError("Le montant doit être positif.")
if montant > self._solde:
raise ValueError("Fonds insuffisants.")
self._solde -= montant
def __repr__(self):
return f"CompteBancaire(titulaire='{self.titulaire}', solde={self._solde})"
🧪 TP : Visualisation et exploration de la mémoire Python
Ce TP vous permet de comprendre et explorer l’usage de la mémoire dans vos programmes Python à l’aide d’outils comme sys.getsizeof, pympler et tracemalloc.
Étape 1 - Mesurer la taille d’objets simples
🎯 Objectif
Comprendre combien de mémoire est utilisée par les objets Python de base.
📌 Consigne
- Importer le module
sys - Créer différents objets Python : int, float, str, list
- Afficher leur taille mémoire avec
sys.getsizeof()
✅ Solution
import sys
a = 10
b = 3.14
c = "Bonjour"
d = [1, 2, 3, 4, 5]
print("Taille de a (int) :", sys.getsizeof(a), "octets")
print("Taille de b (float) :", sys.getsizeof(b), "octets")
print("Taille de c (str) :", sys.getsizeof(c), "octets")
print("Taille de d (list) :", sys.getsizeof(d), "octets")
Étape 2 - Explorer la mémoire d’une liste et de ses éléments
🎯 Objectif
Voir la différence entre la taille de la structure et des éléments qu’elle contient.
📌 Consigne
- Créer une liste de 1000 entiers
- Afficher la taille de la liste seule
- Calculer la taille totale incluant tous les éléments
✅ Solution
import sys
ma_liste = list(range(1000))
taille_liste = sys.getsizeof(ma_liste)
taille_elements = sum(sys.getsizeof(x) for x in ma_liste)
print("Taille de la liste :", taille_liste, "octets")
print("Taille totale des éléments :", taille_elements, "octets")
print("Taille totale approximative :", taille_liste + taille_elements, "octets")
Étape 3 - Visualiser la mémoire avec pympler
🎯 Objectif
Découvrir un outil spécialisé pour suivre la mémoire occupée par vos objets Python.
📌 Consigne
- Installer
pymplersi nécessaire :
pip install pympler
- Utiliser
asizeofpour mesurer la mémoire totale utilisée par un objet complexe.
✅ Solution
from pympler import asizeof
ma_liste = [list(range(100)) for _ in range(1000)]
print("Mémoire totale de ma_liste :", asizeof.asizeof(ma_liste), "octets")
Étape 4 - Suivre l’allocation mémoire avec tracemalloc
🎯 Objectif
Suivre les allocations mémoire dans votre programme en temps réel.
📌 Consigne
- Importer
tracemalloc - Démarrer le suivi
- Créer des objets volumineux
- Afficher les 5 plus gros emplacements de mémoire
✅ Solution
import tracemalloc
tracemalloc.start()
# Création de données volumineuses
data = [list(range(1000)) for _ in range(1000)]
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("Top 5 lignes consommant le plus de mémoire :")
for stat in top_stats[:5]:
print(stat)
Étape 5 - Comparer l’usage mémoire avant et après suppression d’objets
🎯 Objectif
Observer l’effet du garbage collector sur la mémoire.
📌 Consigne
- Créer un objet volumineux
- Mesurer la mémoire utilisée
- Supprimer l’objet avec
del - Forcer le garbage collector et mesurer à nouveau
✅ Solution
import gc
import tracemalloc
tracemalloc.start()
big_list = [list(range(1000)) for _ in range(1000)]
print("Mémoire après création :", tracemalloc.get_traced_memory()[1], "octets")
del big_list
gc.collect()
print("Mémoire après suppression et GC :", tracemalloc.get_traced_memory()[1], "octets")
Étape 6 - Bonus : Créer une fonction pour visualiser l’usage mémoire d’un objet
🎯 Objectif
Créer un outil réutilisable pour explorer la mémoire de n’importe quel objet.
📌 Consigne
Créer une fonction explorer_memoire(obj) qui affiche :
- taille totale avec
asizeof - taille de l’objet avec
sys.getsizeof
✅ Solution
import sys
from pympler import asizeof
def explorer_memoire(obj):
taille_simple = sys.getsizeof(obj)
taille_totale = asizeof.asizeof(obj)
print(f"Taille simple : {taille_simple} octets")
print(f"Taille totale : {taille_totale} octets")
ma_liste = [list(range(100)) for _ in range(1000)]
explorer_memoire(ma_liste)
TP : Modifier, Filtrer et Valider un CSV avec des Décorateurs en Python
Partie 1 : Lecture d'un fichier CSV
Objectif
Apprendre à lire un fichier CSV ligne par ligne en Python.
Consigne
Créez une fonction lire_csv qui lit un fichier CSV et renvoie chaque ligne sous forme de dictionnaire.
Solution
import csv
def lire_csv(fichier):
with open(fichier, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for ligne in reader:
yield ligne
# Exemple d'utilisation
for ligne in lire_csv('exemple.csv'):
print(ligne)
Partie 2 : Créer un décorateur pour modifier les lignes
Objectif
Appliquer une transformation à chaque ligne du CSV via un décorateur.
Consigne
Créez un décorateur modifier_ligne qui prend une fonction de transformation et l'applique à chaque ligne.
Solution
def modifier_ligne(func):
def wrapper(ligne):
return func(ligne)
return wrapper
@modifier_ligne
def majuscule_nom(ligne):
ligne['nom'] = ligne['nom'].upper()
return ligne
# Exemple
for ligne in lire_csv('exemple.csv'):
print(majuscule_nom(ligne))
Partie 3 : Créer un décorateur pour filtrer les lignes
Objectif
Filtrer certaines lignes selon un critère avec un décorateur.
Consigne
Créez un décorateur filtrer_ligne qui garde uniquement les lignes répondant à une condition.
Solution
def filtrer_ligne(func):
def wrapper(ligne):
if int(ligne['age']) >= 18:
return func(ligne)
return None
return wrapper
@filtrer_ligne
def afficher_adultes(ligne):
return ligne
# Exemple
for ligne in lire_csv('exemple.csv'):
resultat = afficher_adultes(ligne)
if resultat:
print(resultat)
Partie 4 : Créer un décorateur pour valider les lignes
Objectif
Valider que chaque ligne respecte certaines règles.
Consigne
Créez un décorateur valider_ligne qui lève une exception si une règle n’est pas respectée.
Solution
def valider_ligne(func):
def wrapper(ligne):
if 'nom' not in ligne or not ligne['nom']:
raise ValueError('La ligne doit contenir un nom')
return func(ligne)
return wrapper
@valider_ligne
def afficher_ligne_validee(ligne):
return ligne
# Exemple
for ligne in lire_csv('exemple.csv'):
try:
print(afficher_ligne_validee(ligne))
except ValueError as e:
print('Erreur:', e)
Partie 5 : Combiner modification, filtrage et validation
Objectif
Appliquer successivement modification, filtrage et validation sur chaque ligne.
Consigne
Combinez les trois décorateurs pour traiter le CSV en une seule fonction.
Solution
@valider_ligne
@filtrer_ligne
@modifier_ligne
def traiter_ligne(ligne):
ligne['nom'] = ligne['nom'].title()
return ligne
# Exemple
for ligne in lire_csv('exemple.csv'):
resultat = traiter_ligne(ligne)
if resultat:
print(resultat)
TP : Créer un .exe à partir d'un script Python
Partie 1 : Installer PyInstaller
Objectif
Installer l'outil nécessaire pour convertir un script Python en exécutable Windows.
Consigne
Installez PyInstaller via pip.
Solution
pip install pyinstaller
Partie 2 : Créer un script Python simple
Objectif
Préparer un script Python à convertir en .exe.
Consigne
Créez un fichier bonjour.py qui affiche un message.
Solution
# bonjour.py
print("Bonjour, ceci est un script Python converti en exe !")
input("Appuyez sur Entrée pour fermer...")
Partie 3 : Générer un .exe simple
Objectif
Utiliser PyInstaller pour créer un exécutable simple.
Consigne
Exécutez PyInstaller pour générer le fichier .exe.
Solution
pyinstaller --onefile bonjour.py
Partie 4 : Localiser le fichier .exe
Objectif
Trouver et tester l'exécutable généré.
Consigne
Vérifiez dans le dossier dist le fichier bonjour.exe et exécutez-le.
Solution
# Le fichier se trouve ici
dist/bonjour.exe
# Double-cliquez dessus ou exécutez dans le terminal
dist\bonjour.exe
Partie 5 : Ajouter une icône à l'exécutable
Objectif
Personnaliser le .exe avec une icône.
Consigne
Créez une icône .ico et utilisez-la avec PyInstaller.
Solution
pyinstaller --onefile --icon=icone.ico bonjour.py
Partie 6 : Créer un .exe sans console
Objectif
Supprimer la console pour les applications GUI.
Consigne
Utilisez l’option --windowed pour générer un .exe sans console.
Solution
pyinstaller --onefile --windowed --icon=icone.ico bonjour.py
TP : Projet complet : Architecture Python, classes avancées, décorateurs, closures, traitement CSV, serveur web et persistance (MySQL + Docker optionnel)
Partie 1 : Conception et architecture
Objectifs pédagogiques
- Structurer le code en modules clairs.
- Montrer usage de classes, métaclasse, classes abstraites.
- Séparer responsabilité : configuration, modèles, stockage, pipeline, serveur.
Consignes
- Crée un dossier
project/contenant une arborescence claire inspirée de celle proposée dans le sujet. - Organise ton application Python en plusieurs modules :
config.pypour gérer la configuration.models.pypour les classes métier.pipeline.pypour la logique de transformation/validation.storage.pypour la persistance des données.server.pypour exposer une API web.utils.pypour les fonctions génériques.
- Crée un fichier
requirements.txtlistant toutes les dépendances nécessaires. - Ajoute un dossier
data/pour stocker les données brutes (ex : CSV). - Assure-toi que chaque fichier module contient un rôle clair et unique.
Solution
project/
├── app/
│ ├── __init__.py
│ ├── config.py
│ ├── models.py
│ ├── storage.py
│ ├── pipeline.py
│ ├── server.py
│ └── utils.py
├── data/
│ └── people.csv
├── migrations/ # (optionnel) migrations SQL
├── Dockerfile # (optionnel)
├── docker-compose.yml # (optionnel pour MySQL)
├── requirements.txt
└── README.md
Partie 2 : Fichiers essentiels (exemples)
Consignes
- Crée un fichier
requirements.txtavec les dépendances nécessaires au projet. - Ajoute un fichier CSV dans
data/people.csvcontenant des exemples de données volontairement invalides.
requirements.txt
flask==2.2.5
pandas==2.1.0
sqlalchemy==2.0.19
pymysql==1.0.3
python-dotenv==1.0.0
Vérifie que ton projet peut être installé via :
pip install -r requirements.txt
Exemple de data/people.csv (jeu de données)
id,name,age,email,role
1,Alice,30,alice@example.com,admin
2,Bob,notanumber,bob@example.com,user
3,,22,anon@example.com,user
4,Charlie,17,charlie@example.com,guest
5,David,45,david@example.com,user
6,Eve,130,eve@example.com,user
Partie 3 : Configuration
Consignes
- Crée le fichier
app/config.py. - Mets en place un chargement automatique de variables d’environnement via
dotenv. - Ajoute des variables configurables :
- Mode (
ENV) - Debug (
DEBUG) - Connexion MySQL (host, port, user, password, database)
- Mode (
- Expose une variable unique :
DATABASE_URI. - Teste ton fichier en modifiant tes variables d’environnement.
app/config.py : lecture de la config via env et fichier .env
# app/config.py
import os
from dotenv import load_dotenv
load_dotenv() # charge .env si présent
class Config:
ENV = os.getenv("ENV", "development")
DEBUG = os.getenv("DEBUG", "1") == "1"
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", 3306))
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "password")
MYSQL_DB = os.getenv("MYSQL_DB", "example_db")
DATABASE_URI = (
f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}"
)
Partie 4 : Modèles, métaclasse et classe abstraite
Consignes
- Dans
app/models.py, crée :- Une métaclasse
ModelRegistryqui enregistre les classes créées. - Une classe abstraite
BaseModelimposant :to_dict()validate()
- Une métaclasse
- Crée une classe
Personhéritant deBaseModel:- Ajoute des attributs (
id,name,age,email,role). - Implémente une méthode
validate()qui vérifie :idetagesont des ints valides.namen’est pas vide.emailcontient@.
- Ajoute des attributs (
- Vérifie que ta classe apparaît bien dans
ModelRegistry.registry.
Contexte
- On crée une
BaseModelabstraite qui imposeto_dict()etvalidate().- to_dict permet d'avoir un objet safe a retourner en sortie publique (API etc...)
- On crée une métaclasse qui enregistre les classes modèles pour introspection.
- Cela permet de sauvegarder les modèles de données
app/models.py
# app/models.py
from abc import ABC, abstractmethod, ABCMeta
from typing import Dict
# Metaclass simple pour enregistrer modèles
class ModelRegistry(ABCMeta):
registry = {}
def __new__(mcls, name, bases, namespace):
cls = super().__new__(mcls, name, bases, namespace)
if name != "BaseModel":
ModelRegistry.registry[name] = cls
return cls
class BaseModel(ABC, metaclass=ModelRegistry):
"""Interface pour tous les modèles du domaine."""
@abstractmethod
def to_dict(self) -> dict:
raise NotImplementedError
@abstractmethod
def validate(self) -> bool:
raise NotImplementedError
# Exemple de modèle concret
class Person(BaseModel):
def __init__(self, id: int, name: str, age: int, email: str, role: str):
self.id = id
self.name = name
self.age = age
self.email = email
self.role = role
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"age": self.age,
"email": self.email,
"role": self.role,
}
def validate(self) -> bool:
"""Validation basique : id int, name non vide, 0<=age<=120, email contient @"""
if not isinstance(self.id, int):
return False
if not self.name:
return False
if not isinstance(self.age, int) or not (0 <= self.age <= 120):
return False
if "@" not in self.email:
return False
return True
Partie 5 : Décorateurs & closures,pipeline de transformation/validation
Consignes
- Crée dans
app/pipeline.pyplusieurs décorateurs paramétrables :cast_field(field, caster): transforme le type d’un champ.normalize_field(field, func_norm): nettoie un champ string.require_field(field): ignore la ligne si le champ est vide.filter_condition(predicate): ne garde la ligne que si la condition est vraie.
- Implémente ces décorateurs via des closures.
- Crée une fonction finale
process_row_to_person(row):- Empile plusieurs décorateurs.
- Construit un objet
Personsi tous les tests passent. - Retourne
Nonesi une étape échoue.
- Teste ton pipeline avec un jeu de données simple.
Objectif
Construire un pipeline où chaque étape est un décorateur (modification, filtrage, validation). Utiliser closures pour paramétrer les décorateurs.
app/pipeline.py
# app/pipeline.py
from functools import wraps
from typing import Callable, Optional
# décorateur pour caster un champ
def cast_field(field: str, caster: Callable):
def decorator(func):
@wraps(func)
def wrapper(row: dict):
try:
row[field] = caster(row[field])
except Exception:
row[field] = None
return func(row)
return wrapper
return decorator
# décorateur pour normaliser une chaine (closure qui renvoie le décorateur)
def normalize_field(field: str, func_norm: Callable[[str], str]):
def decorator(fn):
@wraps(fn)
def wrapper(row: dict):
v = row.get(field)
if isinstance(v, str):
row[field] = func_norm(v)
return fn(row)
return wrapper
return decorator
# décorateur pour valider un champ obligatoire
def require_field(field: str):
def decorator(fn):
@wraps(fn)
def wrapper(row: dict):
if row.get(field) in ("", None):
return None
return fn(row)
return wrapper
return decorator
# décorateur filtre général
def filter_condition(predicate: Callable[[Dict], bool]):
def decorator(fn):
@wraps(fn)
def wrapper(row: dict):
try:
if not predicate(row):
return None
except Exception:
return None
return fn(row)
return wrapper
return decorator
# pipeline "final" qui construit un Person si tout ok
from .models import Person
@cast_field("id", int)
@cast_field("age", int)
@normalize_field("name", lambda s: s.strip().title())
@require_field("name")
@filter_condition(lambda r: r.get("age") is not None and r["age"] >= 18)
def process_row_to_person(row: dict) -> Optional[Person]:
# row now has typed fields; build model
p = Person(
id=row["id"],
name=row["name"],
age=row["age"],
email=row.get("email", ""),
role=row.get("role", "user"),
)
if not p.validate():
return None
return p
Partie 6 : Utils: lecture CSV et fichier de config
Consignes
- Dans
app/utils.py, crée :- Une fonction
read_csv(path)qui renvoie une liste de dicts. - Une fonction
write_csv(path, rows, fieldnames)qui enregistre une liste de dicts.
- Une fonction
- Charge le fichier CSV
data/people.csv. - Passe chaque ligne dans
process_row_to_person. - Vérifie que certaines lignes sont invalides et correctement filtrées.
app/utils.py
# app/utils.py
import csv
from typing import Callable, Optional
from pathlib import Path
def read_csv(path: str) -> list[dict]:
with open(path, newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
return [dict(row) for row in reader]
def write_csv(path: str, rows: list[dict], fieldnames):
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for r in rows:
writer.writerow(r)
Partie 7 : Stockage (SQLAlchemy) app/storage.py
Consignes
- Dans
app/storage.py, crée une classeStorage:- Initialise un moteur SQLAlchemy à partir de
Config.DATABASE_URI. - Crée une table SQL
person.
- Initialise un moteur SQLAlchemy à partir de
- Crée un DTO (objet de transfert de données)
PersonDTOmappé à la table via le registry SQLAlchemy. - Ajoute une méthode
save_person(person)pour faire un upsert simple. - Ajoute une méthode optionnelle
fetch_all_persons(). - Teste :
- Insertion d’un enregistrement valide.
- Récupération des données depuis la base.
Objectif
Sauvegarder les Person dans MySQL via SQLAlchemy, en utilisant l'URI de config.Config.
app/storage.py
# app/storage.py
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table
from sqlalchemy.orm import registry, Session
from .config import Config
mapper_registry = registry()
metadata = MetaData()
person_table = Table(
"person",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(255)),
Column("age", Integer),
Column("email", String(255)),
Column("role", String(50)),
)
# simple mapper object
class PersonDTO:
def __init__(self, id, name, age, email, role):
self.id = id
self.name = name
self.age = age
self.email = email
self.role = role
mapper_registry.map_imperatively(PersonDTO, person_table)
class Storage:
def __init__(self, uri: str = None):
uri = uri or Config.DATABASE_URI
self.engine = create_engine(uri, echo=False, future=True)
metadata.create_all(self.engine)
def save_person(self, person):
with Session(self.engine) as session:
dto = PersonDTO(person.id, person.name, person.age, person.email, person.role)
session.merge(dto) # merge pour upsert basique
session.commit()
Partie 8 : Serveur web (Flask) app/server.py
Consignes
- Dans
app/server.py, crée une application Flask aveccreate_app. - Ajoute une route POST
/import:- Reçoit un JSON contenant
{ "path": "<fichier.csv>" }. - Lit le CSV.
- Passe chaque ligne dans le pipeline.
- Sauvegarde les
Personvalides. - Retourne la liste des personnes importées.
- Reçoit un JSON contenant
- Ajoute une route GET
/persons:- Retourne le contenu de la table SQL.
- Vérifie le comportement avec des données invalides.
Démarre ton serveur et teste avec :
curl -X POST -H "Content-Type: application/json" \
-d '{"path": "data/people.csv"}' \
http://localhost:5000/import
Objectif
Exposer une API minimaliste :
- POST /import : lit CSV et importe valid rows
- GET /persons : retourne personnes importées (depuis DB ou mémoire selon implémentation)
app/server.py
# app/server.py
from flask import Flask, request, jsonify
from .utils import read_csv
from .pipeline import process_row_to_person
from .storage import Storage
def create_app(storage: Storage = None):
app = Flask(__name__)
storage = storage or Storage()
@app.route("/import", methods=["POST"])
def import_csv():
data_path = request.json.get("path")
if not data_path:
return jsonify({"error": "path required"}), 400
rows = read_csv(data_path)
imported = []
for row in rows:
person = process_row_to_person(row)
if person is not None:
storage.save_person(person)
imported.append(person.to_dict())
return jsonify({"imported": imported}), 200
@app.route("/persons", methods=["GET"])
def list_persons():
# Pour simplicité, faites une requête directe SQL (ou ajout d'un method fetch_all)
with storage.engine.connect() as conn:
result = conn.execute("SELECT id,name,age,email,role FROM person")
persons = [dict(row._mapping) for row in result]
return jsonify(persons)
return app
# pour lancer localement
if __name__ == "__main__":
app = create_app()
app.run(host="0.0.0.0", port=5000, debug=True)
Partie 9 : Docker & MySQL (optionnel)
Consignes
- Crée un
Dockerfilepour packager ton application. - Crée un
docker-compose.ymlcontenant :- Un service MySQL.
- Un service
webqui dépend de MySQL.
- Configure les variables d’environnement dans
docker-compose.yml. - Teste ensuite les routes via l’API avec les deux services en fonctionnement.
Lance :
docker compose up --build
Dockerfile pour l'app
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "-m", "app.server"]
docker-compose.yml (optionnel)
version: "3.8"
services:
db:
image: mysql:8.0
restart: always
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: example_db
ports:
- "3306:3306"
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5
web:
build: .
depends_on:
db:
condition: service_healthy
environment:
MYSQL_HOST: db
MYSQL_USER: root
MYSQL_PASSWORD: password
MYSQL_DB: example_db
ports:
- "5000:5000"
Partie 10 : Tests rapides & exécution
Consignes
- Crée un dossier
tests/. - Ajoute un test pour :
process_row_to_person()(cas valides et invalides).Storage.save_person().
- Ajoute un test d’intégration sur
/import. - (Optionnel) Configure GitHub Actions pour automatiser les tests.
Lancer sans Docker
- Installer dépendances :
python -m pip install -r requirements.txt
- Créer une base MySQL locale ou modifier
Config.DATABASE_URIpour utiliser SQLite temporairement :
# dans app/config.py pour dev rapide
DATABASE_URI = "sqlite:///dev.db"
- Lancer le serveur :
python -m app.server
- Importer via curl :
curl -X POST -H "Content-Type: application/json" -d '{"path":"data/people.csv"}' http://localhost:5000/import
- Consulter personnes :
curl http://localhost:5000/persons
Lancer avec Docker Compose (optionnel)
docker compose up --build
# puis la même requête POST vers le service web http://localhost:5000/import
Partie 11 : Exercice guidé (à faire)
Étapes demandées
- Séparer les responsabilités : déplacer la logique SQL dans
storage.py(méthodefetch_all_persons) et ne pas exécuter SQL brut dansserver.py. - Logger : ajouter logging structuré partout (
logging). - Tests unitaires : écrire pytest pour
pipeline.process_row_to_personavec cas valides/invalides. - Sécurité : échappez/validez les champs (déjà partiellement fait).
- Pagination : implémenter pagination pour
/persons. - Métriques : exposer le nombre de lignes importées et le nombre rejeté.
Partie 12 : Explications
Métaclasse
ModelRegistrypermet d'introspecter toutes les classes modèles créées (utile pour migrations, sérialisation automatique).
Classe abstraite
BaseModelforce l'implémentation des méthodes essentielles (to_dict,validate) pour homogénéité.
Décorateurs & closures
- Les décorateurs
cast_field,normalize_field,filter_conditionsont paramétrables grâce aux closures et composables (empilement@).
Architecture
config.py= configurationmodels.py= règles métier (validation)pipeline.py= transformation/validation (stateless)storage.py= accès à la baseserver.py= interface HTTP
Solutions / snippets utiles supplémentaires
storage.fetch_all_persons() (amélioration)
# dans app/storage.py, ajouter :
from sqlalchemy import select
class Storage:
# ... init ...
def fetch_all_persons(self, limit=100, offset=0):
stmt = select(person_table).limit(limit).offset(offset)
with self.engine.connect() as conn:
result = conn.execute(stmt)
return [dict(r._mapping) for r in result]
Exemple de test pour pipeline (pytest)
# tests/test_pipeline.py
from app.pipeline import process_row_to_person
def test_valid_row():
row = {"id": "10", "name": " john doe ", "age": "25", "email": "j@example.com", "role": "user"}
p = process_row_to_person(row)
assert p is not None
assert p.name == "John Doe"
assert p.age == 25
def test_invalid_age():
row = {"id":"11","name":"Ann","age":"notanint","email":"a@b.com","role":"user"}
p = process_row_to_person(row)
assert p is None
Conseils & bonnes pratiques
- Séparez logique métier et persistence (Repository pattern).
- Validez tôt : cast/validate en entrée.
- Utilisez env vars pour secrets (ne pas committer
.env). - Faites des migrations (Alembic) pour la base SQL si projet évolue.
- Ajoutez des métriques et logs pour production.
- Tester votre pipeline avec divers cas (limites, valeurs manquantes, injections).