import sqlite3 from datetime import datetime import json import os from contextlib import contextmanager import time class Database: def __init__(self, db_file="invoices.db"): self.db_file = db_file self.version = 1 self.init_db() @contextmanager def get_connection(self, timeout=30, max_retries=3): retries = 0 while retries < max_retries: try: conn = sqlite3.connect(self.db_file, timeout=timeout) conn.execute("PRAGMA journal_mode=WAL") # Utiliser le mode WAL pour de meilleures performances conn.execute("PRAGMA busy_timeout=5000") # Timeout de 5 secondes try: yield conn conn.commit() finally: conn.close() break except sqlite3.OperationalError as e: if "database is locked" in str(e) and retries < max_retries - 1: retries += 1 time.sleep(0.1 * (2 ** retries)) # Backoff exponentiel continue raise def init_db(self): with self.get_connection() as conn: c = conn.cursor() # Table des versions c.execute(''' CREATE TABLE IF NOT EXISTS db_version ( version INTEGER PRIMARY KEY ) ''') # Vérifier la version actuelle c.execute('SELECT version FROM db_version') result = c.fetchone() current_version = result[0] if result else 0 # Appliquer les migrations si nécessaire if current_version < self.version: self._apply_migrations(conn, current_version) conn.commit() def _apply_migrations(self, conn, current_version): c = conn.cursor() # Migration 1: Structure initiale if current_version < 1: # Table des clients c.execute(''' CREATE TABLE IF NOT EXISTS clients ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, address TEXT NOT NULL, postal_code TEXT NOT NULL, town TEXT NOT NULL, country TEXT NOT NULL, vat_number TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Table des factures c.execute(''' CREATE TABLE IF NOT EXISTS invoices ( id INTEGER PRIMARY KEY AUTOINCREMENT, invoice_number TEXT UNIQUE NOT NULL, client_id INTEGER NOT NULL, amount DECIMAL(10,2) NOT NULL, currency TEXT NOT NULL, language TEXT NOT NULL, status TEXT DEFAULT 'issued', issue_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, due_date TIMESTAMP, typst_content TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (client_id) REFERENCES clients (id) ) ''') # Mettre à jour la version c.execute('INSERT INTO db_version (version) VALUES (1)') current_version = 1 def get_next_invoice_number(self): with self.get_connection() as conn: c = conn.cursor() c.execute('SELECT MAX(CAST(SUBSTR(invoice_number, 1) AS INTEGER)) FROM invoices') result = c.fetchone() last_number = result[0] if result[0] is not None else 0 return str(last_number + 1).zfill(3) def add_client(self, name, address, postal_code, town, country, vat_number=None): with self.get_connection() as conn: c = conn.cursor() c.execute(''' INSERT INTO clients (name, address, postal_code, town, country, vat_number) VALUES (?, ?, ?, ?, ?, ?) ''', (name, address, postal_code, town, country, vat_number)) client_id = c.lastrowid conn.commit() return client_id def add_invoice(self, invoice_number, client_id, amount, currency, language, typst_content, due_date=None): with self.get_connection() as conn: c = conn.cursor() c.execute(''' INSERT INTO invoices (invoice_number, client_id, amount, currency, language, typst_content, due_date) VALUES (?, ?, ?, ?, ?, ?, ?) ''', (invoice_number, client_id, amount, currency, language, typst_content, due_date)) invoice_id = c.lastrowid conn.commit() return invoice_id def get_invoices(self, filters=None): with self.get_connection() as conn: c = conn.cursor() query = ''' SELECT i.*, c.name as client_name, c.address, c.postal_code, c.town, c.country, c.vat_number FROM invoices i JOIN clients c ON i.client_id = c.id ''' params = [] if filters: conditions = [] if 'status' in filters: conditions.append('i.status = ?') params.append(filters['status']) if 'client_id' in filters: conditions.append('i.client_id = ?') params.append(filters['client_id']) if 'date_from' in filters: conditions.append('i.issue_date >= ?') params.append(filters['date_from']) if 'date_to' in filters: conditions.append('i.issue_date <= ?') params.append(filters['date_to']) if conditions: query += ' WHERE ' + ' AND '.join(conditions) query += ' ORDER BY i.issue_date DESC' c.execute(query, params) columns = [description[0] for description in c.description] invoices = [dict(zip(columns, row)) for row in c.fetchall()] return invoices def get_invoice(self, invoice_id): with self.get_connection() as conn: c = conn.cursor() c.execute(''' SELECT i.*, c.name as client_name, c.address, c.postal_code, c.town, c.country, c.vat_number FROM invoices i JOIN clients c ON i.client_id = c.id WHERE i.id = ? ''', (invoice_id,)) columns = [description[0] for description in c.description] invoice = dict(zip(columns, c.fetchone())) return invoice def update_invoice_status(self, invoice_id, status): with self.get_connection() as conn: c = conn.cursor() c.execute('UPDATE invoices SET status = ? WHERE id = ?', (status, invoice_id)) conn.commit() def get_statistics(self): """Récupérer les statistiques des factures""" with self.get_connection() as conn: c = conn.cursor() # Nombre total de factures c.execute('SELECT COUNT(*) FROM invoices') total_invoices = c.fetchone()[0] or 0 # Montant total des factures c.execute('SELECT SUM(amount) FROM invoices') total_amount = c.fetchone()[0] or 0 # Montant des factures payées c.execute('SELECT SUM(amount) FROM invoices WHERE status = "paid"') total_paid = c.fetchone()[0] or 0 # Montant des factures en retard c.execute('SELECT SUM(amount) FROM invoices WHERE status = "overdue"') total_overdue = c.fetchone()[0] or 0 # Montant des factures annulées c.execute('SELECT SUM(amount) FROM invoices WHERE status = "cancelled"') total_cancelled = c.fetchone()[0] or 0 return { 'total_invoices': total_invoices, 'total_amount': total_amount, 'total_paid': total_paid, 'total_overdue': total_overdue, 'total_cancelled': total_cancelled }