Jump to content

Applied Programming/Databases/Python3 MySQL

From Wikiversity

databases.py

[edit | edit source]
"""This program demonstrates MySQL database processing.

Input:
    None

Output:
    Sample data.

References:
    https://en.wikiversity.org/wiki/Python_Programming/Databases
    https://www.w3schools.com/python/python_mysql_insert.asp

***IMPORTANT***
Download Link for MySQL: https://dev.mysql.com/downloads/installer/

You must install MySQL Python driver for this to work...
Using: 'python -m pip install mysql-connector'

Please see included 'w3schools' link for more info!

"""

import sys
import mysql.connector

DATABASE = "mydatabase"


def execute_sql(sql):
    """Executes the given sql statement.

    Args:
        sql (string): A valid SQL statement to execute.

    Returns:
        None.

    """
    try:
        conn = mysql.connector.connect(
          host="serveradress",
          user="username",
          passwd="password",
          database="mydatabase"
        )

    except:
        print(f"Unable to connect to {DATABASE}")
        raise

    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
    except Exception as exception:
        print(f"Unable to execute {sql}")
        print(exception)
    finally:
        conn.close()


def create_table():
    """Creates the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = "DROP TABLE IF EXISTS Users;"
    execute_sql(sql)

    sql = """
        CREATE TABLE
        Users(
        UserID INT PRIMARY KEY NOT NULL,
        [User] TEXT NOT NULL, Age INT NOT NULL
        );
        """
    execute_sql(sql)


def insert_users():
    """Inserts data into the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(1, 'Moe', 28);"
    execute_sql(sql)

    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(2, 'Larry', 55);"
    execute_sql(sql)

    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(3, 'Curly', 19);"
    execute_sql(sql)


def display_table():
    """Displays the Users table.

    Args:
        None.

    Returns:
        None.

    """
    try:
        conn = mysql.connector.connect(
          host="serveradress",
          user="username",
          passwd="password",
          database="mydatabase"
        )

    except:
        print(f"Unable to connect to {DATABASE}")
        raise

    try:
        cursor = conn.cursor()

        sql = """
            SELECT UserID, [User], Age FROM Users;
            """
        cursor.execute(sql)
        rows = cursor.fetchall()

        for row in rows:
            print(row)
        print()
    except Exception as exception:
        print(f"Error processing {sql}")
        print(exception)
    finally:
        conn.close()


def update_user():
    """Updates the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        UPDATE Users
        SET [User] = 'Shemp'
        WHERE UserID = 3;
        """
    execute_sql(sql)


def delete_user():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        DELETE FROM Users
        WHERE UserID = 3;
        """
    execute_sql(sql)


def display_list_of_tables():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    print("List of the tables in database: ", end='')
    try:
        conn = pyodbc.connect(
            'DRIVER={SQL Server};'
            'SERVER=DESKTOP\\MSSQLSERVER01;'
            'PORT=1433;'
            'DATABASE=users.db;'
            'UID=PyUser;'
            'PWD=pass123'
        )
    except:
        print(f"Unable to connect to {DATABASE}")
        raise
    try:
        cursor = conn.cursor()
        sql = """
            SELECT * FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE='BASE TABLE';
            """
        cursor.execute(sql)

        rows = cursor.fetchall()
        if rows == []:
            print("Database is Empty, all Tables deleted.")
        else:
            for row in rows:
                print(row)
    except Exception as exception:
        print(f"Error processing {sql}")
        print(exception)
    finally:
        conn.close()


def delete_table():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        DROP TABLE Users;
        """
    execute_sql(sql)
    print('Users Table was deleted.\n')


def main():
    """Runs the main program logic."""

    try:
        print("Users")
        create_table()
        insert_users()
        display_table()

        print("Users After Update")
        update_user()
        display_table()

        print("Users After Delete")
        delete_user()
        display_table()

        print("Delete Table")
        display_list_of_tables()
        delete_table()
        display_list_of_tables()
    except:
        print("Unexpected error.")
        print("Error:", sys.exc_info()[1])
        print("File: ", sys.exc_info()[2].tb_frame.f_code.co_filename)
        print("Line: ", sys.exc_info()[2].tb_lineno)


main()