Simple usage Aiomysql – CRUD

Simple usage Aiomysql – CRUD

Aiomysql CRUD – a versatile Python class utilizing the aiomysql library for streamlined interaction with MySQL databases.

Simplifies connection management and CRUD operations for convenience and efficiency.

This project provides a Python class, aiomysql_CRUD, that simplifies interaction with MySQL databases using the aiomysql library. It offers convenient methods for performing CRUD (Create, Read, Update, Delete) operations on database tables.

Features

  • Asynchronous database operations for improved performance.
  • Easy integration into Python projects.
  • Streamlined connection management.
  • Support for common database operations, including select, insert, update, and delete.

Getting Started

Install the required dependencies:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
pip install aiomysql
pip install aiomysql
pip install aiomysql

 

Edit the connection settings (host, user, password, database name) in db.py to match your database.

Use the aiomysql_CRUD class in your Python project to interact with your MySQL database.

Usage

Here’s an example of how to use the `aiomysql_CRUD` class to perform basic database operations. The script starts with `import asyncio` and `from db import aiomysql_CRUD`:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import asyncio
from db import aiomysql_CRUD
async def main():
db = aiomysql_CRUD()
await db.connect()
# INSERT - The code performs an INSERT operation into the users table, inserting a record with the name "John Doe" and age 30.
async def create_user(user_name, user_age):
query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)"
values = (user_name, user_age)
await db.insert(query, values)
# Now you can call the create_user function within an asynchronous context
await create_user("John Doe", 35)
# UPDATE - Updates a user's age to 55 based on their name ("John Doe").
async def update_user(user_name, user_age):
query = "UPDATE users SET user_age = %s WHERE user_name = %s"
values = (user_age, user_name) # Swap the values to match the order in the query
await db.update(query, values)
# This line of code calls the `update_user` function to change the age of a user named "John Doe" to 55 in the database.
await update_user("John Doe", 55)
# SELECT - This code defines a function `select_users_by_age` that retrieves users from the "users" table whose age is greater than the specified minimum age.
# It then prints the selected users.
async def select_users_by_age(min_age):
query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
values = (min_age,)
result = await db.select(query, values)
return result
min_age = 30 # Adjust this value to your desired minimum age
users = await select_users_by_age(min_age)
for user in users:
print(user)
# DELETE - Deleting a user from the users table based on their name.
async def delete_user(user_name):
query = "DELETE FROM users WHERE user_name = %s"
values = (user_name,)
await db.delete(query, values)
# Calling the function to delete a user "John Doe" from the users table.
await delete_user("John Doe")
await db.close()
if __name__ == "__main__":
asyncio.run(main())
import asyncio from db import aiomysql_CRUD async def main(): db = aiomysql_CRUD() await db.connect() # INSERT - The code performs an INSERT operation into the users table, inserting a record with the name "John Doe" and age 30. async def create_user(user_name, user_age): query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)" values = (user_name, user_age) await db.insert(query, values) # Now you can call the create_user function within an asynchronous context await create_user("John Doe", 35) # UPDATE - Updates a user's age to 55 based on their name ("John Doe"). async def update_user(user_name, user_age): query = "UPDATE users SET user_age = %s WHERE user_name = %s" values = (user_age, user_name) # Swap the values to match the order in the query await db.update(query, values) # This line of code calls the `update_user` function to change the age of a user named "John Doe" to 55 in the database. await update_user("John Doe", 55) # SELECT - This code defines a function `select_users_by_age` that retrieves users from the "users" table whose age is greater than the specified minimum age. # It then prints the selected users. async def select_users_by_age(min_age): query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s" values = (min_age,) result = await db.select(query, values) return result min_age = 30 # Adjust this value to your desired minimum age users = await select_users_by_age(min_age) for user in users: print(user) # DELETE - Deleting a user from the users table based on their name. async def delete_user(user_name): query = "DELETE FROM users WHERE user_name = %s" values = (user_name,) await db.delete(query, values) # Calling the function to delete a user "John Doe" from the users table. await delete_user("John Doe") await db.close() if __name__ == "__main__": asyncio.run(main())
import asyncio
from db import aiomysql_CRUD

async def main():
    """Run one INSERT, UPDATE, SELECT and DELETE against the ``users`` table.

    The helper is used as an async context manager: entering it calls
    ``connect()`` and leaving it closes the pool, so the pool is released
    even if one of the steps below raises.
    """
    async with aiomysql_CRUD() as db:

        # INSERT - adds a row to the users table with the given name and age.
        async def create_user(user_name, user_age):
            query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)"
            values = (user_name, user_age)
            await db.insert(query, values)

        # Insert a record with the name "John Doe" and age 35.
        await create_user("John Doe", 35)

        # UPDATE - sets a user's age, looking the user up by name.
        async def update_user(user_name, user_age):
            query = "UPDATE users SET user_age = %s WHERE user_name = %s"
            # The placeholders appear as (age, name) in the query, so the
            # values are passed in that order rather than argument order.
            values = (user_age, user_name)
            await db.update(query, values)

        # Change the age of the user named "John Doe" to 55.
        await update_user("John Doe", 55)

        # SELECT - returns the users whose age is greater than min_age.
        async def select_users_by_age(min_age):
            query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
            values = (min_age,)
            return await db.select(query, values)

        min_age = 30  # Adjust this value to your desired minimum age
        users = await select_users_by_age(min_age)

        # select() returns None when the query fails (the error is only
        # printed), so fall back to an empty sequence instead of crashing.
        for user in users or ():
            print(user)

        # DELETE - removes every user with the given name.
        async def delete_user(user_name):
            query = "DELETE FROM users WHERE user_name = %s"
            values = (user_name,)
            await db.delete(query, values)

        # Delete the user "John Doe" from the users table.
        await delete_user("John Doe")


if __name__ == "__main__":
    asyncio.run(main())

 

And here is the Python class (saved as db.py, the module the example above imports from):

 

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import aiomysql
class aiomysql_CRUD:
def __init__(self):
self.pool = None
async def connect(self):
try:
self.pool = await aiomysql.create_pool(
host='localhost',
user='root',
password='Qwerty1234',
db='my_base',
maxsize=200,
autocommit=True # Adding the parameter autocommit=True.
)
except Exception as e:
print(f"Database connection error: {e}")
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
if self.pool:
self.pool.close()
await self.pool.wait_closed()
async def delete(self, query, values):
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
await conn.commit()
except Exception as e:
print(f"Delete query error: {e}")
async def select(self, query, params):
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, params)
rows = await cur.fetchall()
return rows
except Exception as e:
print(f"Select query error: {e}")
async def insert(self, query, values):
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
await conn.commit()
except Exception as e:
print(f"Insert query error: {e}")
async def update(self, query, values):
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
await conn.commit()
except Exception as e:
print(f"Update query error: {e}")
async def close(self):
if self.pool:
self.pool.close()
await self.pool.wait_closed()
import aiomysql class aiomysql_CRUD: def __init__(self): self.pool = None async def connect(self): try: self.pool = await aiomysql.create_pool( host='localhost', user='root', password='Qwerty1234', db='my_base', maxsize=200, autocommit=True # Adding the parameter autocommit=True. ) except Exception as e: print(f"Database connection error: {e}") async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc, tb): if self.pool: self.pool.close() await self.pool.wait_closed() async def delete(self, query, values): try: async with self.pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, values) await conn.commit() except Exception as e: print(f"Delete query error: {e}") async def select(self, query, params): try: async with self.pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, params) rows = await cur.fetchall() return rows except Exception as e: print(f"Select query error: {e}") async def insert(self, query, values): try: async with self.pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, values) await conn.commit() except Exception as e: print(f"Insert query error: {e}") async def update(self, query, values): try: async with self.pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, values) await conn.commit() except Exception as e: print(f"Update query error: {e}") async def close(self): if self.pool: self.pool.close() await self.pool.wait_closed()
import aiomysql

class aiomysql_CRUD:
    """Small asynchronous CRUD helper built on an ``aiomysql`` connection pool.

    Typical use::

        async with aiomysql_CRUD() as db:
            rows = await db.select("SELECT ... WHERE x > %s", (1,))

    or call ``connect()`` / ``close()`` explicitly.

    Errors are never raised to the caller: every method catches the
    exception, prints a message and returns ``None``.

    Attributes:
        pool: The pool created by ``connect()``, or ``None`` while the
            helper is not connected (before ``connect()``, after a failed
            ``connect()``, or after ``close()``).
    """

    def __init__(self, *, host='localhost', user='root', password='Qwerty1234',
                 db='my_base', maxsize=200, autocommit=True):
        """Store the connection settings; no connection is opened here.

        All arguments are keyword-only and are passed unchanged to
        ``aiomysql.create_pool`` by ``connect()``. The defaults are the
        example values this class originally hard-coded; pass real
        credentials instead of relying on them.
        """
        self.pool = None
        self._pool_settings = {
            'host': host,
            'user': user,
            'password': password,
            'db': db,
            'maxsize': maxsize,
            'autocommit': autocommit,
        }

    async def connect(self):
        """Create the connection pool and store it in ``self.pool``.

        On failure the error is printed and ``self.pool`` is left unchanged.
        """
        try:
            self.pool = await aiomysql.create_pool(**self._pool_settings)
        except Exception as e:
            print(f"Database connection error: {e}")

    async def __aenter__(self):
        """Connect and return this helper for use in ``async with``."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the pool on leaving ``async with``; exceptions propagate."""
        await self.close()

    async def _execute(self, label, query, values, fetch=False):
        """Run one parameterised statement on a pooled connection.

        Args:
            label: Operation name used as the prefix of the printed error
                message ("Select", "Insert", "Update" or "Delete").
            query: SQL text with ``%s`` placeholders.
            values: Parameters bound to the placeholders.
            fetch: When true, return ``fetchall()`` and skip the commit;
                otherwise commit and return ``None``.

        Returns:
            The fetched rows when ``fetch`` is true and the statement
            succeeded, otherwise ``None``.
        """
        try:
            if self.pool is None:
                # Without this check the failure would be reported as an
                # opaque "'NoneType' object has no attribute 'acquire'".
                raise RuntimeError(
                    "connection pool is not initialised; call connect() first"
                )
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, values)
                    if fetch:
                        return await cur.fetchall()
                    # Kept although the pool defaults to autocommit=True, so
                    # writes still persist when autocommit is turned off.
                    await conn.commit()
        except Exception as e:
            print(f"{label} query error: {e}")
        return None

    async def delete(self, query, values):
        """Execute a DELETE statement and commit; errors are printed."""
        await self._execute("Delete", query, values)

    async def select(self, query, params):
        """Execute a SELECT and return all rows, or ``None`` on error."""
        return await self._execute("Select", query, params, fetch=True)

    async def insert(self, query, values):
        """Execute an INSERT statement and commit; errors are printed."""
        await self._execute("Insert", query, values)

    async def update(self, query, values):
        """Execute an UPDATE statement and commit; errors are printed."""
        await self._execute("Update", query, values)

    async def close(self):
        """Close the pool and wait for it to finish closing.

        ``self.pool`` is reset to ``None`` first, so calling ``close()``
        again is a no-op and later queries report "not initialised".
        """
        pool, self.pool = self.pool, None
        if pool is not None:
            pool.close()
            await pool.wait_closed()

GitHub Aiomysql CRUD

Download on GitHub >>>

 

Leave a Comment