Server-Side Scripting/SQL Databases/Python (FastAPI)

From Wikiversity
Jump to navigation Jump to search

routers/lesson9.py[edit | edit source]

# This program creates and displays a temperature database
# with options to insert, update, and delete records.
#
# References:
#   https://en.wikibooks.org/wiki/Python_Programming
#   https://en.wikiversity.org/wiki/Python_Programming/Databases

import html
import sqlite3
from contextlib import closing

from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

router = APIRouter(prefix="/lesson9")
templates = Jinja2Templates(directory="templates")

DATABASE = "temperature.db"

@router.get("/", response_class=HTMLResponse)
async def get_lesson9(request: Request):
    """Render the lesson page with the current contents of the database.

    The Countries table is created if needed and then read back as an
    HTML table. If either step raises, the exception object itself is
    handed to the template in place of the table.
    """

    def load_table():
        # Both steps share one error path, so keep them together.
        check_database()
        return get_data()

    try:
        table = load_table()
    except Exception as error:
        table = error

    context = {"request": request, "table": table}
    return templates.TemplateResponse("lesson9.html", context)

@router.post("/", response_class=HTMLResponse)
async def post_lesson9(request: Request):
    """Insert, update, or delete a country based on the submitted form.

    The form is expected to carry "country" and "temperature" fields.
    A blank temperature deletes the country; otherwise the country is
    inserted if it is new, or updated if it already exists.

    Any exception raised while processing (missing field, invalid value,
    database error) is handed to the template in place of the table.
    """
    form = dict(await request.form())
    try:
        # The first request may be a POST, so make sure the table exists.
        check_database()

        country = form["country"].strip()
        temperature = form["temperature"].strip()

        if not country:
            raise ValueError("Country is required.")

        if temperature == "":
            # Blank temperature always means delete; deleting a country
            # that does not exist is a harmless no-op rather than an
            # insert of an empty temperature.
            delete_country(country)
        else:
            try:
                value = float(temperature)
            except ValueError:
                # Use a fixed message: the template renders the error
                # unescaped, so it must not echo the submitted text.
                raise ValueError("Temperature must be a number.") from None

            if country_exists(country):
                update_country(country, value)
            else:
                insert_country(country, value)

        result = get_data()
    except Exception as exception:
        result = exception

    return templates.TemplateResponse(
        "lesson9.html",
        {
            "request": request,
            "table": result
        }
    )

def check_database():
    """Create the Countries table in DATABASE if it does not exist yet.

    Uses CREATE TABLE IF NOT EXISTS so the existence check and the
    creation happen in a single statement instead of two separate steps.
    """
    sql = """
        CREATE TABLE IF NOT EXISTS Countries(
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            Country TEXT UNIQUE NOT NULL,
            Temperature REAL NOT NULL);
    """
    # sqlite3's own context manager only commits or rolls back; closing()
    # is what actually releases the connection.
    with closing(sqlite3.connect(DATABASE)) as connection:
        with connection:
            connection.execute(sql)

def get_data():
    """Return every row of the Countries table as an HTML table string.

    Each value is HTML-escaped before being placed in a cell, because the
    country names come from user input and the template renders the
    returned markup without escaping.

    Raises:
        sqlite3.Error: if the query fails (for example, the table is
            missing).
    """
    sql = """
        SELECT ID, Country, Temperature FROM Countries;
    """
    # sqlite3's own context manager does not close the connection, so
    # closing() is used to release it once the rows are fetched.
    with closing(sqlite3.connect(DATABASE)) as connection:
        rows = connection.execute(sql).fetchall()

    parts = [
        "<table><tr><th>ID</th>",
        "<th>Country</th>",
        "<th>Temperature</th></tr>",
    ]
    for row in rows:
        cells = "".join(
            f"<td>{html.escape(str(value))}</td>" for value in row
        )
        parts.append(f"<tr>{cells}</tr>")
    parts.append("</table>")
    return "".join(parts)

def country_exists(country):
    """Return True if a row for the given country is in the Countries table.

    Args:
        country: Country name to look up; matched with SQL equality.

    Raises:
        sqlite3.Error: if the query fails.
    """
    sql = """
        SELECT EXISTS(
            SELECT * FROM Countries
            WHERE Country = ?) AS Count;
    """
    # closing() releases the connection; sqlite3's own context manager
    # would leave it open.
    with closing(sqlite3.connect(DATABASE)) as connection:
        found = connection.execute(sql, (country,)).fetchone()[0]
    # EXISTS yields 0 or 1; expose it as a real boolean.
    return bool(found)

def insert_country(country, temperature):
    """Insert a new row into the Countries table.

    Args:
        country: Country name to store.
        temperature: Temperature value to store for that country.

    Raises:
        sqlite3.Error: if the insert fails, for example when a table
            constraint (UNIQUE / NOT NULL) is violated.
    """
    sql = """
        INSERT INTO Countries (Country, Temperature)
        VALUES(?, ?);
    """
    # closing() releases the connection; the inner "with" commits on
    # success and rolls back if the statement raises.
    with closing(sqlite3.connect(DATABASE)) as connection:
        with connection:
            connection.execute(sql, (country, temperature))

def update_country(country, temperature):
    """Set the temperature of the row matching the given country.

    Args:
        country: Country name identifying the row to change.
        temperature: New temperature value.

    Raises:
        sqlite3.Error: if the update fails.
    """
    sql = """
        UPDATE Countries
        SET Temperature = ?
        WHERE Country = ?;
    """
    # closing() releases the connection; the inner "with" commits on
    # success and rolls back if the statement raises.
    with closing(sqlite3.connect(DATABASE)) as connection:
        with connection:
            connection.execute(sql, (temperature, country))

def delete_country(country):
    """Delete the row matching the given country from the Countries table.

    Args:
        country: Country name identifying the row to remove.

    Raises:
        sqlite3.Error: if the delete fails.
    """
    sql = """
        DELETE FROM Countries
        WHERE Country = ?;
    """
    # closing() releases the connection; the inner "with" commits on
    # success and rolls back if the statement raises.
    with closing(sqlite3.connect(DATABASE)) as connection:
        with connection:
            connection.execute(sql, (country,))

templates/lesson9.html[edit | edit source]

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <title>Lesson 9</title>
    <link rel="stylesheet" href="{{url_for('static', path='/styles.css')}}">
</head>

<body>
    <h1>Temperature Data Entry</h1>
    <p>Enter country and temperature. Enter again to update.</p>
    <p>Enter country without temperature to delete.</p>
    {# No action attribute: the form posts back to the page's own URL. #}
    <form method="POST">
        <p><label for="country">Country:</label>
            <input type="text" id="country" name="country" required>
        </p>
        {# Temperature is optional because a blank value means "delete". #}
        <p><label for="temperature">Temperature:</label>
            <input type="text" id="temperature" name="temperature">
        </p>
        <p><input type="submit" name="submit" value="Submit"></p>
    </form>
    <hr>
    {# "table" is emitted without escaping, so it must already be safe HTML. #}
    {{table|safe}}
</body>

</html>

Try It[edit | edit source]

See Server-Side Scripting/Routes and Templates/Python (FastAPI) to create a test environment.

See Also[edit | edit source]