Simple usage Aiomysql – CRUD
Aiomysql CRUD is a versatile Python class that uses the aiomysql library for streamlined interaction with MySQL databases.
It simplifies connection management and CRUD operations for convenience and efficiency.
This project provides a Python class, aiomysql_CRUD, that simplifies interaction with MySQL databases using the aiomysql library.
It offers convenient methods for performing CRUD (Create, Read, Update, Delete) operations on database tables.
Features
- Asynchronous database operations for improved performance.
- Easy integration into Python projects.
- Streamlined connection management.
- Support for common database operations, including select, insert, update, and delete.
Getting Started
Install the required dependencies:
pip install aiomysql
Modify the config section in db.py to match your database settings.
Use the aiomysql_CRUD class in your Python project to interact with your MySQL database.
Usage
Here’s an example of how to use the “aiomysql_CRUD” class to perform basic database operations (the script begins with `import asyncio` and `from db import aiomysql_CRUD`):
from db import aiomysql_CRUD
# INSERT - The code performs an INSERT operation into the users table, inserting a record with the name "John Doe" and age 35.
async def create_user(user_name, user_age):
    """Insert one row into ``users`` with the given name and age."""
    # The statement is parameterized: the driver fills in the two %s slots.
    await db.insert(
        "INSERT INTO users (user_name, user_age) VALUES (%s, %s)",
        (user_name, user_age),
    )
# Now you can call the create_user function within an asynchronous context
await create_user("John Doe", 35)
# UPDATE - Updates a user's age to 55 based on their name ("John Doe").
async def update_user(user_name, user_age):
    """Set ``user_age`` for every ``users`` row whose name is *user_name*."""
    statement = "UPDATE users SET user_age = %s WHERE user_name = %s"
    # Placeholder order is (age, name), the reverse of the argument order.
    await db.update(statement, (user_age, user_name))
# This line of code calls the `update_user` function to change the age of a user named "John Doe" to 55 in the database.
await update_user("John Doe", 55)
# SELECT - This code defines a function `select_users_by_age` that retrieves users from the "users" table whose age is greater than the specified minimum age.
# It then prints the selected users.
async def select_users_by_age(min_age):
    """Return the ``users`` rows whose ``user_age`` is greater than *min_age*.

    Each row carries ``id``, ``user_name`` and ``user_age``. The value is
    whatever ``db.select`` returns for the query.
    """
    query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
    # One-element tuple for the single %s placeholder; without it the call
    # below referenced an undefined name.
    values = (min_age,)
    result = await db.select(query, values)
    # Hand the rows back so the caller's `users = await ...` receives them.
    return result
min_age = 30 # Adjust this value to your desired minimum age
users = await select_users_by_age(min_age)
# DELETE - Deleting a user from the users table based on their name.
async def delete_user(user_name):
    """Delete every ``users`` row whose ``user_name`` equals *user_name*."""
    query = "DELETE FROM users WHERE user_name = %s"
    # One-element tuple for the single %s placeholder; without it the call
    # below referenced an undefined name.
    values = (user_name,)
    await db.delete(query, values)
# Calling the function to delete a user "John Doe" from the users table.
await delete_user("John Doe")
if __name__ == "__main__":
import asyncio
from db import aiomysql_CRUD
async def main():
    """Demonstrate INSERT, UPDATE, SELECT and DELETE on the ``users`` table.

    The pool is closed in a ``finally`` block so it is released even when one
    of the steps raises.
    """
    db = aiomysql_CRUD()
    await db.connect()

    # INSERT - add a record to the users table.
    async def create_user(user_name, user_age):
        query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)"
        values = (user_name, user_age)
        await db.insert(query, values)

    # UPDATE - change a user's age, looked up by name.
    async def update_user(user_name, user_age):
        query = "UPDATE users SET user_age = %s WHERE user_name = %s"
        values = (user_age, user_name)  # Order matches the placeholders in the query
        await db.update(query, values)

    # SELECT - fetch the users whose age is greater than the given minimum.
    async def select_users_by_age(min_age):
        query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
        values = (min_age,)
        result = await db.select(query, values)
        return result

    # DELETE - remove a user from the users table by name.
    async def delete_user(user_name):
        query = "DELETE FROM users WHERE user_name = %s"
        values = (user_name,)
        await db.delete(query, values)

    try:
        # Insert "John Doe" with age 35.
        await create_user("John Doe", 35)

        # Change the age of "John Doe" to 55.
        await update_user("John Doe", 55)

        min_age = 30  # Adjust this value to your desired minimum age
        users = await select_users_by_age(min_age)
        # db.select() yields None when the query fails, so guard the loop
        # instead of iterating over None.
        for user in users or ():
            print(user)

        # Delete "John Doe" from the users table.
        await delete_user("John Doe")
    finally:
        await db.close()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
from db import aiomysql_CRUD
async def main():
    """Demonstrate INSERT, UPDATE, SELECT and DELETE on the ``users`` table.

    The pool is closed in a ``finally`` block so it is released even when one
    of the steps raises.
    """
    db = aiomysql_CRUD()
    await db.connect()

    # INSERT - add a record to the users table.
    async def create_user(user_name, user_age):
        query = "INSERT INTO users (user_name, user_age) VALUES (%s, %s)"
        values = (user_name, user_age)
        await db.insert(query, values)

    # UPDATE - change a user's age, looked up by name.
    async def update_user(user_name, user_age):
        query = "UPDATE users SET user_age = %s WHERE user_name = %s"
        values = (user_age, user_name)  # Order matches the placeholders in the query
        await db.update(query, values)

    # SELECT - fetch the users whose age is greater than the given minimum.
    async def select_users_by_age(min_age):
        query = "SELECT id, user_name, user_age FROM users WHERE user_age > %s"
        values = (min_age,)
        result = await db.select(query, values)
        return result

    # DELETE - remove a user from the users table by name.
    async def delete_user(user_name):
        query = "DELETE FROM users WHERE user_name = %s"
        values = (user_name,)
        await db.delete(query, values)

    try:
        # Insert "John Doe" with age 35.
        await create_user("John Doe", 35)

        # Change the age of "John Doe" to 55.
        await update_user("John Doe", 55)

        min_age = 30  # Adjust this value to your desired minimum age
        users = await select_users_by_age(min_age)
        # db.select() yields None when the query fails, so guard the loop
        # instead of iterating over None.
        for user in users or ():
            print(user)

        # Delete "John Doe" from the users table.
        await delete_user("John Doe")
    finally:
        await db.close()


if __name__ == "__main__":
    asyncio.run(main())
And here is the Python class:
self.pool = await aiomysql.create_pool(
autocommit=True # Adding the parameter autocommit=True.
print(f"Database connection error: {e}")
async def __aenter__(self):
async def __aexit__(self, exc_type, exc, tb):
await self.pool.wait_closed()
async def delete(self, query, values):
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
print(f"Delete query error: {e}")
async def select(self, query, params):
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, params)
rows = await cur.fetchall()
print(f"Select query error: {e}")
async def insert(self, query, values):
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
print(f"Insert query error: {e}")
async def update(self, query, values):
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, values)
print(f"Update query error: {e}")
await self.pool.wait_closed()
import aiomysql
class aiomysql_CRUD:
    """Small async CRUD helper built on an ``aiomysql`` connection pool.

    Use it explicitly (``await connect()`` ... ``await close()``) or as an
    async context manager (``async with aiomysql_CRUD() as db:``).

    Every operation is best-effort: exceptions are printed and swallowed.
    ``insert``/``update``/``delete`` always return None; ``select`` returns
    the fetched rows, or None when the query fails.
    """

    def __init__(
        self,
        *,
        host='localhost',
        user='root',
        password='Qwerty1234',
        db='my_base',
        maxsize=200,
        autocommit=True,
    ):
        """Store the pool settings; nothing is opened until ``connect()``.

        All arguments are keyword-only and are forwarded unchanged to
        ``aiomysql.create_pool``. The defaults are the values that used to be
        hard-coded, so ``aiomysql_CRUD()`` behaves as before.
        """
        # NOTE(review): the default password is kept only for backward
        # compatibility; real credentials should be passed in by the caller
        # rather than living in source.
        self._pool_options = {
            'host': host,
            'user': user,
            'password': password,
            'db': db,
            'maxsize': maxsize,
            'autocommit': autocommit,
        }
        self.pool = None

    async def connect(self):
        """Create the pool; on failure print the error and leave ``pool`` as is."""
        try:
            self.pool = await aiomysql.create_pool(**self._pool_options)
        except Exception as e:
            print(f"Database connection error: {e}")

    async def __aenter__(self):
        """Connect and return this helper for use inside ``async with``."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the pool on leaving ``async with``; exceptions propagate."""
        await self.close()

    async def _execute(self, query, values, label):
        """Run one write statement and commit it.

        *label* ("Insert", "Update", "Delete") only selects the wording of
        the message printed when the statement fails.
        """
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, values)
                    # Explicit commit keeps writes durable even when the pool
                    # was created with autocommit=False.
                    await conn.commit()
        except Exception as e:
            print(f"{label} query error: {e}")

    async def delete(self, query, values):
        """Execute a parameterized DELETE statement."""
        await self._execute(query, values, "Delete")

    async def select(self, query, params):
        """Execute a parameterized SELECT and return all rows.

        Returns None (after printing the error) when the query fails.
        """
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, params)
                    return await cur.fetchall()
        except Exception as e:
            print(f"Select query error: {e}")
            return None

    async def insert(self, query, values):
        """Execute a parameterized INSERT statement."""
        await self._execute(query, values, "Insert")

    async def update(self, query, values):
        """Execute a parameterized UPDATE statement."""
        await self._execute(query, values, "Update")

    async def close(self):
        """Close the pool and wait for it to finish; no-op if never connected."""
        # Compare against None explicitly so the check does not depend on the
        # pool object's truthiness.
        if self.pool is not None:
            self.pool.close()
            await self.pool.wait_closed()
import aiomysql
class aiomysql_CRUD:
    """Small async CRUD helper built on an ``aiomysql`` connection pool.

    Use it explicitly (``await connect()`` ... ``await close()``) or as an
    async context manager (``async with aiomysql_CRUD() as db:``).

    Every operation is best-effort: exceptions are printed and swallowed.
    ``insert``/``update``/``delete`` always return None; ``select`` returns
    the fetched rows, or None when the query fails.
    """

    def __init__(
        self,
        *,
        host='localhost',
        user='root',
        password='Qwerty1234',
        db='my_base',
        maxsize=200,
        autocommit=True,
    ):
        """Store the pool settings; nothing is opened until ``connect()``.

        All arguments are keyword-only and are forwarded unchanged to
        ``aiomysql.create_pool``. The defaults are the values that used to be
        hard-coded, so ``aiomysql_CRUD()`` behaves as before.
        """
        # NOTE(review): the default password is kept only for backward
        # compatibility; real credentials should be passed in by the caller
        # rather than living in source.
        self._pool_options = {
            'host': host,
            'user': user,
            'password': password,
            'db': db,
            'maxsize': maxsize,
            'autocommit': autocommit,
        }
        self.pool = None

    async def connect(self):
        """Create the pool; on failure print the error and leave ``pool`` as is."""
        try:
            self.pool = await aiomysql.create_pool(**self._pool_options)
        except Exception as e:
            print(f"Database connection error: {e}")

    async def __aenter__(self):
        """Connect and return this helper for use inside ``async with``."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the pool on leaving ``async with``; exceptions propagate."""
        await self.close()

    async def _execute(self, query, values, label):
        """Run one write statement and commit it.

        *label* ("Insert", "Update", "Delete") only selects the wording of
        the message printed when the statement fails.
        """
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, values)
                    # Explicit commit keeps writes durable even when the pool
                    # was created with autocommit=False.
                    await conn.commit()
        except Exception as e:
            print(f"{label} query error: {e}")

    async def delete(self, query, values):
        """Execute a parameterized DELETE statement."""
        await self._execute(query, values, "Delete")

    async def select(self, query, params):
        """Execute a parameterized SELECT and return all rows.

        Returns None (after printing the error) when the query fails.
        """
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, params)
                    return await cur.fetchall()
        except Exception as e:
            print(f"Select query error: {e}")
            return None

    async def insert(self, query, values):
        """Execute a parameterized INSERT statement."""
        await self._execute(query, values, "Insert")

    async def update(self, query, values):
        """Execute a parameterized UPDATE statement."""
        await self._execute(query, values, "Update")

    async def close(self):
        """Close the pool and wait for it to finish; no-op if never connected."""
        # Compare against None explicitly so the check does not depend on the
        # pool object's truthiness.
        if self.pool is not None:
            self.pool.close()
            await self.pool.wait_closed()
Download on GitHub >>>
Author: Bogdan Kuhar
Full Stack Developer/coach
https://www.youtube.com/@imimir_com
info@imimir.com