Python内置函数
内置函数 (Built-in Functions)
Python内置函数是Python解释器自带的函数,无需导入任何模块即可使用。这些函数提供了基本的数据处理和操作功能。
print() - 输出函数
print(*objects, sep=' ', end='\n', file=sys.stdout, flush=False)
将对象打印到文本流或控制台。可以设置分隔符、结束符、输出位置等参数。
objects
- 要打印的对象,可以是多个
sep
- 对象之间的分隔符,默认为空格
end
- 打印结束后的字符,默认为换行符
示例
# 基本用法
print("Hello, World!")
# 打印多个值
name = "Alice"
age = 25
print("Name:", name, "Age:", age)
# 自定义分隔符和结束符
print("Python", "is", "awesome", sep="-", end="!\n")
len() - 长度函数
len(s)
返回对象的长度(元素个数)。适用于字符串、列表、元组、字典、集合等。
示例
# 字符串长度
text = "Python"
print(len(text)) # 输出: 6
# 列表长度
numbers = [1, 2, 3, 4, 5]
print(len(numbers)) # 输出: 5
# 字典长度
person = {"name": "Alice", "age": 25, "city": "Beijing"}
print(len(person)) # 输出: 3
type() - 类型函数
type(object)
返回对象的类型。通常用于调试和类型检查。
示例
# 检查各种类型
print(type(10)) # <class 'int'>
print(type(3.14)) # <class 'float'>
print(type("Hello")) # <class 'str'>
print(type([1, 2, 3])) # <class 'list'>
print(type((1, 2, 3))) # <class 'tuple'>
print(type({"a": 1})) # <class 'dict'>
print(type({1, 2, 3})) # <class 'set'>
range() - 范围函数
range(stop)
range(start, stop[, step])
生成一个不可变的数字序列,常用于循环中。
start
- 序列起始值(包含),默认为0
stop
- 序列结束值(不包含)
step
- 步长,默认为1
示例
# 生成0-4的序列
print(list(range(5))) # [0, 1, 2, 3, 4]
# 生成2-6的序列
print(list(range(2, 7))) # [2, 3, 4, 5, 6]
# 生成1-10的奇数序列
print(list(range(1, 11, 2))) # [1, 3, 5, 7, 9]
# 在循环中使用
for i in range(3):
print(f"循环第 {i+1} 次")
math数学库
math - 数学函数库
math模块提供了数学运算函数,包括三角函数、对数函数、幂函数、常数等。
math.pow() - 幂运算
math.pow(x, y)
返回x的y次幂。注意:返回值为浮点数。
示例
import math
# 计算2的3次方
result = math.pow(2, 3)
print(f"2的3次方: {result}") # 8.0
# 计算平方根
sqrt_result = math.pow(16, 0.5)
print(f"16的平方根: {sqrt_result}") # 4.0
# 使用**运算符(返回整数)
print(f"使用**运算符: {2 ** 3}") # 8
math.sqrt() - 平方根
math.sqrt(x)
返回x的平方根。x必须为非负数。
示例
import math
# 计算平方根
print(f"16的平方根: {math.sqrt(16)}") # 4.0
print(f"2的平方根: {math.sqrt(2)}") # 1.4142135623730951
# 计算直角三角形的斜边
a = 3
b = 4
c = math.sqrt(a**2 + b**2)
print(f"直角边为{a}和{b}的斜边: {c}") # 5.0
math.ceil()和math.floor() - 取整函数
math.ceil(x) # 向上取整
math.floor(x) # 向下取整
ceil()返回大于或等于x的最小整数,floor()返回小于或等于x的最大整数。
示例
import math
x = 3.7
print(f"原数: {x}")
print(f"向上取整: {math.ceil(x)}") # 4
print(f"向下取整: {math.floor(x)}") # 3
y = -2.3
print(f"\n原数: {y}")
print(f"向上取整: {math.ceil(y)}") # -2
print(f"向下取整: {math.floor(y)}") # -3
math.pi和math.e - 数学常数
math.pi # 圆周率π ≈ 3.141592653589793
math.e # 自然常数e ≈ 2.718281828459045
提供数学中常用的常数。
示例
import math
# 计算圆的面积和周长
radius = 5
area = math.pi * radius ** 2
circumference = 2 * math.pi * radius
print(f"半径为{radius}的圆:")
print(f"面积: {area:.2f}")
print(f"周长: {circumference:.2f}")
# 使用自然常数e
print(f"\n自然常数e的值: {math.e}")
print(f"e的平方: {math.e ** 2:.2f}")
os系统库
os - 操作系统接口
os模块提供了与操作系统交互的函数,包括文件和目录操作、环境变量、进程管理等。
os.listdir() - 列出目录内容
os.listdir(path='.')
返回指定路径下的文件和目录列表。
示例
import os
# 列出当前目录内容
print("当前目录内容:")
for item in os.listdir('.'):
print(f" {item}")
# 列出指定目录内容(需要目录存在)
try:
print("\n家目录内容:")
for item in os.listdir(os.path.expanduser('~'))[:5]: # 只显示前5个
print(f" {item}")
except FileNotFoundError:
print("目录不存在")
os.path.join() - 路径拼接
os.path.join(path1[, path2[, ...]])
智能地拼接一个或多个路径组件,使用正确的路径分隔符。
示例
import os
# 拼接路径
path1 = "home"
path2 = "user"
path3 = "documents"
path4 = "file.txt"
full_path = os.path.join(path1, path2, path3, path4)
print(f"拼接后的路径: {full_path}")
# 在Windows上输出: home\user\documents\file.txt
# 在Linux/Mac上输出: home/user/documents/file.txt
# 实际应用:创建文件路径
base_dir = "data"
filename = "config.json"
config_path = os.path.join(base_dir, filename)
print(f"配置文件路径: {config_path}")
os.path.exists() - 检查路径是否存在
os.path.exists(path)
检查指定路径是否存在(文件或目录)。
示例
import os
# 检查当前目录是否存在
print(f"当前目录存在: {os.path.exists('.')}")
# 检查文件是否存在
test_file = "test.txt"
if os.path.exists(test_file):
print(f"文件 '{test_file}' 存在")
else:
print(f"文件 '{test_file}' 不存在")
# 创建文件后检查
with open(test_file, 'w') as f:
f.write("测试文件")
print(f"创建后文件存在: {os.path.exists(test_file)}")
# 清理
os.remove(test_file)
os.environ - 环境变量
os.environ
包含环境变量的字典。可以读取和设置环境变量。
示例
import os
# 获取环境变量
print("当前用户:", os.environ.get('USER', os.environ.get('USERNAME', '未知')))
print("家目录:", os.environ.get('HOME', os.environ.get('USERPROFILE', '未知')))
# 获取PATH环境变量
path = os.environ.get('PATH', '')
path_dirs = path.split(os.pathsep)
print(f"\nPATH包含 {len(path_dirs)} 个目录")
print("前5个目录:")
for i, dir in enumerate(path_dirs[:5]):
print(f" {i+1}. {dir}")
# 设置环境变量(仅当前进程有效)
os.environ['MY_VAR'] = 'my_value'
print(f"\n自定义环境变量: MY_VAR = {os.environ.get('MY_VAR')}")
datetime时间库
datetime - 日期时间处理
datetime模块提供了日期和时间处理的类,包括date、time、datetime、timedelta等。
datetime.datetime.now() - 当前时间
datetime.datetime.now([tz])
返回当前本地日期和时间。可以指定时区。
示例
from datetime import datetime
# 获取当前时间
now = datetime.now()
print(f"当前时间: {now}")
print(f"年份: {now.year}")
print(f"月份: {now.month}")
print(f"日期: {now.day}")
print(f"小时: {now.hour}")
print(f"分钟: {now.minute}")
print(f"秒: {now.second}")
print(f"微秒: {now.microsecond}")
# 格式化输出
formatted = now.strftime("%Y-%m-%d %H:%M:%S")
print(f"格式化时间: {formatted}")
# 获取时间戳
timestamp = now.timestamp()
print(f"时间戳: {timestamp}")
datetime.timedelta - 时间间隔
datetime.timedelta([days[, seconds[, microseconds[, milliseconds[, minutes[, hours[, weeks]]]]]]])
表示两个日期或时间之间的差。可用于日期时间的加减运算。
示例
from datetime import datetime, timedelta
now = datetime.now()
print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
# 一天后
one_day_later = now + timedelta(days=1)
print(f"一天后: {one_day_later.strftime('%Y-%m-%d %H:%M:%S')}")
# 一周前
one_week_ago = now - timedelta(weeks=1)
print(f"一周前: {one_week_ago.strftime('%Y-%m-%d %H:%M:%S')}")
# 2小时30分钟后
two_hours_later = now + timedelta(hours=2, minutes=30)
print(f"2小时30分钟后: {two_hours_later.strftime('%Y-%m-%d %H:%M:%S')}")
# 计算时间差
date1 = datetime(2023, 1, 1)
date2 = datetime(2023, 12, 31)
difference = date2 - date1
print(f"\n{date2.date()} 和 {date1.date()} 相差 {difference.days} 天")
datetime.date - 日期处理
datetime.date(year, month, day)
表示日期(不包含时间)。提供日期的各种操作。
示例
from datetime import date
# 创建日期
today = date.today()
print(f"今天: {today}")
print(f"年: {today.year}, 月: {today.month}, 日: {today.day}")
# 创建特定日期
christmas = date(2023, 12, 25)
print(f"圣诞节: {christmas}")
# 计算星期几 (0=周一, 6=周日)
weekday = today.weekday()
weekdays = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
print(f"今天是: {weekdays[weekday]}")
# 日期运算
new_year = date(2024, 1, 1)
days_to_new_year = (new_year - today).days
print(f"距离2024年元旦还有 {days_to_new_year} 天")
random随机库
random - 随机数生成
random模块提供了生成随机数的函数,包括整数、浮点数、序列选择等。
random.random() - 随机浮点数
random.random()
返回[0.0, 1.0)范围内的随机浮点数。
示例
import random
# 生成随机浮点数
print("5个随机浮点数:")
for i in range(5):
print(f" {random.random():.4f}")
# 生成指定范围的随机浮点数
def random_float_range(min_val, max_val):
return min_val + random.random() * (max_val - min_val)
print("\n10到20之间的随机浮点数:")
for i in range(3):
print(f" {random_float_range(10, 20):.2f}")
# 模拟概率
trials = 10000
count = sum(1 for _ in range(trials) if random.random() < 0.3)
probability = count / trials
print(f"\n模拟概率 0.3: {probability:.4f} (试验次数: {trials})")
random.randint() - 随机整数
random.randint(a, b)
返回[a, b]范围内的随机整数,包括两端。
示例
import random
# 生成1到6的随机整数(模拟骰子)
print("掷骰子10次:")
for i in range(10):
dice = random.randint(1, 6)
print(f" 第{i+1}次: {dice}")
# 生成随机年龄
min_age = 18
max_age = 60
print(f"\n随机生成年龄 ({min_age}-{max_age}岁):")
for i in range(5):
age = random.randint(min_age, max_age)
print(f" 第{i+1}个: {age}岁")
# 抽奖程序
participants = ["Alice", "Bob", "Charlie", "David", "Eva"]
winner_index = random.randint(0, len(participants) - 1)
print(f"\n抽奖结果: 恭喜 {participants[winner_index]} 中奖!")
random.choice() - 随机选择
random.choice(seq)
从非空序列中随机返回一个元素。
示例
import random
# 随机选择颜色
colors = ["红色", "蓝色", "绿色", "黄色", "紫色"]
print("随机颜色选择:")
for i in range(5):
color = random.choice(colors)
print(f" 第{i+1}次: {color}")
# 随机选择幸运数字
lucky_numbers = [7, 8, 9, 13, 21, 42]
print(f"\n幸运数字: {random.choice(lucky_numbers)}")
# 随机选择姓名
names = ["张三", "李四", "王五", "赵六", "钱七"]
selected = random.choice(names)
print(f"随机选择的姓名: {selected}")
# 随机选择多个不重复元素
print("\n随机选择3个不重复的幸运儿:")
winners = random.sample(names, 3)
for i, winner in enumerate(winners):
print(f" 第{i+1}名: {winner}")
random.shuffle() - 随机打乱
random.shuffle(x[, random])
将序列x随机打乱(原地操作)。
示例
import random
# 打乱列表
cards = ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"]
print("原始扑克牌顺序:")
print(" ", cards)
random.shuffle(cards)
print("\n洗牌后的顺序:")
print(" ", cards)
# 打乱数字列表
numbers = list(range(1, 11))
print(f"\n原始数字: {numbers}")
random.shuffle(numbers)
print(f"打乱后: {numbers}")
# 模拟抽奖顺序
participants = ["选手A", "选手B", "选手C", "选手D", "选手E"]
print("\n抽奖顺序:")
for i, participant in enumerate(participants):
print(f" 第{i+1}位: {participant}")
JSON数据处理
json - JSON编码解码
json模块提供了JSON数据的编码和解码功能,用于Python对象和JSON字符串之间的转换。
json.dumps() - JSON编码
json.dumps(obj, *, skipkeys=False, ensure_ascii=True,
check_circular=True, allow_nan=True, cls=None,
indent=None, separators=None, default=None,
sort_keys=False, **kw)
将Python对象编码成JSON格式字符串。支持多种参数控制输出格式。
obj
- 要编码的Python对象
indent
- 缩进空格数,美化输出
ensure_ascii
- 是否确保ASCII编码,False可输出中文
示例
import json
# Python字典
data = {
"name": "张三",
"age": 25,
"city": "北京",
"skills": ["Python", "JavaScript", "SQL"],
"married": False
}
# 转换为JSON字符串
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print("JSON字符串:")
print(json_str)
# 紧凑格式
compact_json = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
print("\n紧凑格式:")
print(compact_json)
json.loads() - JSON解码
json.loads(s, *, cls=None, object_hook=None,
parse_float=None, parse_int=None,
parse_constant=None, object_pairs_hook=None, **kw)
将JSON格式字符串解码为Python对象。
示例
import json
# JSON字符串
json_str = '{"name": "李四", "age": 30, "city": "上海"}'
# 解析为Python对象
data = json.loads(json_str)
print("解析后的Python对象:")
print(f"类型: {type(data)}")
print(f"姓名: {data['name']}")
print(f"年龄: {data['age']}")
print(f"城市: {data['city']}")
# 从文件读取JSON
json_from_file = '''
{
"students": [
{"name": "张三", "score": 85},
{"name": "李四", "score": 92},
{"name": "王五", "score": 78}
]
}
'''
students_data = json.loads(json_from_file)
print("\n学生数据:")
for student in students_data['students']:
print(f" {student['name']}: {student['score']}分")
json.dump() - 写入JSON文件
json.dump(obj, fp, *, skipkeys=False, ensure_ascii=True,
check_circular=True, allow_nan=True, cls=None,
indent=None, separators=None, default=None,
sort_keys=False, **kw)
将Python对象序列化为JSON格式并写入文件。
示例
import json
import os
# 要保存的数据
config = {
"app_name": "数据管理系统",
"version": "1.0.0",
"database": {
"host": "localhost",
"port": 3306,
"username": "admin"
},
"features": ["用户管理", "数据导入", "报表生成"]
}
# 写入JSON文件
filename = "config.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
print(f"配置文件已保存到 {filename}")
# 验证文件内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
print(f"\n文件内容预览:\n{content[:200]}...")
# 清理测试文件
os.remove(filename)
print(f"\n测试文件 {filename} 已清理")
json.load() - 读取JSON文件
json.load(fp, *, cls=None, object_hook=None,
parse_float=None, parse_int=None,
parse_constant=None, object_pairs_hook=None, **kw)
从文件读取JSON数据并解析为Python对象。
示例
import json
import os
# 创建测试JSON文件
test_data = {
"product": "笔记本电脑",
"price": 6999.99,
"in_stock": True,
"specs": {
"cpu": "Intel i7",
"ram": "16GB",
"storage": "512GB SSD"
}
}
filename = "product.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(test_data, f, ensure_ascii=False, indent=2)
# 从文件读取JSON
with open(filename, 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print("从文件加载的数据:")
print(f"产品: {loaded_data['product']}")
print(f"价格: ¥{loaded_data['price']}")
print(f"库存: {'有货' if loaded_data['in_stock'] else '缺货'}")
print("规格:")
for key, value in loaded_data['specs'].items():
print(f" {key}: {value}")
# 清理
os.remove(filename)
print(f"\n测试文件 {filename} 已清理")
re正则表达式库
re - 正则表达式操作
re模块提供了正则表达式匹配操作,用于字符串的模式匹配、查找和替换。
re.search() - 搜索匹配
re.search(pattern, string, flags=0)
扫描整个字符串,返回第一个匹配的Match对象。如果没有匹配则返回None。
pattern
- 正则表达式模式
string
- 要搜索的字符串
示例
import re
# 搜索邮箱地址
text = "联系我: email@example.com 或 support@company.cn"
pattern = r'\b[\w\.-]+@[\w\.-]+\.\w+\b'
match = re.search(pattern, text)
if match:
print(f"找到邮箱: {match.group()}")
print(f"匹配位置: {match.start()} - {match.end()}")
else:
print("未找到邮箱")
# 搜索电话号码
phone_text = "电话: 138-1234-5678 或 010-8765-4321"
phone_pattern = r'\d{3,4}-\d{4}-\d{4}'
phone_match = re.search(phone_pattern, phone_text)
if phone_match:
print(f"\n找到电话: {phone_match.group()}")
# 使用分组
date_text = "今天是2023-12-15,明天是2023-12-16"
date_pattern = r'(\d{4})-(\d{2})-(\d{2})'
date_match = re.search(date_pattern, date_text)
if date_match:
print(f"\n找到日期: {date_match.group()}")
print(f"年: {date_match.group(1)}")
print(f"月: {date_match.group(2)}")
print(f"日: {date_match.group(3)}")
re.findall() - 查找所有匹配
re.findall(pattern, string, flags=0)
返回字符串中所有非重叠匹配的列表。如果模式中有分组,返回分组元组的列表。
示例
import re
# 查找所有数字
text = "Python 3.10 发布于 2021年10月4日,Python 3.11 发布于 2022年10月24日"
numbers = re.findall(r'\d+\.?\d*', text)
print(f"所有数字: {numbers}")
# 查找所有单词
words = re.findall(r'\b\w+\b', text)
print(f"\n所有单词: {words}")
# 查找所有版本号
versions = re.findall(r'Python (\d+\.\d+)', text)
print(f"Python版本: {versions}")
# 查找日期
dates = re.findall(r'(\d{4})年(\d{1,2})月(\d{1,2})日', text)
print(f"\n所有日期:")
for year, month, day in dates:
print(f" {year}-{month.zfill(2)}-{day.zfill(2)}")
# 查找金额
money_text = "价格: $19.99, ¥100.50, €25.00, 免费"
currencies = re.findall(r'[\$\¥€]\d+\.?\d*', money_text)
print(f"\n所有金额: {currencies}")
re.sub() - 替换匹配
re.sub(pattern, repl, string, count=0, flags=0)
使用repl替换string中所有匹配pattern的子串,返回替换后的字符串。
示例
import re
# 替换敏感信息
text = "用户电话: 138-1234-5678,身份证: 110101199001011234"
# 隐藏电话号码中间4位
hidden_phone = re.sub(r'(\d{3})-(\d{4})-(\d{4})', r'\1-****-\3', text)
# 隐藏身份证后4位
hidden_id = re.sub(r'(\d{14})(\d{4})', r'\1****', hidden_phone)
print("脱敏后文本:")
print(hidden_id)
# 格式化日期
date_text = "日期: 20231215,另一个日期: 20230101"
formatted = re.sub(r'(\d{4})(\d{2})(\d{2})', r'\1-\2-\3', date_text)
print(f"\n格式化日期: {formatted}")
# 移除HTML标签
html = ""
clean_text = re.sub(r'<[^>]+>', '', html)
print(f"\n去除HTML标签: {clean_text}")
# 使用函数进行替换
def double_numbers(match):
num = int(match.group())
return str(num * 2)
text_with_numbers = "数字: 1, 2, 3, 4, 5"
doubled = re.sub(r'\d+', double_numbers, text_with_numbers)
print(f"\n数字加倍: {doubled}")
标题
段落内容
re.split() - 分割字符串
re.split(pattern, string, maxsplit=0, flags=0)
使用正则表达式模式分割字符串,返回分割后的列表。
示例
import re
# 按多种分隔符分割
text = "apple,banana;orange grape|melon"
result = re.split(r'[,;| ]', text)
print(f"分割结果: {result}")
# 按数字分割
text2 = "Python3.10发布于2021年,Python3.11发布于2022年"
result2 = re.split(r'\d+\.?\d*', text2)
print(f"\n按数字分割: {result2}")
# 保留分隔符
text3 = "100+200-300*400/500"
result3 = re.split(r'([+\-*/])', text3)
print(f"\n包含分隔符的分割: {result3}")
# 复杂的日志分割
log_line = "2023-12-15 14:30:25 INFO [MainThread] User login successful"
log_parts = re.split(r'\s+', log_line)
print(f"\n日志分割:")
for i, part in enumerate(log_parts):
print(f" {i}: {part}")
collections容器库
collections - 容器数据类型
collections模块提供了额外的容器数据类型,扩展了Python的内置容器。
Counter - 计数器
collections.Counter([iterable-or-mapping])
Counter是一个字典子类,用于计数可哈希对象。它是一个无序集合,元素存储为字典键,计数存储为字典值。
示例
from collections import Counter
# 统计列表元素
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
word_count = Counter(words)
print(f"单词统计: {word_count}")
print(f"apple出现次数: {word_count['apple']}")
print(f"最常见的2个: {word_count.most_common(2)}")
# 统计字符串字符
text = "abracadabra"
char_count = Counter(text)
print(f"\n字符统计: {char_count}")
print(f"a出现次数: {char_count['a']}")
# Counter运算
c1 = Counter(a=3, b=2, c=1)
c2 = Counter(a=1, b=2, c=3)
print(f"\nCounter1: {c1}")
print(f"Counter2: {c2}")
print(f"相加: {c1 + c2}")
print(f"相减: {c1 - c2}")
print(f"交集: {c1 & c2}")
print(f"并集: {c1 | c2}")
# 统计句子词频
sentence = "this is a test this is only a test"
words = sentence.split()
word_counter = Counter(words)
print(f"\n句子词频: {word_counter}")
defaultdict - 默认字典
collections.defaultdict(default_factory)
返回一个新的类字典对象。defaultdict是内置dict类的子类,它重写了__missing__方法,为不存在的键提供默认值。
示例
from collections import defaultdict
# 默认值为列表
list_dict = defaultdict(list)
list_dict['fruits'].append('apple')
list_dict['fruits'].append('banana')
list_dict['vegetables'].append('carrot')
print(f"列表字典: {dict(list_dict)}")
# 默认值为整数
int_dict = defaultdict(int)
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
for word in words:
int_dict[word] += 1
print(f"\n词频统计: {dict(int_dict)}")
# 默认值为集合
set_dict = defaultdict(set)
pairs = [('a', 1), ('a', 2), ('b', 2), ('b', 3), ('c', 1)]
for key, value in pairs:
set_dict[key].add(value)
print(f"\n集合字典: {dict(set_dict)}")
# 嵌套defaultdict
nested_dict = lambda: defaultdict(int)
dd = defaultdict(nested_dict)
dd['group1']['item1'] = 10
dd['group1']['item2'] = 20
dd['group2']['item1'] = 30
print(f"\n嵌套字典: {dict(dd['group1'])}")
print(f"访问不存在的键: {dd['group3']['new_item']}") # 返回0
deque - 双端队列
collections.deque([iterable[, maxlen]])
返回一个双向队列对象,支持从两端高效地添加和弹出元素,时间复杂度为O(1)。
示例
from collections import deque
# 创建deque
d = deque(['b', 'c', 'd'])
print(f"初始deque: {d}")
# 添加元素
d.append('e') # 右端添加
d.appendleft('a') # 左端添加
print(f"添加后: {d}")
# 弹出元素
right_item = d.pop() # 右端弹出
left_item = d.popleft() # 左端弹出
print(f"右端弹出: {right_item}, 左端弹出: {left_item}")
print(f"弹出后: {d}")
# 旋转
d.rotate(1) # 向右旋转1位
print(f"向右旋转: {d}")
d.rotate(-2) # 向左旋转2位
print(f"向左旋转2位: {d}")
# 有限长度deque
limited_d = deque(maxlen=3)
for i in range(5):
limited_d.append(i)
print(f"添加{i}后: {limited_d}")
OrderedDict - 有序字典
collections.OrderedDict([items])
返回一个字典子类,记住元素插入的顺序。在Python 3.7+中,普通dict也保持插入顺序,但OrderedDict有额外的方法。
示例
from collections import OrderedDict
# 创建有序字典
od = OrderedDict()
od['z'] = 1
od['y'] = 2
od['x'] = 3
print(f"有序字典: {od}")
print(f"键顺序: {list(od.keys())}")
# 移动元素到最后
od.move_to_end('z')
print(f"\n移动z到最后: {list(od.keys())}")
# 移动元素到最前
od.move_to_end('x', last=False)
print(f"移动x到最前: {list(od.keys())}")
# 保留最后N个元素
def keep_last_n(d, n):
"""保留最后n个元素"""
while len(d) > n:
d.popitem(last=False)
od2 = OrderedDict([(i, f"item{i}") for i in range(10)])
print(f"\n原始字典: {list(od2.keys())}")
keep_last_n(od2, 5)
print(f"保留最后5个: {list(od2.keys())}")
# 合并有序字典
od3 = OrderedDict([('a', 1), ('b', 2)])
od4 = OrderedDict([('c', 3), ('d', 4)])
od3.update(od4)
print(f"\n合并后: {list(od3.keys())}")
string字符串库
string - 字符串操作
string模块提供了字符串常量和模板类,用于常见的字符串操作。
string常量
string模块定义了多个有用的字符串常量。
示例
import string
print("ASCII小写字母:")
print(f" {string.ascii_lowercase}")
print("\nASCII大写字母:")
print(f" {string.ascii_uppercase}")
print("\nASCII字母:")
print(f" {string.ascii_letters}")
print("\n数字:")
print(f" {string.digits}")
print("\n十六进制数字:")
print(f" {string.hexdigits}")
print("\n八进制数字:")
print(f" {string.octdigits}")
print("\n标点符号:")
print(f" {string.punctuation}")
print("\n空白字符:")
print(f" {repr(string.whitespace)}")
print("\n可打印字符:")
print(f" {string.printable[:50]}...")
# 使用常量生成随机字符串
import random
random_str = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
print(f"\n随机字符串: {random_str}")
# 检查字符串类型
test_str = "Hello123"
print(f"\n'{test_str}' 是否全为字母: {test_str.isalpha()}")
print(f"'{test_str}' 是否全为数字: {test_str.isdigit()}")
print(f"'{test_str}' 是否全为字母数字: {test_str.isalnum()}")
string.Template - 字符串模板
string.Template(template)
提供简单的字符串替换功能,使用$作为占位符,比%格式化和str.format()更安全。
示例
import string
# 创建模板
template = string.Template('$name 的年龄是 $age 岁,住在 $city。')
# 替换值
data = {'name': '张三', 'age': 25, 'city': '北京'}
result = template.substitute(data)
print(f"模板替换: {result}")
# 安全替换(缺少值时不会报错)
safe_data = {'name': '李四', 'age': 30}
safe_result = template.safe_substitute(safe_data)
print(f"安全替换: {safe_result}")
# 复杂模板
invoice_template = string.Template('''
========== 发票 ==========
客户: $customer
订单号: $order_id
日期: $date
商品 单价 数量 小计
$item1 $price1 $qty1 $${subtotal1:.2f}
$item2 $price2 $qty2 $${subtotal2:.2f}
----------------------------------
总计: $${total:.2f}
==========================
''')
invoice_data = {
'customer': 'ABC公司',
'order_id': 'ORD-2023-001',
'date': '2023-12-15',
'item1': '笔记本电脑',
'price1': 6999,
'qty1': 2,
'subtotal1': 6999 * 2,
'item2': '鼠标',
'price2': 99,
'qty2': 5,
'subtotal2': 99 * 5,
'total': 6999 * 2 + 99 * 5
}
invoice = invoice_template.safe_substitute(invoice_data)
print(f"\n发票模板:\n{invoice}")
string函数
string模块还提供了一些有用的字符串处理函数。
示例
import string
# capwords: 将字符串中每个单词的首字母大写
text = "hello world! this is python programming."
capitalized = string.capwords(text)
print(f"首字母大写: {capitalized}")
# 自定义翻译表
# maketrans: 创建字符映射转换表
# translate: 根据映射表转换字符串
# 创建转换表
trans_table = str.maketrans('aeiou', '12345')
text2 = "hello world"
translated = text2.translate(trans_table)
print(f"\n字符替换: {text2} -> {translated}")
# 删除特定字符
remove_table = str.maketrans('', '', 'aeiou')
removed = text2.translate(remove_table)
print(f"删除元音: {text2} -> {removed}")
# 凯撒密码
def caesar_cipher(text, shift):
"""简单的凯撒密码"""
alphabet = string.ascii_lowercase
shifted_alphabet = alphabet[shift:] + alphabet[:shift]
table = str.maketrans(alphabet + alphabet.upper(),
shifted_alphabet + shifted_alphabet.upper())
return text.translate(table)
original = "Hello, World!"
encrypted = caesar_cipher(original, 3)
decrypted = caesar_cipher(encrypted, -3)
print(f"\n凯撒密码 (偏移3):")
print(f" 原文: {original}")
print(f" 加密: {encrypted}")
print(f" 解密: {decrypted}")
numpy数值计算库
numpy - 数值计算
NumPy是Python科学计算的基础库,提供高性能的多维数组对象和数组操作工具。
numpy.array() - 创建数组
numpy.array(object, dtype=None, *, copy=True, order='K',
subok=False, ndmin=0, like=None)
创建NumPy数组,这是NumPy中最基本的数据结构。
示例
import numpy as np
# 从列表创建数组
arr1 = np.array([1, 2, 3, 4, 5])
print(f"一维数组: {arr1}")
print(f"形状: {arr1.shape}")
print(f"数据类型: {arr1.dtype}")
# 二维数组
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
print(f"\n二维数组:\n{arr2}")
print(f"形状: {arr2.shape}")
print(f"维度: {arr2.ndim}")
# 指定数据类型
arr3 = np.array([1.5, 2.7, 3.1], dtype=np.int32)
print(f"\n指定类型为int32: {arr3}")
# 特殊数组
zeros = np.zeros((3, 4))
print(f"\n零数组 (3x4):\n{zeros}")
ones = np.ones((2, 3))
print(f"\n1数组 (2x3):\n{ones}")
identity = np.eye(3)
print(f"\n单位矩阵 (3x3):\n{identity}")
range_arr = np.arange(0, 10, 2)
print(f"\n范围数组 (0-10,步长2): {range_arr}")
linspace_arr = np.linspace(0, 1, 5)
print(f"\n线性间隔数组 (0-1,5个点): {linspace_arr}")
数组操作
NumPy数组支持各种数学运算和操作。
示例
import numpy as np
# 创建数组
a = np.array([1, 2, 3, 4])
b = np.array([5, 6, 7, 8])
# 算术运算
print(f"数组a: {a}")
print(f"数组b: {b}")
print(f"\na + b = {a + b}")
print(f"a - b = {a - b}")
print(f"a * b = {a * b}")
print(f"a / b = {a / b}")
print(f"a ** 2 = {a ** 2}")
# 矩阵乘法
matrix1 = np.array([[1, 2], [3, 4]])
matrix2 = np.array([[5, 6], [7, 8]])
print(f"\n矩阵1:\n{matrix1}")
print(f"矩阵2:\n{matrix2}")
print(f"矩阵乘法:\n{np.dot(matrix1, matrix2)}")
# 统计函数
arr = np.array([[1, 2, 3], [4, 5, 6]])
print(f"\n数组:\n{arr}")
print(f"平均值: {np.mean(arr)}")
print(f"全局平均值: {arr.mean()}")
print(f"每列平均值: {arr.mean(axis=0)}")
print(f"每行平均值: {arr.mean(axis=1)}")
print(f"总和: {np.sum(arr)}")
print(f"最大值: {np.max(arr)}")
print(f"最小值: {np.min(arr)}")
print(f"标准差: {np.std(arr):.2f}")
# 数组变形
arr_flat = arr.flatten()
print(f"\n扁平化: {arr_flat}")
arr_reshaped = arr.reshape(3, 2)
print(f"重塑为3x2:\n{arr_reshaped}")
# 数组索引和切片
print(f"\n原始数组:\n{arr}")
print(f"arr[0, 1] = {arr[0, 1]}")
print(f"第一行: {arr[0, :]}")
print(f"第二列: {arr[:, 1]}")
pandas数据分析库
pandas - 数据分析
pandas是Python的数据分析库,提供DataFrame和Series数据结构,用于数据处理和分析。
pandas.DataFrame - 数据框
pandas.DataFrame(data=None, index=None, columns=None,
dtype=None, copy=None)
DataFrame是pandas的主要数据结构,是一个二维标签数组,类似于Excel表格或SQL表。
示例
import pandas as pd
# 从字典创建DataFrame
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'工资': [5000, 7000, 8000, 6500]
}
df = pd.DataFrame(data)
print("DataFrame:")
print(df)
print(f"\n形状: {df.shape}")
print(f"列名: {df.columns.tolist()}")
print(f"索引: {df.index.tolist()}")
# 查看数据
print(f"\n前2行:\n{df.head(2)}")
print(f"\n后2行:\n{df.tail(2)}")
print(f"\n基本信息:\n{df.info()}")
print(f"\n描述性统计:\n{df.describe()}")
# 选择数据
print(f"\n选择单列 - 姓名:\n{df['姓名']}")
print(f"\n选择多列:\n{df[['姓名', '工资']]}")
print(f"\n选择行 (iloc):\n{df.iloc[1:3]}")
print(f"\n条件筛选 (工资>6000):\n{df[df['工资'] > 6000]}")
# 添加新列
df['奖金'] = df['工资'] * 0.1
df['总收入'] = df['工资'] + df['奖金']
print(f"\n添加新列后:\n{df}")
# 分组聚合
grouped = df.groupby('城市')['工资'].agg(['mean', 'sum', 'count'])
print(f"\n按城市分组统计:\n{grouped}")
数据读取与保存
pandas支持多种格式的数据读写,包括CSV、Excel、JSON、SQL等。
示例
import pandas as pd
import numpy as np
import os
# 创建示例数据
data = {
'日期': pd.date_range('2023-01-01', periods=5),
'产品': ['A', 'B', 'A', 'C', 'B'],
'销量': [100, 150, 120, 80, 200],
'单价': [10.5, 20.0, 10.5, 15.0, 20.0]
}
df = pd.DataFrame(data)
df['销售额'] = df['销量'] * df['单价']
print("原始数据:")
print(df)
# 保存到CSV
csv_file = 'sales_data.csv'
df.to_csv(csv_file, index=False, encoding='utf-8')
print(f"\n数据已保存到 {csv_file}")
# 从CSV读取
df_from_csv = pd.read_csv(csv_file)
print(f"\n从CSV读取的数据:\n{df_from_csv.head()}")
# 保存到Excel
excel_file = 'sales_data.xlsx'
df.to_excel(excel_file, index=False, sheet_name='销售数据')
print(f"\n数据已保存到Excel文件 {excel_file}")
# 从Excel读取
df_from_excel = pd.read_excel(excel_file, sheet_name='销售数据')
print(f"\n从Excel读取的数据:\n{df_from_excel.head()}")
# 保存到JSON
json_file = 'sales_data.json'
df.to_json(json_file, orient='records', force_ascii=False)
print(f"\n数据已保存到JSON文件 {json_file}")
# 清理测试文件
for file in [csv_file, excel_file, json_file]:
if os.path.exists(file):
os.remove(file)
print(f"已删除测试文件: {file}")
requests网络请求库
requests - HTTP请求
requests是一个简单易用的HTTP库,用于发送HTTP请求和处理响应。
requests.get() - GET请求
requests.get(url, params=None, **kwargs)
发送HTTP GET请求,获取指定URL的内容。
示例
import requests
# 基本GET请求
url = "https://httpbin.org/get"
response = requests.get(url)
print(f"状态码: {response.status_code}")
print(f"响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
print(f"\n响应内容 (前500字符):")
print(response.text[:500])
# 带参数的GET请求
params = {
'page': 1,
'limit': 10,
'search': 'python'
}
response_with_params = requests.get("https://httpbin.org/get", params=params)
print(f"\n带参数的请求URL: {response_with_params.url}")
# 解析JSON响应
json_response = response_with_params.json()
print(f"\nJSON响应中的参数:")
print(f" page: {json_response['args'].get('page')}")
print(f" limit: {json_response['args'].get('limit')}")
# 设置请求头
headers = {
'User-Agent': 'MyPythonApp/1.0',
'Accept': 'application/json'
}
response_with_headers = requests.get("https://httpbin.org/headers", headers=headers)
print(f"\n自定义请求头:")
print(response_with_headers.json()['headers'])
requests.post() - POST请求
requests.post(url, data=None, json=None, **kwargs)
发送HTTP POST请求,向服务器提交数据。
示例
import requests
# 发送表单数据
url = "https://httpbin.org/post"
form_data = {
'username': 'testuser',
'password': 'testpass123',
'email': 'test@example.com'
}
response = requests.post(url, data=form_data)
print("发送表单数据:")
print(f"状态码: {response.status_code}")
print(f"响应JSON:")
print(response.json()['form'])
# 发送JSON数据
json_data = {
'name': '张三',
'age': 25,
'skills': ['Python', 'JavaScript', 'SQL']
}
json_response = requests.post(url, json=json_data)
print(f"\n发送JSON数据:")
print(json_response.json()['json'])
# 上传文件
files = {'file': ('test.txt', '这是文件内容', 'text/plain')}
file_response = requests.post(url, files=files)
print(f"\n上传文件:")
print(file_response.json()['files'])
# 带认证的请求
auth_response = requests.post(
"https://httpbin.org/basic-auth/user/passwd",
auth=('user', 'passwd')
)
print(f"\n基本认证请求:")
print(f"状态码: {auth_response.status_code}")
print(f"响应: {auth_response.json()}")
BeautifulSoup解析库
BeautifulSoup - HTML/XML解析
BeautifulSoup是一个用于解析HTML和XML文档的Python库,可以方便地提取网页数据。它提供简单易用的API来导航、搜索和修改解析树。
BeautifulSoup() - 创建解析对象
BeautifulSoup(markup, features)
创建一个BeautifulSoup对象,用于解析HTML或XML文档。
markup
- 要解析的HTML/XML字符串或文件对象
features
- 解析器名称,如'html.parser', 'lxml', 'html5lib'
示例
from bs4 import BeautifulSoup
# 解析HTML字符串
html_doc = """
测试页面
欢迎来到Python世界
""" # 创建BeautifulSoup对象 soup = BeautifulSoup(html_doc, 'html.parser') print("文档标题:", soup.title.string) print("整个HTML:") print(soup.prettify()) # 使用不同解析器 # soup_lxml = BeautifulSoup(html_doc, 'lxml') # 需要安装lxml # soup_html5lib = BeautifulSoup(html_doc, 'html5lib') # 需要安装html5lib 查找元素方法
BeautifulSoup提供多种方法查找文档中的元素。
示例
from bs4 import BeautifulSoup
html = """
"""
soup = BeautifulSoup(html, 'html.parser')
# find() - 查找第一个匹配的元素
first_h2 = soup.find('h2')
print(f"第一个h2标签: {first_h2.text}")
# find_all() - 查找所有匹配的元素
all_h2 = soup.find_all('h2')
print("\n所有h2标签:")
for i, h2 in enumerate(all_h2, 1):
print(f" {i}. {h2.text}")
# 通过属性查找
main_title = soup.find(id='main-title')
print(f"\n通过id查找: {main_title.text}")
articles = soup.find_all(class_='article')
print(f"\n找到 {len(articles)} 篇文章")
# 通过CSS选择器查找
authors = soup.select('.author')
print("\n所有作者:")
for author in authors:
print(f" {author.text}")
# 查找特定属性的标签
links = soup.find_all('a', href=True)
print("\n所有链接:")
for link in links:
print(f" 文本: {link.text}, 地址: {link['href']}")
导航文档树
通过父子、兄弟关系在文档树中导航。
示例
from bs4 import BeautifulSoup
html = """
"""
soup = BeautifulSoup(html, 'html.parser')
div_main = soup.find('div', class_='main')
# 获取父元素
parent = div_main.parent
print(f"div.main的父元素: {parent.name}")
# 获取子元素
children = list(div_main.children)
print(f"\ndiv.main有 {len(children)} 个子元素:")
for i, child in enumerate(children):
if child.name:
print(f" {i}. {child.name}")
else:
print(f" {i}. 文本节点")
# 获取所有后代
descendants = list(div_main.descendants)
print(f"\ndiv.main有 {len(descendants)} 个后代元素")
# 获取兄弟元素
first_p = div_main.find('p')
print(f"\n第一个p标签: {first_p.text}")
next_sibling = first_p.find_next_sibling()
print(f"下一个兄弟: {next_sibling.name if next_sibling.name else '文本'}")
# 查找前一个和后一个元素
nested_div = soup.find('div', class_='nested')
previous = nested_div.find_previous_sibling()
next = nested_div.find_next_sibling()
print(f"\n嵌套div的前一个兄弟: {previous.name}")
print(f"嵌套div的后一个兄弟: {next.name}")
提取和修改数据
从元素中提取内容和属性,以及修改文档。
示例
from bs4 import BeautifulSoup
html = """
"""
soup = BeautifulSoup(html, 'html.parser')
product = soup.find('div', class_='product')
# 提取文本
print("产品信息:")
print(f"名称: {product.find('h2').get_text(strip=True)}")
print(f"价格: {product.find(class_='price').get_text(strip=True)}")
# 提取属性
buy_link = product.find('a', class_='buy-btn')
print(f"购买链接: {buy_link['href']}")
print(f"链接文本: {buy_link.text}")
print(f"链接类名: {buy_link.get('class', [])}")
# 提取所有规格
specs = product.find('div', class_='specs')
print("\n规格:")
for spec in specs.find_all('span'):
print(f" {spec.text}")
# 修改内容
product.find('h2').string = "游戏笔记本电脑"
product.find(class_='price').string = "¥7999"
print(f"\n修改后的名称: {product.find('h2').text}")
print(f"修改后的价格: {product.find(class_='price').text}")
# 添加新元素
new_tag = soup.new_tag('p')
new_tag.string = "库存: 10台"
new_tag['class'] = 'stock'
product.append(new_tag)
print(f"\n添加库存信息: {product.find(class_='stock').text}")
# 删除元素
if product.find(class_='description'):
product.find(class_='description').decompose()
print("已删除描述信息")
# 获取修改后的HTML
print(f"\n修改后的HTML:\n{product.prettify()}")
matplotlib可视化库
matplotlib - 数据可视化
matplotlib是Python最流行的绘图库,可以创建各种静态、动态、交互式的图表,包括线图、柱状图、散点图、饼图等。
基本绘图 - plot()
plt.plot(x, y, format_string, **kwargs)
绘制线图,是最常用的绘图函数之一。可以绘制单条或多条线。
示例
import matplotlib.pyplot as plt
import numpy as np
# 设置中文字体(如果需要显示中文)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 创建数据
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
# 创建图形和坐标轴
fig, ax = plt.subplots(figsize=(10, 6))
# 绘制线图
ax.plot(x, y1, 'r-', label='正弦曲线', linewidth=2) # 红色实线
ax.plot(x, y2, 'b--', label='余弦曲线', linewidth=2) # 蓝色虚线
# 设置图表属性
ax.set_title('三角函数曲线', fontsize=16, fontweight='bold')
ax.set_xlabel('X轴', fontsize=12)
ax.set_ylabel('Y轴', fontsize=12)
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
# 设置坐标轴范围
ax.set_xlim(0, 10)
ax.set_ylim(-1.5, 1.5)
# 添加文本标注
ax.text(5, 0.8, '正弦波峰值', fontsize=10,
bbox=dict(boxstyle="round,pad=0.3", facecolor="yellow", alpha=0.5))
# 显示图表
plt.tight_layout()
print("图表已创建,使用plt.show()显示")
# plt.show() # 在实际环境中取消注释
# 保存图表
# plt.savefig('sine_cosine.png', dpi=300, bbox_inches='tight')
# print("图表已保存为 sine_cosine.png")
柱状图 - bar()
plt.bar(x, height, width=0.8, bottom=None, *, align='center', **kwargs)
绘制垂直柱状图,用于比较不同类别的数据。
示例
import matplotlib.pyplot as plt
import numpy as np
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建数据
categories = ['苹果', '香蕉', '橙子', '葡萄', '西瓜']
sales_2022 = [120, 85, 110, 65, 95]
sales_2023 = [135, 90, 125, 70, 110]
x = np.arange(len(categories)) # 类别位置
width = 0.35 # 柱状图宽度
# 创建图形
fig, ax = plt.subplots(figsize=(10, 6))
# 绘制柱状图
bars1 = ax.bar(x - width/2, sales_2022, width, label='2022年',
color='skyblue', edgecolor='black')
bars2 = ax.bar(x + width/2, sales_2023, width, label='2023年',
color='lightcoral', edgecolor='black')
# 设置图表属性
ax.set_title('水果销售额对比', fontsize=16, fontweight='bold')
ax.set_xlabel('水果种类', fontsize=12)
ax.set_ylabel('销售额 (万元)', fontsize=12)
ax.set_xticks(x)
ax.set_xticklabels(categories)
ax.legend()
# 添加数值标签
def autolabel(bars):
"""在柱状图上显示数值"""
for bar in bars:
height = bar.get_height()
ax.annotate(f'{height}',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3), # 偏移量
textcoords="offset points",
ha='center', va='bottom', fontsize=10)
autolabel(bars1)
autolabel(bars2)
# 添加网格
ax.grid(True, axis='y', alpha=0.3)
# 调整布局
plt.tight_layout()
print("柱状图已创建")
# 水平柱状图示例
fig2, ax2 = plt.subplots(figsize=(8, 6))
bars_h = ax2.barh(categories, sales_2023, color='lightgreen', edgecolor='black')
ax2.set_title('2023年水果销售额', fontsize=14)
ax2.set_xlabel('销售额 (万元)')
for bar in bars_h:
width = bar.get_width()
ax2.annotate(f'{width}',
xy=(width, bar.get_y() + bar.get_height()/2),
xytext=(5, 0),
textcoords="offset points",
ha='left', va='center')
print("水平柱状图已创建")
# plt.show()
散点图 - scatter()
plt.scatter(x, y, s=None, c=None, marker=None, **kwargs)
绘制散点图,用于显示两个变量之间的关系,或数据点的分布。
示例
import matplotlib.pyplot as plt
import numpy as np
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建模拟数据
np.random.seed(42) # 设置随机种子以便重现结果
n_points = 100
# 创建三组数据
x1 = np.random.normal(50, 15, n_points)
y1 = np.random.normal(60, 10, n_points)
sizes1 = np.random.uniform(20, 200, n_points)
x2 = np.random.normal(80, 12, n_points)
y2 = np.random.normal(40, 8, n_points)
sizes2 = np.random.uniform(20, 200, n_points)
x3 = np.random.normal(30, 10, n_points)
y3 = np.random.normal(30, 12, n_points)
sizes3 = np.random.uniform(20, 200, n_points)
# 创建图形
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
# 基本散点图
scatter1 = ax1.scatter(x1, y1, s=sizes1, alpha=0.6,
edgecolors='w', linewidth=0.5)
ax1.set_title('基本散点图', fontsize=14)
ax1.set_xlabel('X值')
ax1.set_ylabel('Y值')
ax1.grid(True, alpha=0.3)
# 带颜色映射的散点图
all_x = np.concatenate([x1, x2, x3])
all_y = np.concatenate([y1, y2, y3])
all_sizes = np.concatenate([sizes1, sizes2, sizes3])
categories = np.array(['A']*n_points + ['B']*n_points + ['C']*n_points)
# 为不同类别分配颜色
colors = {'A': 'red', 'B': 'blue', 'C': 'green'}
color_list = [colors[cat] for cat in categories]
scatter2 = ax2.scatter(all_x, all_y, s=all_sizes, c=color_list,
alpha=0.6, edgecolors='w', linewidth=0.5)
ax2.set_title('多类别散点图', fontsize=14)
ax2.set_xlabel('X值')
ax2.set_ylabel('Y值')
ax2.grid(True, alpha=0.3)
# 添加图例
import matplotlib.patches as mpatches
legend_elements = [mpatches.Patch(color='red', alpha=0.6, label='类别A'),
mpatches.Patch(color='blue', alpha=0.6, label='类别B'),
mpatches.Patch(color='green', alpha=0.6, label='类别C')]
ax2.legend(handles=legend_elements, loc='upper right')
# 添加颜色条(colorbar)
# 如果需要连续颜色映射:
# plt.colorbar(scatter2, ax=ax2, label='数值大小')
plt.tight_layout()
print("散点图已创建")
# plt.show()
子图和多图布局
创建多个子图,实现复杂的图表布局。
示例
import matplotlib.pyplot as plt
import numpy as np
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建数据
np.random.seed(42)
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
y3 = np.tan(x) / 10 # 缩小tan值以便显示
# 饼图数据
categories = ['技术', '销售', '市场', '行政', '研发']
sizes = [30, 25, 20, 15, 10]
colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#c2c2f0']
# 创建2x2的子图网格
fig, axs = plt.subplots(2, 2, figsize=(12, 10))
fig.suptitle('多图表展示', fontsize=16, fontweight='bold')
# 子图1: 线图
axs[0, 0].plot(x, y1, 'r-', label='sin(x)')
axs[0, 0].plot(x, y2, 'b--', label='cos(x)')
axs[0, 0].set_title('三角函数线图')
axs[0, 0].set_xlabel('x')
axs[0, 0].set_ylabel('y')
axs[0, 0].legend()
axs[0, 0].grid(True, alpha=0.3)
# 子图2: 柱状图
bar_categories = ['Q1', 'Q2', 'Q3', 'Q4']
bar_values = [120, 135, 150, 130]
bars = axs[0, 1].bar(bar_categories, bar_values,
color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'])
axs[0, 1].set_title('季度销售额')
axs[0, 1].set_ylabel('销售额 (万)')
# 添加数值标签
for bar in bars:
height = bar.get_height()
axs[0, 1].text(bar.get_x() + bar.get_width()/2., height + 3,
f'{height}', ha='center', va='bottom')
# 子图3: 散点图
scatter_x = np.random.rand(50) * 100
scatter_y = np.random.rand(50) * 100
scatter_sizes = np.random.rand(50) * 500
scatter_colors = np.random.rand(50)
scatter = axs[1, 0].scatter(scatter_x, scatter_y, s=scatter_sizes,
c=scatter_colors, alpha=0.6, cmap='viridis')
axs[1, 0].set_title('随机散点图')
axs[1, 0].set_xlabel('X')
axs[1, 0].set_ylabel('Y')
# 添加颜色条
plt.colorbar(scatter, ax=axs[1, 0])
# 子图4: 饼图
wedges, texts, autotexts = axs[1, 1].pie(sizes, labels=categories, colors=colors,
autopct='%1.1f%%', startangle=90,
explode=(0.05, 0, 0, 0, 0))
axs[1, 1].set_title('部门分布饼图')
# 美化文本
for autotext in autotexts:
autotext.set_color('white')
autotext.set_fontsize(10)
for text in texts:
text.set_fontsize(11)
# 调整布局
plt.tight_layout()
print("多图布局已创建")
# plt.show()
# 保存整个图形
# fig.savefig('multiple_plots.png', dpi=300, bbox_inches='tight')
# print("图表已保存为 multiple_plots.png")
flask Web框架
flask - 轻量级Web框架
Flask是一个轻量级的Python Web框架,简单易用,适合快速开发Web应用。它基于Werkzeug WSGI工具包和Jinja2模板引擎。
基本Flask应用
from flask import Flask
app = Flask(__name__)
创建Flask应用实例,这是所有Flask应用的起点。
示例:最简单的Flask应用
# app.py
from flask import Flask
# 创建Flask应用实例
app = Flask(__name__)
# 定义路由和视图函数
@app.route('/')
def home():
return '<h1>欢迎来到Flask世界!</h1><p>这是一个简单的Flask应用。</p>'
@app.route('/hello')
def hello():
return 'Hello, World!'
@app.route('/user/<username>')
def show_user(username):
return f'用户: {username}'
@app.route('/post/<int:post_id>')
def show_post(post_id):
return f'文章ID: {post_id}'
# 运行应用
if __name__ == '__main__':
# 调试模式,代码修改后自动重启
app.run(debug=True, host='0.0.0.0', port=5000)
# 运行命令:
# python app.py
# 然后在浏览器中访问:
# http://localhost:5000/
# http://localhost:5000/hello
# http://localhost:5000/user/张三
# http://localhost:5000/post/123
模板渲染
render_template(template_name, **context)
渲染Jinja2模板,将动态数据传递给HTML模板。
示例:使用模板
# app_with_templates.py
from flask import Flask, render_template, request
from datetime import datetime
app = Flask(__name__)
# 模拟数据库
users = {
'zhangsan': {'name': '张三', 'age': 25, 'city': '北京',
'skills': ['Python', 'Java', 'SQL'], 'join_date': '2022-01-15'},
'lisi': {'name': '李四', 'age': 30, 'city': '上海',
'skills': ['JavaScript', 'HTML/CSS', 'React'], 'join_date': '2021-08-22'}
}
visit_count = 0
@app.route('/')
def index():
global visit_count
visit_count += 1
return render_template('index.html',
current_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
visit_count=visit_count)
@app.route('/user/<username>')
def user_profile(username):
user = users.get(username)
if user:
return render_template('user.html', user=user)
else:
return render_template('404.html'), 404
@app.route('/about')
def about():
return render_template('about.html', title='关于我们')
@app.route('/greet', methods=['POST'])
def greet():
name = request.form.get('name', '访客')
return render_template('greet.html', name=name, title='问候')
if __name__ == '__main__':
app.run(debug=True)
# templates/index.html 示例
"""
<!DOCTYPE html>
<html>
<head>
<title>我的网站</title>
</head>
<body>
<h1>欢迎来到我的网站</h1>
<p>当前时间: {{ current_time }}</p>
<p>访问次数: {{ visit_count }}</p>
<h2>用户信息</h2>
<form action="/greet" method="post">
<input type="text" name="name" placeholder="请输入您的姓名" required>
<button type="submit">提交</button>
</form>
</body>
</html>
"""
表单处理和数据库
处理Web表单和连接数据库,创建完整的CRUD应用。
示例:任务管理应用
# task_app.py
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tasks.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# 定义任务模型
class Task(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
description = db.Column(db.Text)
status = db.Column(db.String(20), default='待办') # 待办, 进行中, 已完成
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<Task {self.title}>'
# 创建数据库表
with app.app_context():
db.create_all()
# 主页 - 显示所有任务
@app.route('/')
def index():
tasks = Task.query.order_by(Task.created_at.desc()).all()
return render_template('tasks/index.html', tasks=tasks)
# 创建新任务
@app.route('/task/new', methods=['GET', 'POST'])
def new_task():
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
status = request.form.get('status', '待办')
task = Task(title=title, description=description, status=status)
db.session.add(task)
db.session.commit()
flash('任务创建成功!', 'success')
return redirect(url_for('index'))
return render_template('tasks/new.html')
# 查看任务详情
@app.route('/task/<int:task_id>')
def view_task(task_id):
task = Task.query.get_or_404(task_id)
return render_template('tasks/view.html', task=task)
# 编辑任务
@app.route('/task/<int:task_id>/edit', methods=['GET', 'POST'])
def edit_task(task_id):
task = Task.query.get_or_404(task_id)
if request.method == 'POST':
task.title = request.form['title']
task.description = request.form['description']
task.status = request.form['status']
db.session.commit()
flash('任务更新成功!', 'success')
return redirect(url_for('view_task', task_id=task.id))
return render_template('tasks/edit.html', task=task)
# 删除任务
@app.route('/task/<int:task_id>/delete', methods=['POST'])
def delete_task(task_id):
task = Task.query.get_or_404(task_id)
db.session.delete(task)
db.session.commit()
flash('任务删除成功!', 'success')
return redirect(url_for('index'))
# 完成任务
@app.route('/task/<int:task_id>/complete', methods=['POST'])
def complete_task(task_id):
task = Task.query.get_or_404(task_id)
task.status = '已完成'
db.session.commit()
flash('任务已完成!', 'success')
return redirect(url_for('index'))
if __name__ == '__main__':
app.run(debug=True)
# templates/tasks/index.html 示例结构
"""
<!DOCTYPE html>
<html>
<head>
<title>任务管理</title>
</head>
<body>
<h1>任务管理</h1>
<a href="{{ url_for('new_task') }}">新建任务</a>
<table>
<thead>
<tr>
<th>标题</th>
<th>状态</th>
<th>创建时间</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{% for task in tasks %}
<tr>
<td>{{ task.title }}</td>
<td>{{ task.status }}</td>
<td>{{ task.created_at.strftime('%Y-%m-%d') }}</td>
<td>
<a href="{{ url_for('view_task', task_id=task.id) }}">查看</a>
<a href="{{ url_for('edit_task', task_id=task.id) }}">编辑</a>
<form action="{{ url_for('delete_task', task_id=task.id) }}" method="POST" style="display: inline;">
<button type="submit" onclick="return confirm('确定删除吗?')">删除</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>
"""
用户认证和会话
实现用户注册、登录、会话管理和权限控制。
示例:用户认证系统
# auth_app.py
from flask import Flask, render_template, request, redirect, url_for, flash, session
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here-change-in-production'
# 模拟用户数据库
users_db = {
'admin': {
'username': 'admin',
'password_hash': generate_password_hash('admin123'),
'email': 'admin@example.com'
}
}
# 登录装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session:
flash('请先登录!', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.route('/')
def index():
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
email = request.form['email']
password = request.form['password']
confirm_password = request.form['confirm_password']
# 验证
if password != confirm_password:
flash('两次输入的密码不一致!', 'danger')
return redirect(url_for('register'))
if username in users_db:
flash('用户名已存在!', 'danger')
return redirect(url_for('register'))
# 创建用户
users_db[username] = {
'username': username,
'password_hash': generate_password_hash(password),
'email': email
}
flash('注册成功! 请登录。', 'success')
return redirect(url_for('login'))
return render_template('auth/register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = users_db.get(username)
if user and check_password_hash(user['password_hash'], password):
session['username'] = user['username']
flash('登录成功!', 'success')
return redirect(url_for('dashboard'))
else:
flash('用户名或密码错误!', 'danger')
return render_template('auth/login.html')
@app.route('/logout')
def logout():
session.clear()
flash('已退出登录!', 'info')
return redirect(url_for('index'))
@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html', username=session['username'])
@app.route('/profile')
@login_required
def profile():
user = users_db.get(session['username'])
return render_template('auth/profile.html', user=user)
# 错误处理
@app.errorhandler(404)
def not_found_error(error):
return render_template('errors/404.html'), 404
if __name__ == '__main__':
app.run(debug=True)
sqlite3数据库库
sqlite3 - 嵌入式数据库
sqlite3是Python内置的SQLite数据库接口,用于操作SQLite数据库文件。SQLite是一个轻量级的嵌入式数据库,不需要独立的服务器进程。
连接和创建数据库
sqlite3.connect(database[, timeout, detect_types, isolation_level, ...])
连接到SQLite数据库文件。如果文件不存在,会自动创建。
示例:数据库连接和基本操作
import sqlite3
import os
from datetime import datetime
# 连接到数据库(如果不存在则创建)
def connect_db(db_name='example.db'):
"""连接到SQLite数据库"""
conn = sqlite3.connect(db_name)
print(f"已连接到数据库: {db_name}")
return conn
def create_tables(conn):
"""创建数据表"""
cursor = conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
# 创建文章表
cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
user_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# 创建评论表
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
post_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (post_id) REFERENCES posts (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
conn.commit()
print("数据表创建完成")
def insert_sample_data(conn):
"""插入示例数据"""
cursor = conn.cursor()
# 插入用户数据
users = [
('张三', 'zhangsan@example.com', 'password123'),
('李四', 'lisi@example.com', 'password456'),
('王五', 'wangwu@example.com', 'password789')
]
cursor.executemany('''
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', users)
# 插入文章数据
posts = [
('Python入门教程', 'Python是一种高级编程语言...', 1),
('Flask Web开发', 'Flask是一个轻量级Web框架...', 1),
('SQLite数据库操作', 'SQLite是一个嵌入式数据库...', 2),
('数据可视化技巧', '使用matplotlib创建图表...', 3)
]
cursor.executemany('''
INSERT INTO posts (title, content, user_id)
VALUES (?, ?, ?)
''', posts)
# 插入评论数据
comments = [
('好文章,学习了!', 1, 2),
('很有帮助,谢谢分享', 1, 3),
('期待更多教程', 2, 2),
('例子很实用', 3, 1),
('内容很详细', 4, 2)
]
cursor.executemany('''
INSERT INTO comments (content, post_id, user_id)
VALUES (?, ?, ?)
''', comments)
conn.commit()
print("示例数据插入完成")
def query_data(conn):
"""查询数据"""
cursor = conn.cursor()
# 查询所有用户
print("所有用户:")
cursor.execute('SELECT id, username, email, created_at FROM users')
users = cursor.fetchall()
for user in users:
print(f" ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}, 注册时间: {user[3]}")
# 查询所有文章
print("\n所有文章:")
cursor.execute('''
SELECT p.id, p.title, u.username, p.created_at
FROM posts p
JOIN users u ON p.user_id = u.id
ORDER BY p.created_at DESC
''')
posts = cursor.fetchall()
for post in posts:
print(f" ID: {post[0]}, 标题: {post[1]}, 作者: {post[2]}, 发布时间: {post[3]}")
# 查询带评论数量的文章
print("\n文章评论统计:")
cursor.execute('''
SELECT p.title, u.username, COUNT(c.id) as comment_count
FROM posts p
JOIN users u ON p.user_id = u.id
LEFT JOIN comments c ON p.id = c.post_id
GROUP BY p.id
ORDER BY comment_count DESC
''')
stats = cursor.fetchall()
for stat in stats:
print(f" 文章: {stat[0]}, 作者: {stat[1]}, 评论数: {stat[2]}")
def update_data(conn):
"""更新数据"""
cursor = conn.cursor()
# 更新用户信息
cursor.execute('''
UPDATE users
SET username = ?
WHERE id = ?
''', ('张老三', 1))
# 更新文章内容
cursor.execute('''
UPDATE posts
SET content = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', ('更新后的Python教程内容...', 1))
conn.commit()
print("数据更新完成")
def delete_data(conn):
"""删除数据"""
cursor = conn.cursor()
# 删除特定评论
cursor.execute('DELETE FROM comments WHERE id = ?', (1,))
# 删除不活跃用户
cursor.execute('DELETE FROM users WHERE is_active = 0')
conn.commit()
print("数据删除完成")
def transaction_example(conn):
"""事务处理示例"""
try:
cursor = conn.cursor()
# 开始事务
cursor.execute('BEGIN TRANSACTION')
# 执行多个操作
cursor.execute('INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
('赵六', 'zhaoliu@example.com', 'password999'))
user_id = cursor.lastrowid
cursor.execute('INSERT INTO posts (title, content, user_id) VALUES (?, ?, ?)',
('新文章标题', '新文章内容...', user_id))
# 提交事务
conn.commit()
print("事务执行成功")
except sqlite3.Error as e:
# 回滚事务
conn.rollback()
print(f"事务执行失败,已回滚: {e}")
def backup_database(conn, backup_name='backup.db'):
"""备份数据库"""
try:
# 使用SQLite的备份API
import sqlite3.backup as backup
# 创建备份数据库连接
backup_conn = sqlite3.connect(backup_name)
# 执行备份
with backup_conn:
conn.backup(backup_conn, pages=1)
backup_conn.close()
print(f"数据库已备份到: {backup_name}")
except Exception as e:
print(f"备份失败: {e}")
# 主程序
def main():
db_name = 'example.db'
# 如果数据库已存在,先删除(仅用于演示)
if os.path.exists(db_name):
os.remove(db_name)
print(f"已删除旧数据库: {db_name}")
# 连接数据库
conn = connect_db(db_name)
try:
# 创建表
create_tables(conn)
# 插入示例数据
insert_sample_data(conn)
# 查询数据
query_data(conn)
# 更新数据
update_data(conn)
# 删除数据
delete_data(conn)
# 事务示例
transaction_example(conn)
# 备份数据库
backup_database(conn)
# 再次查询验证
print("\n最终数据状态:")
query_data(conn)
finally:
# 关闭连接
conn.close()
print(f"\n数据库连接已关闭")
if __name__ == '__main__':
main()
高级查询和参数化查询
使用参数化查询防止SQL注入,执行复杂的SQL查询。
示例:高级查询技巧
import sqlite3
import json
def advanced_queries():
"""高级查询示例"""
conn = sqlite3.connect(':memory:') # 使用内存数据库
conn.row_factory = sqlite3.Row # 使用Row对象,可以通过列名访问
cursor = conn.cursor()
# 创建示例表
cursor.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
department TEXT,
salary REAL,
hire_date DATE,
skills TEXT -- JSON格式存储技能
)
''')
# 插入示例数据
employees = [
(1, '张三', '技术部', 15000, '2020-01-15', '["Python", "Linux", "Docker"]'),
(2, '李四', '市场部', 12000, '2019-05-20', '["Marketing", "Excel", "PPT"]'),
(3, '王五', '技术部', 18000, '2018-03-10', '["Java", "Spring", "MySQL"]'),
(4, '赵六', '销售部', 10000, '2021-08-05', '["Sales", "Communication"]'),
(5, '钱七', '技术部', 16000, '2020-11-30', '["Python", "Flask", "SQLite"]'),
(6, '孙八', '市场部', 13000, '2019-09-15', '["SEO", "Google Analytics"]')
]
cursor.executemany('''
INSERT INTO employees (id, name, department, salary, hire_date, skills)
VALUES (?, ?, ?, ?, ?, ?)
''', employees)
conn.commit()
print("=== 高级查询示例 ===\n")
# 1. 参数化查询(防止SQL注入)
def search_employees_by_department(dept):
"""安全的参数化查询"""
cursor.execute('SELECT * FROM employees WHERE department = ?', (dept,))
return cursor.fetchall()
print("1. 参数化查询 - 技术部员工:")
tech_employees = search_employees_by_department('技术部')
for emp in tech_employees:
print(f" 姓名: {emp['name']}, 薪资: {emp['salary']}")
# 2. 使用IN子句的参数化查询
def search_employees_in_departments(departments):
"""多个值的参数化查询"""
placeholders = ','.join('?' * len(departments))
query = f'SELECT * FROM employees WHERE department IN ({placeholders})'
cursor.execute(query, departments)
return cursor.fetchall()
print("\n2. 多部门查询:")
multi_dept_emps = search_employees_in_departments(['技术部', '市场部'])
for emp in multi_dept_emps:
print(f" 姓名: {emp['name']}, 部门: {emp['department']}")
# 3. 聚合函数和分组
print("\n3. 部门薪资统计:")
cursor.execute('''
SELECT department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MIN(salary) as min_salary,
MAX(salary) as max_salary,
SUM(salary) as total_salary
FROM employees
GROUP BY department
ORDER BY avg_salary DESC
''')
dept_stats = cursor.fetchall()
for stat in dept_stats:
print(f" 部门: {stat['department']:8} 人数: {stat['employee_count']:2} "
f"平均薪资: {stat['avg_salary']:8.2f} 总薪资: {stat['total_salary']:8.2f}")
# 4. 子查询
print("\n4. 高于平均薪资的员工:")
cursor.execute('''
SELECT name, department, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
ORDER BY salary DESC
''')
above_avg = cursor.fetchall()
for emp in above_avg:
print(f" 姓名: {emp['name']}, 部门: {emp['department']}, 薪资: {emp['salary']}")
# 5. 窗口函数(SQLite 3.25.0+)
print("\n5. 部门内薪资排名:")
try:
cursor.execute('''
SELECT name, department, salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank_in_dept,
RANK() OVER (ORDER BY salary DESC) as overall_rank
FROM employees
ORDER BY department, salary DESC
''')
ranked = cursor.fetchall()
for emp in ranked:
print(f" 姓名: {emp['name']:6} 部门: {emp['department']:6} "
f"薪资: {emp['salary']:8.2f} 部门内排名: {emp['rank_in_dept']}")
except sqlite3.OperationalError:
print(" (需要SQLite 3.25.0+版本支持窗口函数)")
# 6. JSON函数处理(SQLite 3.38.0+)
print("\n6. 处理JSON数据:")
try:
cursor.execute('''
SELECT name,
json_extract(skills, '$[0]') as first_skill,
json_array_length(skills) as skill_count
FROM employees
WHERE json_extract(skills, '$') LIKE '%Python%'
''')
python_developers = cursor.fetchall()
for dev in python_developers:
skills_json = json.loads(dev['skills'])
print(f" 姓名: {dev['name']}, 技能数: {dev['skill_count']}, 技能: {', '.join(skills_json)}")
except sqlite3.OperationalError:
print(" (需要SQLite 3.38.0+版本支持JSON函数)")
# 7. 复杂条件查询
print("\n7. 复杂条件组合查询:")
cursor.execute('''
SELECT name, department, salary, hire_date
FROM employees
WHERE (department = '技术部' AND salary > 15000)
OR (department = '市场部' AND salary > 12000)
OR hire_date < '2020-01-01'
ORDER BY department, salary DESC
''')
complex_results = cursor.fetchall()
for result in complex_results:
print(f" 姓名: {result['name']}, 部门: {result['department']}, "
f"薪资: {result['salary']}, 入职: {result['hire_date']}")
# 8. 分页查询
print("\n8. 分页查询(第1页,每页3条):")
page = 1
page_size = 3
offset = (page - 1) * page_size
cursor.execute('''
SELECT name, department, salary
FROM employees
ORDER BY salary DESC
LIMIT ? OFFSET ?
''', (page_size, offset))
page_results = cursor.fetchall()
for i, result in enumerate(page_results, 1):
print(f" {i}. 姓名: {result['name']}, 部门: {result['department']}, 薪资: {result['salary']}")
# 获取总记录数(用于分页)
cursor.execute('SELECT COUNT(*) as total FROM employees')
total = cursor.fetchone()['total']
total_pages = (total + page_size - 1) // page_size
print(f" 总记录数: {total}, 总页数: {total_pages}")
conn.close()
if __name__ == '__main__':
advanced_queries()
实用工具函数
创建实用的数据库操作工具函数。
示例:数据库工具类
import sqlite3
import contextlib
from typing import List, Dict, Any, Optional, Union
from datetime import datetime
import json
class SQLiteDatabase:
"""SQLite数据库工具类"""
def __init__(self, db_path: str):
"""
初始化数据库连接
Args:
db_path: 数据库文件路径,使用':memory:'表示内存数据库
"""
self.db_path = db_path
self.conn = None
def connect(self):
"""连接到数据库"""
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row # 返回字典样式的行
return self.conn
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
self.conn = None
@contextlib.contextmanager
def get_cursor(self):
"""获取游标的上下文管理器"""
if not self.conn:
self.connect()
cursor = self.conn.cursor()
try:
yield cursor
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
finally:
cursor.close()
def execute(self, sql: str, params: tuple = None):
"""执行SQL语句"""
with self.get_cursor() as cursor:
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor
def executemany(self, sql: str, params_list: List[tuple]):
"""执行多条SQL语句"""
with self.get_cursor() as cursor:
cursor.executemany(sql, params_list)
return cursor
def fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict]:
"""获取单条记录"""
cursor = self.execute(sql, params)
row = cursor.fetchone()
return dict(row) if row else None
def fetch_all(self, sql: str, params: tuple = None) -> List[Dict]:
"""获取所有记录"""
cursor = self.execute(sql, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def insert(self, table: str, data: Dict) -> int:
"""
插入数据
Args:
table: 表名
data: 数据字典
Returns:
插入行的ID
"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
sql = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
with self.get_cursor() as cursor:
cursor.execute(sql, tuple(data.values()))
return cursor.lastrowid
def update(self, table: str, data: Dict, where: str, where_params: tuple = None) -> int:
"""
更新数据
Args:
table: 表名
data: 要更新的数据字典
where: WHERE条件
where_params: WHERE条件参数
Returns:
影响的行数
"""
set_clause = ', '.join([f'{k} = ?' for k in data.keys()])
sql = f'UPDATE {table} SET {set_clause} WHERE {where}'
params = tuple(data.values())
if where_params:
params = params + where_params
with self.get_cursor() as cursor:
cursor.execute(sql, params)
return cursor.rowcount
def delete(self, table: str, where: str, params: tuple = None) -> int:
"""
删除数据
Args:
table: 表名
where: WHERE条件
params: 条件参数
Returns:
删除的行数
"""
sql = f'DELETE FROM {table} WHERE {where}'
with self.get_cursor() as cursor:
cursor.execute(sql, params or ())
return cursor.rowcount
def table_exists(self, table_name: str) -> bool:
"""检查表是否存在"""
sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
result = self.fetch_one(sql, (table_name,))
return result is not None
def get_table_info(self, table_name: str) -> List[Dict]:
"""获取表结构信息"""
sql = f"PRAGMA table_info({table_name})"
return self.fetch_all(sql)
def backup(self, backup_path: str):
"""备份数据库"""
import shutil
shutil.copy2(self.db_path, backup_path)
print(f"数据库已备份到: {backup_path}")
def export_to_json(self, table: str, output_file: str):
"""导出表数据到JSON文件"""
data = self.fetch_all(f'SELECT * FROM {table}')
# 处理Row对象和日期时间
def serialize(obj):
if isinstance(obj, datetime):
return obj.isoformat()
return str(obj)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, default=serialize, ensure_ascii=False, indent=2)
print(f"表 {table} 数据已导出到: {output_file}")
def import_from_json(self, table: str, input_file: str):
"""从JSON文件导入数据"""
with open(input_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if not data:
print("没有数据可导入")
return
# 获取列名(使用第一条数据)
columns = list(data[0].keys())
placeholders = ', '.join(['?'] * len(columns))
sql = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})'
# 准备数据
rows = [tuple(row[col] for col in columns) for row in data]
with self.get_cursor() as cursor:
cursor.executemany(sql, rows)
print(f"已导入 {len(rows)} 条数据到表 {table}")
# 使用示例
def demo_database_tool():
"""演示数据库工具类的使用"""
# 创建数据库实例
db = SQLiteDatabase(':memory:')
try:
# 创建表
db.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
db.execute('''
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
print("1. 插入数据:")
# 插入用户
user_data = {
'username': '张三',
'email': 'zhangsan@example.com',
'age': 25
}
user_id = db.insert('users', user_data)
print(f" 插入用户,ID: {user_id}")
# 插入文章
post_data = {
'title': 'Python数据库操作',
'content': 'SQLite是Python内置的数据库...',
'user_id': user_id
}
post_id = db.insert('posts', post_data)
print(f" 插入文章,ID: {post_id}")
print("\n2. 查询数据:")
# 查询用户
user = db.fetch_one('SELECT * FROM users WHERE id = ?', (user_id,))
print(f" 用户信息: {user}")
# 查询所有用户
users = db.fetch_all('SELECT * FROM users ORDER BY created_at DESC')
print(f" 所有用户: {len(users)} 条记录")
print("\n3. 更新数据:")
updated = db.update('users', {'age': 26}, 'id = ?', (user_id,))
print(f" 更新了 {updated} 条记录")
# 验证更新
updated_user = db.fetch_one('SELECT * FROM users WHERE id = ?', (user_id,))
print(f" 更新后的年龄: {updated_user['age']}")
print("\n4. 删除数据:")
deleted = db.delete('posts', 'id = ?', (post_id,))
print(f" 删除了 {deleted} 条记录")
print("\n5. 表信息:")
table_info = db.get_table_info('users')
print(f" users表结构:")
for col in table_info:
print(f" 字段: {col['name']}, 类型: {col['type']}, 主键: {col['pk']}")
print("\n6. 批量插入:")
# 批量插入用户
users_batch = [
('李四', 'lisi@example.com', 30),
('王五', 'wangwu@example.com', 28),
('赵六', 'zhaoliu@example.com', 35)
]
db.executemany(
'INSERT INTO users (username, email, age) VALUES (?, ?, ?)',
users_batch
)
# 验证批量插入
all_users = db.fetch_all('SELECT username, age FROM users')
print(f" 所有用户列表:")
for u in all_users:
print(f" {u['username']} - {u['age']}岁")
print("\n7. 复杂查询(连接查询):")
# 重新插入文章数据用于演示连接查询
db.execute("INSERT INTO posts (title, content, user_id) VALUES ('测试文章', '内容...', ?)", (user_id,))
query = '''
SELECT u.username, p.title, p.content
FROM users u
JOIN posts p ON u.id = p.user_id
ORDER BY u.username
'''
results = db.fetch_all(query)
print(f" 用户文章列表:")
for r in results:
print(f" 用户: {r['username']}, 文章: {r['title']}")
finally:
db.close()
if __name__ == '__main__':
demo_database_tool()