Aiomysql CRUD – a versatile Python class utilizing the aiomysql library for streamlined interaction with MySQL databases.
Simplifies connection management and CRUD operations for convenience and efficiency.
This project provides a Python class, aiomysql_CRUD, that simplifies interaction with MySQL databases using the aiomysql library. It offers convenient methods for performing CRUD (Create, Read, Update, Delete) operations on database tables.
Features
- Asynchronous database operations for improved performance.
- Easy integration into Python projects.
- Streamlined connection management.
- Support for common database operations, including select, insert, update, and delete.
Getting Started
Install the required dependencies:
pip install aiomysql
Modify the connection settings in db.py (host, user, password, and database name) to match your database.
Use the aiomysql_CRUD class in your Python project to interact with your MySQL database.
Usage
Here's an example of how to use the aiomysql_CRUD class to perform basic database operations:
import asyncio

from db import aiomysql_CRUD


async def main():
    """Run one INSERT, UPDATE, SELECT and DELETE against the ``users`` table.

    The queries assume a ``users`` table with ``id``, ``user_name`` and
    ``user_age`` columns. The pool is always closed on exit, even when one
    of the steps raises.
    """
    db = aiomysql_CRUD()
    await db.connect()

    async def create_user(user_name, user_age):
        """Insert one row into ``users``."""
        query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)"
        values = (user_name, user_age)
        await db.insert(query, values)

    async def update_user(user_name, user_age):
        """Set the age of every user called ``user_name``."""
        query = "UPDATE users SET user_age = %s WHERE user_name = %s"
        # Placeholder order follows the query: age first, then name.
        values = (user_age, user_name)
        await db.update(query, values)

    async def select_users_by_age(min_age):
        """Return the users whose age is strictly greater than ``min_age``."""
        query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
        values = (min_age,)
        return await db.select(query, values)

    async def delete_user(user_name):
        """Delete every user called ``user_name``."""
        query = "DELETE FROM users WHERE user_name = %s"
        values = (user_name,)
        await db.delete(query, values)

    try:
        # INSERT - add a user named "John Doe" with age 35.
        await create_user("John Doe", 35)

        # UPDATE - change the age of "John Doe" to 55.
        await update_user("John Doe", 55)

        # SELECT - fetch and print the users older than the minimum age.
        min_age = 30  # Adjust this value to your desired minimum age
        users = await select_users_by_age(min_age)
        # select() prints the error and returns None when the query fails,
        # so fall back to an empty sequence instead of iterating over None.
        for user in users or ():
            print(user)

        # DELETE - remove "John Doe" from the users table.
        await delete_user("John Doe")
    finally:
        # Release the pool even if one of the steps above raised.
        await db.close()


if __name__ == "__main__":
    asyncio.run(main())
And here is the Python class:
import aiomysql


class aiomysql_CRUD:
    """Small async CRUD helper built on an ``aiomysql`` connection pool.

    Connection settings are taken from the constructor; the defaults are the
    values this class previously hard-coded, so ``aiomysql_CRUD()`` behaves
    as before.

    Error handling is best-effort: every method catches exceptions, prints a
    message to stdout and returns ``None`` instead of raising.

    Usage::

        db = aiomysql_CRUD(host="localhost", user="root", password="...", db="my_base")
        await db.connect()
        ...
        await db.close()

    or as an async context manager: ``async with aiomysql_CRUD() as db: ...``.
    """

    def __init__(
        self,
        *,
        host='localhost',
        user='root',
        password='Qwerty1234',
        db='my_base',
        maxsize=200,
        autocommit=True,
        **pool_options,
    ):
        """Store the pool settings; no connection is opened here.

        Args:
            host: MySQL server host name.
            user: MySQL user name.
            password: Password for ``user``.
            db: Name of the database to use.
            maxsize: Maximum number of connections kept in the pool.
            autocommit: Whether connections run in autocommit mode.
            **pool_options: Extra keyword arguments forwarded unchanged to
                ``aiomysql.create_pool`` (for example ``port``).
        """
        # The pool is created lazily by connect(); None means "not connected".
        self.pool = None
        self._pool_config = dict(
            host=host,
            user=user,
            password=password,
            db=db,
            maxsize=maxsize,
            autocommit=autocommit,
            **pool_options,
        )

    async def connect(self):
        """Create the connection pool.

        Does nothing if a pool already exists, so calling ``connect()`` and
        then entering ``async with`` does not leak the first pool. On failure
        the error is printed and ``self.pool`` stays ``None``.
        """
        if self.pool is not None:
            return
        try:
            self.pool = await aiomysql.create_pool(**self._pool_config)
        except Exception as e:
            print(f"Database connection error: {e}")

    async def __aenter__(self):
        """Connect and return this helper for use in ``async with``."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the pool on leaving ``async with``; exceptions propagate."""
        await self.close()

    async def _run(self, label, query, values, fetch=False):
        """Execute one parameterized statement on a pooled connection.

        Args:
            label: Operation name used as the prefix of the printed error
                message (``"Select"``, ``"Insert"``, ...).
            query: SQL text with ``%s`` placeholders.
            values: Parameters bound to the placeholders.
            fetch: When true, return ``cursor.fetchall()`` instead of
                committing.

        Returns:
            The fetched rows when ``fetch`` is true and the query succeeds,
            otherwise ``None``.
        """
        if self.pool is None:
            # Clearer than the AttributeError raised by None.acquire().
            print(f"{label} query error: not connected (call connect() first)")
            return None
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, values)
                    if fetch:
                        return await cur.fetchall()
                    # Explicit commit keeps writes durable when the pool was
                    # created with autocommit=False.
                    await conn.commit()
        except Exception as e:
            print(f"{label} query error: {e}")
        return None

    async def delete(self, query, values):
        """Run a DELETE statement and commit; errors are printed, not raised."""
        await self._run("Delete", query, values)

    async def select(self, query, params):
        """Run a SELECT statement and return all rows.

        Returns:
            The result of ``cursor.fetchall()``, or ``None`` if the query
            failed (the error is printed).
        """
        return await self._run("Select", query, params, fetch=True)

    async def insert(self, query, values):
        """Run an INSERT statement and commit; errors are printed, not raised."""
        await self._run("Insert", query, values)

    async def update(self, query, values):
        """Run an UPDATE statement and commit; errors are printed, not raised."""
        await self._run("Update", query, values)

    async def close(self):
        """Close the pool and wait for its connections to be released.

        Safe to call more than once or before ``connect()``; afterwards
        ``self.pool`` is ``None`` and ``connect()`` may be called again.
        """
        # Detach first so a second close() call finds nothing to close.
        pool, self.pool = self.pool, None
        if pool is not None:
            pool.close()
            await pool.wait_closed()