> 文章列表 > 使用records操作数据库

使用records操作数据库

使用records操作数据库

records数据库操作库

    • Python的records库简介
    • 安装records库
      • 连接数据库
    • 示例代码:records操作Mysql
    • 示例代码:records操作sqllite

Python的records库简介

records库提供简便地处理 SQL 数据库的方法,简单好用。

安装records库

https://blog.csdn.net/weixin_42725107/article/details/100766593

# 安装records库
pip install records# 安装 MySQL 驱动程序:
pip install records[mysql]
pip install records mysqlclient# 安装 PostgreSQL 驱动程序
pip install records[postgresql]# 安装oracle支持
pip install cx_Oracle

连接数据库

Records 是基于 SQLAlchemy 实现的,所以数据库连接方式参考:https://docs.sqlalchemy.org/en/13/dialects/

参考:https://blog.csdn.net/weixin_42725107/article/details/100766593

# 1)连接sqllite
db = records.Database(‘sqlite:///users.db’)
db = records.Database(‘sqlite:absolute/path/to/file.db’)# 2)Oracle 数据库连接示例:
## 先安装 cx_Oracle
oracle://root:1234@ORCL#3)MySQL 数据库连接串示例:
mysql://root:12345@localhost/mydb?charset=utf8# 4)PostgreSQL 数据库连接串示例:
postgresql://postgres:1234@localhost/mydb# 5)SQL Server 数据库连接示例:
## 首先安装 pymssql
mssql+pymssql://sa:12345@localhost:1433/mydb

示例代码:records操作Mysql

# -*- coding: utf-8 -*-
# @Time    : 3/17/21 5:29 下午
# @Author  : qiaofei.li
# @FileName: records_db.py
# @Software: PyCharm
# @Blog    :https://blog.csdn.net/omaidb
"""
records操作mysql数据的模块
"""import records# 数据库查询类
class SqlQuery:def __init__(self, user, password, host, port: int, db_name):"""初始化数据库连接:param user: 传入数据库用户名:param password: 传入数据库登录密码:param host: 传入数据库主机地址:param port: 传入端口号:param db_name: 传入数据库名称"""self.db = records.Database(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db_name}?charset=utf8',pool_recycle=3600)def db_query(self, sql语句):"""执行数据库语句功能:param sql语句::return: 以列表方式返回查询结果"""# 连接数据库rows = self.db.query(sql语句)return rows.all()def db_query_dict(self, sql语句):"""执行数据库查询功能,以字典方式返回结果:param sql语句::return: 以字典返回查询结果"""# 连接数据库rows = self.db.query(sql语句)return rows.all(as_dict=True)def db_transaction(self, sql语句1, sql语句2):"""事务类执行方法:param sql语句2::param sql语句1::return:"""with self.db.transaction()as t:t.query(sql语句1)print("user1已执行插入...")t.query(sql语句2)print("user2已执行插入...")  # 如果没有打印user2执行成功的信息,就是自动回滚事务了def export_query_file(self, sql语句, 导出的路径和文件, 导出格式):"""导出查询结果为文件:param 导出的路径和文件::param sql语句::param 导出格式: 传入文件格式,json或xlsx,yaml,html:return:"""rows = self.db.query(sql语句)with open(f'{导出的路径和文件}.{导出格式}', 'wb')as f:f.write(rows.export(导出格式))def delte_table(self, table_name):"""删除指定的数据表:param table_name: 传入要删除的表名:return:"""drop_table_sql = f"drop table if exists {table_name};"self.db.query(drop_table_sql)def create_table(self, table_name):"""创建指定的数据表:param table_name: 传入要创建的表名:return:"""create_table_sql = f"create table if not exists {table_name}(name varchar(20),age  int) default charset = utf8;"self.db.query(create_table_sql)# if __name__ == '__main__':
#     mytest = SqlQuery('root', '123456', '192.168.255.184', 3306, 'movie')
#     sql语句 = 'SELECT * FROM `user`;'
#     pprint(mytest.db_query_dict(sql语句))

示例代码:records操作sqllite

import records# 创建数据库连接
db = records.Database('sqlite:///example.db')# 执行查询语句
rows = db.query('SELECT name, age FROM users WHERE age > :age', age=18)# 遍历结果集
for row in rows:print(row.name, row.age)