From fedb0b3c7a4ccdb1fc30315d4a83e56c977db13b Mon Sep 17 00:00:00 2001 From: jfontaine35 <76435797+jfontaine35@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:30:46 +0100 Subject: [PATCH] Optimize CSV to XML generation and validation Refactor CSV to XML generator with improved structure and validation. --- convert_csv-xml.py | 502 ++++++++++++++++++++++++++++++++------------- 1 file changed, 356 insertions(+), 146 deletions(-) diff --git a/convert_csv-xml.py b/convert_csv-xml.py index c8155a3..fe821bc 100644 --- a/convert_csv-xml.py +++ b/convert_csv-xml.py @@ -1,159 +1,369 @@ +""" +Générateur XML pour certifications CPF - Version Optimisée +Génère un fichier XML conforme au schéma urn:cdc:cpf:pc5:schema:1.0.0 +""" + import xml.etree.ElementTree as ET +from xml.dom import minidom import csv -import random import logging import datetime -import subprocess import pytz -import os +import os +from pathlib import Path +from typing import Optional, List, Dict +import sys -# Emplacement du fichier CSV et XML -csv_file = "Dico2.csv" -xml_file = "file.xml" -tz_paris = pytz.timezone('Europe/Paris') - -#Efface la console -def clear_console(): - os.system('cls' if os.name == 'nt' else 'clear') -clear_console() +# Configuration +CSV_FILE = "Data.csv" +XML_FILE = "file.xml" +XSD_FILE = "validation.xsd" +TZ_PARIS = pytz.timezone('Europe/Paris') # Configuration du logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('xml_generation.log', encoding='utf-8'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) -def indent(elem, level=0): - """ Ajoute des indentations aux éléments pour une meilleure lisibilité du fichier XML. """ - i = "\n" + level * " " - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + " " - if not elem.tail or not elem.tail.strip(): - elem.tail = i - for elem in elem: - indent(elem, level + 1) - if not elem.tail or not elem.tail.strip(): - elem.tail = i + +class XMLValidator: + """Classe pour valider le XML généré contre le XSD""" + + @staticmethod + def validate_xml(xml_file: str, xsd_file: str) -> bool: + """Valide un fichier XML contre un schéma XSD""" + try: + from lxml import etree + + # Charger le schéma XSD + with open(xsd_file, 'rb') as f: + schema_root = etree.XML(f.read()) + schema = etree.XMLSchema(schema_root) + + # Charger le fichier XML + with open(xml_file, 'rb') as f: + xml_doc = etree.parse(f) + + # Valider + is_valid = schema.validate(xml_doc) + + if not is_valid: + logger.error("Erreurs de validation XSD:") + for error in schema.error_log: + logger.error(f" Ligne {error.line}: {error.message}") + return False + + logger.info("✓ Le fichier XML est valide selon le schéma XSD") + return True + + except ImportError: + logger.warning("Module lxml non disponible. Installation recommandée: pip install lxml") + return None + except Exception as e: + logger.error(f"Erreur lors de la validation: {e}") + return False + + +class CPFXMLGenerator: + """Générateur de fichiers XML pour les certifications CPF""" + + def __init__(self, csv_filepath: str, xml_filepath: str): + self.csv_filepath = Path(csv_filepath) + self.xml_filepath = Path(xml_filepath) + self.namespace = "urn:cdc:cpf:pc5:schema:1.0.0" + self.xsi_namespace = "http://www.w3.org/2001/XMLSchema-instance" + + def validate_csv_structure(self, rows: List[List[str]]) -> bool: + """Valide la structure du fichier CSV""" + if len(rows) < 6: + logger.error("Le fichier CSV doit contenir au moins 6 lignes") + return False + + # Vérifier que la ligne d'en-tête existe + if not rows[5] or len(rows[5]) < 25: + logger.error("La ligne d'en-tête (ligne 6) est incomplète") + return False + + return True + + def clean_string(self, value: str, max_length: Optional[int] = None) -> str: + """Nettoie et tronque une chaîne si nécessaire""" + cleaned = value.strip() + if max_length and len(cleaned) > max_length: + logger.warning(f"Valeur tronquée: '{cleaned}' -> '{cleaned[:max_length]}'") + cleaned = cleaned[:max_length] + return cleaned + + def format_date(self, date_str: str) -> str: + """ + Convertit une date du format DD/MM/YYYY vers YYYY-MM-DD + Gère aussi les dates déjà au bon format + """ + date_str = date_str.strip() + + # Si déjà au format ISO (YYYY-MM-DD) + if len(date_str) == 10 and date_str[4] == '-' and date_str[7] == '-': + return date_str + + # Format DD/MM/YYYY + if '/' in date_str: + parts = date_str.split('/') + if len(parts) == 3 and len(parts[2]) == 4: + return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}" + + logger.warning(f"Format de date non reconnu: {date_str}") + return date_str + + def get_current_timestamp(self) -> str: + """Génère un horodatage au format requis par le XSD""" + now = datetime.datetime.now(TZ_PARIS) + return now.strftime("%Y-%m-%dT%H:%M:%S+01:00") + + def create_element(self, parent: ET.Element, tag: str, text: Optional[str] = None, + attrib: Optional[Dict] = None) -> ET.Element: + """Crée un élément XML avec le namespace cpf""" + full_tag = f"{{{self.namespace}}}{tag}" + elem = ET.SubElement(parent, full_tag, attrib or {}) + if text is not None: + elem.text = str(text) + return elem + + def add_titulaire(self, parent: ET.Element, row: List[str]) -> None: + """Ajoute les informations du titulaire""" + titulaire = self.create_element(parent, "titulaire") + + # Données obligatoires + self.create_element(titulaire, "nomNaissance", self.clean_string(row[17], 60)) + + # nomUsage (nillable) + if row[18].strip() and row[18].strip().lower() != 'nil': + self.create_element(titulaire, "nomUsage", self.clean_string(row[18], 60)) + else: + self.create_element(titulaire, "nomUsage", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + + self.create_element(titulaire, "prenom1", self.clean_string(row[19], 60)) + self.create_element(titulaire, "anneeNaissance", self.clean_string(row[20])) + + # moisNaissance (nillable) + if row[21].strip() and row[21].strip().lower() != 'nil': + self.create_element(titulaire, "moisNaissance", self.clean_string(row[21])) + else: + self.create_element(titulaire, "moisNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + + # jourNaissance (nillable) + if row[22].strip() and row[22].strip().lower() != 'nil': + self.create_element(titulaire, "jourNaissance", self.clean_string(row[22])) + else: + self.create_element(titulaire, "jourNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + + self.create_element(titulaire, "sexe", self.clean_string(row[23])) + + # Code commune naissance + code_commune = self.create_element(titulaire, "codeCommuneNaissance") + code_postal_elem = self.create_element(code_commune, "codePostalNaissance") + + if row[24].strip() and row[24].strip().lower() != 'nil': + self.create_element(code_postal_elem, "codePostal", self.clean_string(row[24], 9)) + else: + self.create_element(code_postal_elem, "codePostal", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + + def add_passage_certification(self, parent: ET.Element, row: List[str]) -> None: + """Ajoute un passage de certification""" + passage = self.create_element(parent, "passageCertification") + + # Données obligatoires + self.create_element(passage, "idTechnique", self.clean_string(row[7], 255)) + self.create_element(passage, "obtentionCertification", row[8].upper()) + self.create_element(passage, "donneeCertifiee", row[9].lower()) + + # Dates + self.create_element(passage, "dateDebutValidite", self.format_date(row[10])) + + # dateFinValidite (nillable) + if row[11].strip().lower() == 'nil': + self.create_element(passage, "dateFinValidite", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + else: + self.create_element(passage, "dateFinValidite", self.format_date(row[11])) + + # Niveaux européens + self.create_element(passage, "presenceNiveauLangueEuro", row[12].lower()) + self.create_element(passage, "presenceNiveauNumeriqueEuro", row[13].lower()) + + # Scoring (nillable) + if row[14].strip().lower() == 'nil': + self.create_element(passage, "scoring", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + else: + self.create_element(passage, "scoring", self.clean_string(row[14], 255)) + + # Mention validée (nillable) + if row[15].strip().lower() == 'nil': + self.create_element(passage, "mentionValidee", attrib={f"{{{self.xsi_namespace}}}nil": "true"}) + else: + self.create_element(passage, "mentionValidee", row[15].upper()) + + # Modalités inscription + modalites = self.create_element(passage, "modalitesInscription") + self.create_element(modalites, "modaliteAcces", row[16].upper()) + + # Identification titulaire + identification = self.create_element(passage, "identificationTitulaire") + self.add_titulaire(identification, row) + + def generate_xml(self) -> bool: + """Génère le fichier XML à partir du CSV""" + try: + logger.info(f"Lecture du fichier CSV: {self.csv_filepath}") + + # Vérifier l'existence du fichier + if not self.csv_filepath.exists(): + logger.error(f"Fichier CSV introuvable: {self.csv_filepath}") + return False + + # Lire le CSV + with open(self.csv_filepath, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + rows = list(reader) + + # Valider la structure + if not self.validate_csv_structure(rows): + return False + + logger.info("Création de la structure XML...") + + # Créer l'élément racine + ET.register_namespace('cpf', self.namespace) + ET.register_namespace('xsi', self.xsi_namespace) + + root = ET.Element( + f"{{{self.namespace}}}flux", + attrib={ + f"{{{self.xsi_namespace}}}schemaLocation": + f"{self.namespace} validation.xsd" + } + ) + + # En-tête + headers = rows[5] + self.create_element(root, "idFlux", self.clean_string(headers[2], 50)) + self.create_element(root, "horodatage", self.get_current_timestamp()) + + # Émetteur + emetteur = self.create_element(root, "emetteur") + self.create_element(emetteur, "idClient", self.clean_string(headers[3], 8)) + + # Certificateurs + certificateurs = self.create_element(emetteur, "certificateurs") + certificateur = self.create_element(certificateurs, "certificateur") + + self.create_element(certificateur, "idClient", self.clean_string(headers[4], 8)) + self.create_element(certificateur, "idContrat", self.clean_string(headers[5], 20)) + + # Certifications + certifications = self.create_element(certificateur, "certifications") + certification = self.create_element(certifications, "certification") + + self.create_element(certification, "type", self.clean_string(headers[6], 255)) + self.create_element(certification, "code", self.clean_string(headers[7], 100)) + + # Passages de certification + passages = self.create_element(certification, "passageCertifications") + + # Traiter chaque ligne de données + data_rows = [row for row in rows[6:] if row and row[0].strip()] + logger.info(f"Traitement de {len(data_rows)} certifications...") + + for idx, row in enumerate(data_rows, start=1): + try: + logger.info(f" [{idx}/{len(data_rows)}] Ajout: {row[19]} {row[17]}") + self.add_passage_certification(passages, row) + except Exception as e: + logger.error(f"Erreur ligne {idx + 6}: {e}") + continue + + # Formater et sauvegarder + logger.info(f"Écriture du fichier XML: {self.xml_filepath}") + xml_str = ET.tostring(root, encoding='utf-8') + + # Indentation avec minidom + dom = minidom.parseString(xml_str) + pretty_xml = dom.toprettyxml(indent=" ", encoding='utf-8') + + # Supprimer les lignes vides + lines = [line for line in pretty_xml.decode('utf-8').split('\n') if line.strip()] + + with open(self.xml_filepath, 'w', encoding='utf-8') as f: + f.write('\n'.join(lines)) + + logger.info("✓ Fichier XML créé avec succès") + return True + + except Exception as e: + logger.error(f"Erreur lors de la génération XML: {e}", exc_info=True) + return False + + +def main(): + """Fonction principale""" + # Effacer la console + os.system('cls' if os.name == 'nt' else 'clear') + + print("=" * 70) + print("GÉNÉRATEUR XML CPF - Version Professionnelle") + print("=" * 70) + print() + + # Générer le XML + generator = CPFXMLGenerator(CSV_FILE, XML_FILE) + + if not generator.generate_xml(): + logger.error("❌ Échec de la génération du fichier XML") + sys.exit(1) + + print() + print("-" * 70) + print("VALIDATION DU FICHIER XML") + print("-" * 70) + + # Valider le XML + validator = XMLValidator() + + if Path(XSD_FILE).exists(): + validation_result = validator.validate_xml(XML_FILE, XSD_FILE) + + if validation_result is True: + print() + print("✓" * 35) + print("SUCCESS: Le fichier XML est conforme au schéma XSD") + print("✓" * 35) + elif validation_result is False: + print() + print("⚠" * 35) + print("ATTENTION: Le fichier XML contient des erreurs de validation") + print("Consultez les logs ci-dessus pour plus de détails") + print("⚠" * 35) + else: + print() + print("ℹ" * 35) + print("INFO: Validation XSD non effectuée (module lxml non installé)") + print("Installez lxml pour activer la validation: pip install lxml") + print("ℹ" * 35) else: - if level and (not elem.tail or not elem.tail.strip()): - elem.tail = i - - -# Fonction pour lire les données CSV et générer le fichier XML -def create_xml_from_csv(csv_filepath, xml_filepath): - logging.info("Ouverture du CSV...") - # Ouvrir le fichier CSV pour la lecture - with open(csv_filepath, newline='', encoding='utf-8') as csvfile: - reader = csv.reader(csvfile) - Allrows = list(reader) + logger.warning(f"Fichier XSD non trouvé: {XSD_FILE}") + print() + print("⚠" * 35) + print("ATTENTION: Fichier XSD non trouvé - validation impossible") + print("⚠" * 35) - # Vérification de la présence des données nécessaires - if len(Allrows) < 5: - logging.error("Les données CSV requises sont manquantes.") - exit() - - headers = Allrows[4][2:] # Sauter les deux premiers en-têtes - logging.info("Création de l'arborescence du fichier XML...") - - # Créer l'élément racine avec l'espace de noms 'cpf' - root = ET.Element("cpf:flux") - root.set("xmlns:cpf", "urn:cdc:cpf:pc5:schema:1.0.0") - root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance") - - #Reglage du fuseau horaire - now = datetime.datetime.now(tz_paris) - value = now.strftime("%Y-%m-%dT%H:%M:%S+01:00") - - ET.SubElement(root, "cpf:idFlux").text = headers[0] - #str(random.randint(1000000000, 99999999999)) - ET.SubElement(root, "cpf:horodatage").text = value - - # Création et ajout de l'élément emetteur - emetteur = ET.SubElement(root, "cpf:emetteur") - logging.info("Emmetteur du fchier ...") - - # Création de la structure des certificateurs - idClient = str(random.randint(10000000, 99999999)) if Allrows[5][2] == '' else Allrows[5][2] - ET.SubElement(emetteur, "cpf:idClient").text = idClient - certificateurs = ET.SubElement(emetteur, "cpf:certificateurs") - certificateur = ET.SubElement(certificateurs, "cpf:certificateur") - logging.info(f"Certificateur {idClient}...") - - # Traitement pour NumClient pour le certificateur - idClient2 = str(random.randint(10000000, 99999999)) if Allrows[5][3] == '' else Allrows[5][3] - ET.SubElement(certificateur, "cpf:idClient").text = idClient2 - - # Traitement pour NumContrat pour le certificateur - idContrat = str(random.randint(10000000, 99999999)) if Allrows[5][4] == '' else Allrows[5][4] - ET.SubElement(certificateur, "cpf:idContrat").text = idContrat - - # Création de la structure des certifications - certifications = ET.SubElement(certificateur, "cpf:certifications") - certification = ET.SubElement(certifications, "cpf:certification") - - ET.SubElement(certification, "cpf:type").text = headers[5] - ET.SubElement(certification, "cpf:code").text = headers[6] - - # Ajout d'un seul passage de certification - passage_certifications = ET.SubElement(certification, "cpf:passageCertifications") - - # Itérer sur chaque ligne du fichier CSV - for row in Allrows[5:]: - # Sauter les lignes vides - if not row[0].strip(): - continue - - #Ajout des certifications - logging.info(f"Ajout des certifications : {row[7]}") - passage_certification = ET.SubElement(passage_certifications, "cpf:passageCertification") - ET.SubElement(passage_certification, "cpf:idTechnique").text = row[7] - ET.SubElement(passage_certification, "cpf:obtentionCertification").text = row[8] - ET.SubElement(passage_certification, "cpf:donneeCertifiee").text = row[9] - ET.SubElement(passage_certification, "cpf:dateDebutValidite").text = row[10] - # Vérification si la date de fin de validité est 'nil;' - if row[11].strip().lower() == 'nil': - ET.SubElement(passage_certification, "cpf:dateFinValidite", {"xsi:nil": "true"}) - else: - ET.SubElement(passage_certification, "cpf:dateFinValidite").text = row[11] - ET.SubElement(passage_certification, "cpf:presenceNiveauLangueEuro").text = row[12] - ET.SubElement(passage_certification, "cpf:presenceNiveauNumeriqueEuro").text = row[13] - if row[14].strip().lower() == 'nil': - ET.SubElement(passage_certification, "cpf:scoring", {"xsi:nil": "true"}) - else: - ET.SubElement(passage_certification, "cpf:scoring").text = row[14] - if row[15].strip().lower() == 'nil': - ET.SubElement(passage_certification, "cpf:mentionValidee", {"xsi:nil": "true"}) - else: - ET.SubElement(passage_certification, "cpf:mentionValidee").text = row[15] - - # Modalite inscription - modalites_inscription = ET.SubElement(passage_certification, "cpf:modalitesInscription") - ET.SubElement(modalites_inscription, "cpf:modaliteAcces").text = row[16] - - #Identification Titulaire - identification_titulaire = ET.SubElement(passage_certification, "cpf:identificationTitulaire") - titulaire = ET.SubElement(identification_titulaire, "cpf:titulaire") - - logging.info(f"Titulaire : {row[18]} {row[19]}") - ET.SubElement(titulaire, "cpf:nomNaissance").text = row[17] - ET.SubElement(titulaire, "cpf:nomUsage").text = row[18] - ET.SubElement(titulaire, "cpf:prenom1").text = row[19] - ET.SubElement(titulaire, "cpf:anneeNaissance").text = row[20] - ET.SubElement(titulaire, "cpf:moisNaissance").text = row[21] - ET.SubElement(titulaire, "cpf:jourNaissance").text = row[22] - ET.SubElement(titulaire, "cpf:sexe").text = row[23] - code_commune_naissance = ET.SubElement(titulaire, "cpf:codeCommuneNaissance") - code_postal_naissance = ET.SubElement(code_commune_naissance, "cpf:codePostalNaissance") - ET.SubElement(code_postal_naissance, "cpf:codePostal").text = row[24] + print() + print(f"Fichier généré: {XML_FILE}") + print(f"Logs disponibles: xml_generation.log") - # Appliquer l'indentation - indent(root) - - # Enregistrement du fichier XML - logging.info(f"Ecriture du XML...") - tree = ET.ElementTree(root) - with open(xml_filepath, "wb") as file: - tree.write(file, encoding="utf-8", xml_declaration=True) - -# Appel de la fonction pour créer le fichier XML -create_xml_from_csv(csv_file, xml_file) -logging.info("Fichier XML créé avec succès.") \ No newline at end of file +if __name__ == "__main__": + main()