invoice-manager/database.py

215 lines
8.3 KiB
Python

import sqlite3
from datetime import datetime
import json
import os
from contextlib import contextmanager
import time
class Database:
def __init__(self, db_file="invoices.db"):
self.db_file = db_file
self.version = 1
self.init_db()
@contextmanager
def get_connection(self, timeout=30, max_retries=3):
retries = 0
while retries < max_retries:
try:
conn = sqlite3.connect(self.db_file, timeout=timeout)
conn.execute("PRAGMA journal_mode=WAL") # Utiliser le mode WAL pour de meilleures performances
conn.execute("PRAGMA busy_timeout=5000") # Timeout de 5 secondes
try:
yield conn
conn.commit()
finally:
conn.close()
break
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and retries < max_retries - 1:
retries += 1
time.sleep(0.1 * (2 ** retries)) # Backoff exponentiel
continue
raise
def init_db(self):
with self.get_connection() as conn:
c = conn.cursor()
# Table des versions
c.execute('''
CREATE TABLE IF NOT EXISTS db_version (
version INTEGER PRIMARY KEY
)
''')
# Vérifier la version actuelle
c.execute('SELECT version FROM db_version')
result = c.fetchone()
current_version = result[0] if result else 0
# Appliquer les migrations si nécessaire
if current_version < self.version:
self._apply_migrations(conn, current_version)
conn.commit()
def _apply_migrations(self, conn, current_version):
c = conn.cursor()
# Migration 1: Structure initiale
if current_version < 1:
# Table des clients
c.execute('''
CREATE TABLE IF NOT EXISTS clients (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
address TEXT NOT NULL,
postal_code TEXT NOT NULL,
town TEXT NOT NULL,
country TEXT NOT NULL,
vat_number TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Table des factures
c.execute('''
CREATE TABLE IF NOT EXISTS invoices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
invoice_number TEXT UNIQUE NOT NULL,
client_id INTEGER NOT NULL,
amount DECIMAL(10,2) NOT NULL,
currency TEXT NOT NULL,
language TEXT NOT NULL,
status TEXT DEFAULT 'issued',
issue_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
due_date TIMESTAMP,
typst_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (client_id) REFERENCES clients (id)
)
''')
# Mettre à jour la version
c.execute('INSERT INTO db_version (version) VALUES (1)')
current_version = 1
def get_next_invoice_number(self):
with self.get_connection() as conn:
c = conn.cursor()
c.execute('SELECT MAX(CAST(SUBSTR(invoice_number, 1) AS INTEGER)) FROM invoices')
result = c.fetchone()
last_number = result[0] if result[0] is not None else 0
return str(last_number + 1).zfill(3)
def add_client(self, name, address, postal_code, town, country, vat_number=None):
with self.get_connection() as conn:
c = conn.cursor()
c.execute('''
INSERT INTO clients (name, address, postal_code, town, country, vat_number)
VALUES (?, ?, ?, ?, ?, ?)
''', (name, address, postal_code, town, country, vat_number))
client_id = c.lastrowid
conn.commit()
return client_id
def add_invoice(self, invoice_number, client_id, amount, currency, language, typst_content, due_date=None):
with self.get_connection() as conn:
c = conn.cursor()
c.execute('''
INSERT INTO invoices (invoice_number, client_id, amount, currency, language, typst_content, due_date)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (invoice_number, client_id, amount, currency, language, typst_content, due_date))
invoice_id = c.lastrowid
conn.commit()
return invoice_id
def get_invoices(self, filters=None):
with self.get_connection() as conn:
c = conn.cursor()
query = '''
SELECT i.*, c.name as client_name, c.address, c.postal_code, c.town, c.country, c.vat_number
FROM invoices i
JOIN clients c ON i.client_id = c.id
'''
params = []
if filters:
conditions = []
if 'status' in filters:
conditions.append('i.status = ?')
params.append(filters['status'])
if 'client_id' in filters:
conditions.append('i.client_id = ?')
params.append(filters['client_id'])
if 'date_from' in filters:
conditions.append('i.issue_date >= ?')
params.append(filters['date_from'])
if 'date_to' in filters:
conditions.append('i.issue_date <= ?')
params.append(filters['date_to'])
if conditions:
query += ' WHERE ' + ' AND '.join(conditions)
query += ' ORDER BY i.issue_date DESC'
c.execute(query, params)
columns = [description[0] for description in c.description]
invoices = [dict(zip(columns, row)) for row in c.fetchall()]
return invoices
def get_invoice(self, invoice_id):
with self.get_connection() as conn:
c = conn.cursor()
c.execute('''
SELECT i.*, c.name as client_name, c.address, c.postal_code, c.town, c.country, c.vat_number
FROM invoices i
JOIN clients c ON i.client_id = c.id
WHERE i.id = ?
''', (invoice_id,))
columns = [description[0] for description in c.description]
invoice = dict(zip(columns, c.fetchone()))
return invoice
def update_invoice_status(self, invoice_id, status):
with self.get_connection() as conn:
c = conn.cursor()
c.execute('UPDATE invoices SET status = ? WHERE id = ?', (status, invoice_id))
conn.commit()
def get_statistics(self):
"""Récupérer les statistiques des factures"""
with self.get_connection() as conn:
c = conn.cursor()
# Nombre total de factures
c.execute('SELECT COUNT(*) FROM invoices')
total_invoices = c.fetchone()[0] or 0
# Montant total des factures
c.execute('SELECT SUM(amount) FROM invoices')
total_amount = c.fetchone()[0] or 0
# Montant des factures payées
c.execute('SELECT SUM(amount) FROM invoices WHERE status = "paid"')
total_paid = c.fetchone()[0] or 0
# Montant des factures en retard
c.execute('SELECT SUM(amount) FROM invoices WHERE status = "overdue"')
total_overdue = c.fetchone()[0] or 0
# Montant des factures annulées
c.execute('SELECT SUM(amount) FROM invoices WHERE status = "cancelled"')
total_cancelled = c.fetchone()[0] or 0
return {
'total_invoices': total_invoices,
'total_amount': total_amount,
'total_paid': total_paid,
'total_overdue': total_overdue,
'total_cancelled': total_cancelled
}