
InfoFiltre : Pipeline ETL haute fréquence pour la détection de fake news financières
Jun 2026Création d'un pipeline d'ingestion en continu et retraitement batch pour analyser la fiabilité des flux d'actualités financiers via Machine Learning et Fact-Checking.
Contexte et Objectif
Dans le cadre de ma formation en ingénierie des données, j’ai été amené à travailler sur un projet vraiment cool qui exploite une architecture hybride (ingestion rapide d’un côté, consolidation lourde de l’autre).
L’objectif: Alimenter un système d’aide à la décision pour anticiper l’évolution des marchés financiers. Le besoin strict était de fournir un flux d’actualités structuré en temps réel, capable d’isoler la vérité des “fake news”. Le système devait intégrer la donnée en continu, tout en étant capable de recalculer et vérifier toute la fiabilité de l’historique toutes les 6 heures.
Contraintes du Cahier des Charges
Avant de rentrer dans le vif du sujet sur les étapes de développement, il est important de préciser le cadre de ce projet académique:
- Temps de développemen:t Nous disposions d’un délai strict de 14 heures pour livrer un MVP fonctionnel.
- Temps réel et Batch: Création et vérification des informations en temps réel, avec un retraitement global obligatoire toutes les 6 heures.
- Structure de la donnée: Les informations extraites devaient obligatoirement contenir : un titre, un résumé, une date d’occurrence de l’événement, et une date de publication.
- Automatisation: La vérification de l’information (Fact-checking) devait être entièrement automatisée.
Bien que le délai fut court pour implémenter des concepts de Data Engineering plus poussés, l’objectif du cahier des charges a été atteint. J’envisage d’ailleurs de continuer à faire évoluer ce projet pour le rendre encore plus robuste.
Ma prise de recul : L’illusion du “Temps Réel” et le choix de l’ETL
C’est en rédigeant cette documentation et en prenant du recul sur mon travail que je me suis posé une question fondamentale : a-t-on vraiment fait du temps réel comme mentionné dans le cahier des charges ?
La réponse est non.
Souvent, le cahier des charges ou les équipes métier demandent du temps réel parce qu’ils veulent simplement voir les mises à jour le plus vite possible dès qu’il y a un changement. Mais techniquement, le vrai temps réel (Streaming) implique une infrastructure événementielle lourde (comme Kafka) qui réagit à la milliseconde.
Ici, ce n’est pas le cas. J’ai conçu un Batch à haute fréquence (toutes les 15 minutes). Ça donne l’illusion du temps réel pour l’utilisateur final, mais l’architecture reste fondamentalement basée sur des lots. Avoir cet esprit critique m’a permis d’utiliser les bons termes techniques et de comprendre exactement ce que j’avais construit. Donc, pour être précis, il s’agit d’un pipeline ETL à haute fréquence et non d’un pipeline de en temps réel et batch. C’est vrai que dans certaines partie de la documentation, j’ai utilisé le terme “temps réel” pour simplifier la compréhension, mais techniquement, c’est un pipeline ETL à haute fréquence.
ETL ou ELT ? Dans cette même logique de recul, je me suis posé la question de notre pattern d’intégration. J’ai réalisé que nous étions sur un pur modèle ETL (Extract, Transform, Load). Pourquoi ? Parce que toute la transformation (nettoyage du code HTML, standardisation des dates via Pandas, et même l’évaluation par le modèle Machine Learning) est effectuée en mémoire par Python avant d’être chargée dans la base de données. Si nous j’avais fait de l’ELT, j’aurais chargé le HTML brut dans la base et utilisé SQL pour le nettoyer, ce qui n’était pas adapté ici.
Ma façon de procéder (Étape par étape)
Pour ce pipeline, j’ai essayé d’être le plus structuré possible. Voici comment j’ai découpé mon travail :
1. Phase de Découverte (Audit des sources)
J’ai commencé par auditer mes sources de données. J’ai ciblé plusieurs médias (Le Monde, Le Figaro, Les Échos) et des sites satiriques (Le Gorafi) pour tester la détection.
Avant de concevoir le pipeline, j’ai dû répondre à plusieurs questions afin de mieux comprendre ma source de données et ses caractéristiques :
- Quelles sont les caractéristiques les plus essentielles de notre source de données ? Réponse : Il s’agissait de données non structurées : du flux RSS (XML) et du code HTML brut nécessitant du Web Scraping.
- Comment les données sont-elles stockées dans le système source et pendant combien de temps ? Réponse : Dans le système source (les serveurs du journal), les données sont probablement dans une base relationnelle lourde, mais elles nous sont exposées via un fichier XML généré dynamiquement. La durée de stockage n’est pas connue, mais les flux RSS ne contiennent que les dernières actualités (environ 20 articles par flux).
- Quel est le niveau de consistance et quels sont les champs affectés par ces inconsistances ? Réponse : Les données peuvent présenter des inconsistances en termes de formatage, de contenu ou de disponibilité (être vide = null), ce qui affecte les champs comme la date et la description.
- À quelle fréquence les erreurs de données se produisent-elles ? Réponse : Selon mes recherches, Les erreurs de structure stricte (XML cassé) sont très rares car les flux RSS sont générés automatiquement par des CMS. Bon en revanche, les erreurs de qualité (textes tronqués par l’éditeur, résumés manquants) sont fréquentes et quotidiennes surtout sur les sites satiriques (ex: Le Gorafi).
- Devons-nous joindre plusieurs tables/sources pour obtenir une vue d’ensemble complète ? Réponse : NON. je fais juste une opération d’Union (concaténation) des listes d’articles provenant des différents sites pour former un DataFrame Pandas unique avant l’insertion en base de données.
- Si le schéma change (ajout d’une nouvelle colonne), comment allons-nous gérer cela ? Réponse : L’ajout d’un nouveau champ inconnu dans le flux RSS source sera silencieusement ignoré par notre code vu que ce cette colone(ex: “author”) n’est pas nécessaire pour notre pipeline. Si un champ requis disparaît, le pipeline affecte tout simplement la valeur null cependant cet article sera pas ajouter en base de données (car il ne respecte pas la contrainte NOT NULL sur ce champ) pour en savoir plus lien.
- À quelle fréquence les données seront extraites ? Réponse : Les données sont extraites toutes les 15 minutes (stratégie de Batch Haute Fréquence).
- Quelles sont les limites de notre source de données ? Aucun historique : Il est impossible de récupérer les articles du mois dernier via le flux RSS. Troncature du texte : Le RSS ne fournit souvent qu’un résumé d’une phrase. Le corps complet de l’article n’est pas disponible sans devoir scraper agressivement la page HTML de l’article (ce que nous avons évité pour ce MVP). Pas de notification Push : La source ne nous prévient pas quand un article sort, c’est à nous d’aller vérifier (Pull) continuellement, d’où la nécessité de la planification (Schedule).
2. Extraction (toutes les 15 minutes)
L’extraction des données a été réalisée via un script Python utilisant les bibliothèques requests et BeautifulSoup4. Le script se connecte aux flux RSS des différents médias, télécharge le contenu XML, et parse les éléments pour extraire les champs nécessaires (Titre, Résumé, Date d’événement, Date de publication). rss_scapper.py
3. Structuration et nettoyage (toutes les 15 minutes)
Au niveau de la structuration, l’objectif était d’isoler précisément les champs requis (Titre, Résumé, Date d’événement, Date de publication).
- Nettoyage (Parsing): Supprimer toutes les balises HTML, les scripts et les blocs de publicité pour ne conserver que le texte brut.
- Standardisation temporelle: Formatage des dates au format ISO pour faciliter le stockage et la manipulation en base de données.
- Troncature (Truncate): Limitation du nombre de mots pour le résumé (fixé à 100 mots maximum) afin d’optimiser le traitement par le modèle d’Intelligence Artificielle. cleaner.py
4. Enrichissement (toutes les 15 minutes via le modèle d’IA)
Une fois le texte propre extrait lors du cycle de 15 minutes, je l’envoie (titre + résumé) à un modèle de Machine Learning local conteneurisé basé sur DistilBERT. En gros, je lui fais une requête HTTP POST, et il me renvoie une probabilité (score de fiabilité) qui est ajoutée dans une nouvelle colonne ml_prediction au moment de l’insertion en base.
5. Validation et publication (Retraitement batch: toutes les 6h)
C’est la phase la plus intéressante de l’architecture. Toutes les 6 heures, le pipeline effectue un retraitement complet:
Extraction de l’historique: Requête sur la base analytique (DuckDB) pour récupérer toutes les actualités ingérées.
Croisement Fact-Check: Il va scrapper le site de l’AFP (Agence France-Presse) pour voir quels articles ont été officiellement certifiés Vrais ou Faux.
Mise à jour (Match et UPDATE):
- Si une correspondance est trouvée : Le pipeline met à jour la ligne dans DuckDB (UPDATE), écrase la prédiction probabiliste du ML, et applique un tag définitif “Vrai” ou “Faux - Certifié AFP”.
- Si aucune correspondance n’est trouvée : Le score ML initial est conservé.
Note importante sur la stratégie de traitement en lot (Batch)
Lors de la conception du pipeline, une réflexion majeure s’est posée sur la stratégie de retraitement des données lors du Batch de fact-checking toutes les 6 heures. J’ai identifié un risque et si cette étape n’est pas optimisée, ce qui m’a poussé à faire des recherches sur la stratégie du Delta Load (Traitement Incrémental).
L’Observation: Le piège du full load Imaginons que le pipeline tourne en continu pendant une semaine. Le flux temps réel injecte de nouvelles données toutes les 15 minutes. Si mon batch de 6 heures devait relire et retraiter l’ensemble de la base de données depuis le début (Full Load), le volume à traiter deviendrait exponentiel : 12 articles, puis 24, puis 500, puis 2000… Au-delà du temps de calcul qui exploserait, cette approche qui marche bien sur papier engendrerait des coûts financiers colossaux si j’utilisais une API payante (requêter 2000 fois la même information à chaque cycle).
La solution actuelle: Le flag d’état Pour le périmètre de ce MVP, j’ai contourné ce problème en implémentant un système d’état dans la base de données. Mon script SQL utilise un filtre strict (WHERE is_fact_checked = FALSE). Ainsi, dès qu’un article est traité, son statut passe à TRUE. Le prochain batch ignorera tout l’historique et ne se concentrera que sur le “Delta” (les données strictement nouvelles). fact_checker.py
Perspective d’évolution: Le High-Water Mark (Par Date) Pour une version de production plus robuste, j’ai conscience que se baser sur un simple booléen (vrai/faux) comporte des risques (difficulté de rejouer des données en cas de crash, désynchronisation). L’évolution architecturale prévue est d’implémenter un traitement par High-Water Mark basé sur les timestamps. Au lieu de vérifier un statut “faux”, le batch de 6 heures enregistrera la date exacte de sa dernière exécution, et ne sélectionnera que les articles dont la date d’ingestion (created_at) est strictement supérieure à cette dernière exécution. C’est la méthode standard pour garantir qu’aucune donnée n’est traitée deux fois, tout en gardant une base de données immuable.
Mes choix techniques et pourquoi je les ai faits
Pour tenir le délai de 14h, j’ai dû choisir des outils rapides à mettre en place.
Langage et gestion de l’environnement
- Langage: Python (Permet de tout centraliser : scraping, requêtes API, nettoyage, ML).
- Gestionnaire de dépendances: uv. Ça remplace pip, mais c’est écrit en Rust donc c’est incroyablement plus rapide pour installer les dépendances.
uv init
uv pip install requests beautifulsoup4 pandas duckdb schedule
Extraction et ingestion
- Scraping:
requestspour télécharger la page, etBeautifulSoup4pour fouiller dans le code XML/HTML. C’est léger et ça fait le job sans avoir besoin de monter des usines à gaz.
import requests
from bs4 import BeautifulSoup
reponse = requests.get("https://www.lemonde.fr/rss/une.xml")
soup = BeautifulSoup(reponse.text, 'xml')
titres = [item.title.text for item in soup.find_all('item')]
Traitement et enrichissement
- Manipulation: pandas (Standardisation des dates en une ligne et le nettoyage rapide).
- Communication ML: requests en mode POST vers l’API Docker locale.
import pandas as pd
df = pd.DataFrame(donnees_scrapees)
# Standardisation ISO des dates en une ligne
df['event_date'] = pd.to_datetime(df['event_date']).dt.strftime('%Y-%m-%d %H:%M:%S')
# Appel au modèle ML
payload = {"text": "Titre et résumé de l'article"}
reponse_ml = requests.post("http://localhost:5001/detect_json", json=payload)
Stockage des données
- Base de données: J’ai choisi DuckDB. C’est parfait pour un MVP : pas besoin d’installer de gros serveurs, c’est juste un fichier local, et ça permet d’insérer des DataFrames Pandas entiers directement en SQL
-- Modèle de données (DDL: Data Definition Language)
CREATE TABLE IF NOT EXISTS articles (
source_name VARCHAR NOT NULL, -- Bloque si la source est vide
title VARCHAR NOT NULL, -- Bloque si le titre est vide
summary VARCHAR, -- Autorise un résumé vide
event_date TIMESTAMP,
publication_date TIMESTAMP,
is_satire BOOLEAN,
ml_prediction VARCHAR,
is_fact_checked BOOLEAN DEFAULT FALSE,
final_status VARCHAR DEFAULT NULL,
UNIQUE(source_name, title); -- Gère la déduplication (Idempotence)
Orchestration
- Planificateur: Mettre en place Airflow ou Dagster aurait pris trop de temps pour ce MVP. J’ai utilisé la petite librairie schedule en pur Python. C’est ultra lisible et simple.
import schedule
import time
from ingestion import run_realtime_ingestion
from validation import run_validation_batch
# Planification du micro-batch (15min) et du batch de validation (6h)
schedule.every(15).minutes.do(run_realtime_ingestion)
schedule.every(6).hours.do(run_validation_batch)
# Boucle infinie qui maintient le programme en vie
try:
while True:
schedule.run_pending()
time.sleep(1) # Pause d'une seconde pour ne pas surcharger le processeur
except KeyboardInterrupt:
print("\nArrêt manuel de l'orchestrateur.")
Stratégie de configuration des sources (Pilotage par Configuration)
Au lieu de mettre les liens RSS (Le Monde, Le Gorafi) directement dans mon code Python, je les ai mis dans un fichier sources.json. Pourquoi ? Parce que si une URL change, je modifie le JSON en 2 secondes sans toucher au code. Ça permet aussi à quelqu’un qui ne sait pas coder (comme un analyste métier) d’ajouter lui-même une nouvelle source d’info très facilement. C’est ce qu’on appelle le Pilotage par configuration.
Exemple du fichier config/sources.json:
{
"news_sources": [
{
"name": "Le Monde",
"url": "https://www.lemonde.fr/rss/une.xml",
"type": "general",
"is_satire": false
},
{
"name": "Le Figaro Economie",
"url": "https://www.lefigaro.fr/rss/figaro_economie.xml",
"type": "finance",
"is_satire": false
},
{
"name": "Le Gorafi",
"url": "https://www.legorafi.fr/feed/",
"type": "satire",
"is_satire": true
}
]
}
Stratégie des appels API et gestion des pannes
Dans mon pipeline, je n’ai pas mis de système de Retry complexe. Si le flux du “Monde” plante ou ne répond pas à 10h00, mon script affiche une erreur et passe direct au “Figaro”. L’idée c’est de ne jamais bloquer le pipeline en attendant une source morte. Au pire, le script récupérera l’info au prochain tour 15 minutes plus tard ! De plus, Pour éviter de polluer la base de données avec des doublons, le pipeline s’appuie sur une conception Idempotente. Le travail de tri n’est pas fait par le scraper, mais par la base de données DuckDB. Lors de la création de la table, j’ai défini une contrainte UNIQUE(source_name, title)
Structure du projet et exécution
info-filtre/
├── data/ # Stockage local
│ └── pipeline.db # Fichier DuckDB
├── src/
│ ├── __init__.py
│ ├── extract/ # Appels HTTP avec 'requests' et 'BeautifulSoup'
│ │ └── rss_scraper.py
│ ├── transform/ # Nettoyage Pandas et requêtes API vers le modèle ML
│ │ └── cleaner.py
│ ├── load/ # Fonctions d'insertion/extraction
│ │ └── duckdb_client.py
│ ├── validation/ # Logique métier du batch (Croisement Fact Check AFP)
│ │ └── fact_checker.py
│ └── main_pipeline.py # Orchestrateur utilisant 'schedule'
├── config/
│ └── sources.json # Liste des URL (Le Monde, Les Echos, etc.)
├── tests/ # Dossier pour les tests unitaires
├── .env.example # Template des variables d'environnement
├── pyproject.toml # Dépendances (généré par 'uv')
└── README.md
Lancement du conteneur ML (DistilBERT)
docker pull josumsc/flask-fake-news
docker run -d -p 5001:5000 josumsc/flask-fake-news
Lancement du pipeline
uv run python src/main_pipeline.py
Vérification et qualité des données (DBeaver)
Pour valider l’ingestion, le retraitement et les mises à jour (Updates) de DuckDB, j’utilise l’outil de gestion de base de données DBeaver. Dbeaver me permet d’exécuter des requêtes SQL analytiques directement sur le fichier local.
-- Vérification de l'analyse de fact-checking croisée avec l'AFP
SELECT
source_name,
title,
final_status
FROM articles
WHERE is_fact_checked = TRUE;
Ce que je veux améliorer pour la V2
Bien que le MVP soit fonctionnel, j’ai identifié plusieurs axes d’amélioration critiques afin rendre ce projet plus robuste:
- 1. Passer au High-Water Mark (Delta Load robuste): Remplacer le système actuel de “Flag” booléen par un suivi précis des dates d’ingestion pour fiabiliser le retraitement incrémental en cas de panne.
- 2. L’API de Google: Remplacer mon scraping de l’AFP (qui risque de casser si leur site web change) par l’API officielle “Google Fact Check”.
- 3. De vrais tests unitaires: Utiliser pytest pour m’assurer mathématiquement que mes fonctions marchent (ex: vérifier que je coupe exactement à 100 mots) et automatiser ça avec GitHub Actions
- 4. Passer à la vitesse supérieure: Migrer sur un vrai orchestrateur comme Airflow ou Dagster pour paralléliser mes appels API ou implémenter du threading pour paralleliser le scraping.
- 5. Dashboard: Créer une belle interface avec Streamlit pour que les utilisateurs puissent voir les résultats.