Python内置函数

内置函数 (Built-in Functions)
Python内置函数是Python解释器自带的函数,无需导入任何模块即可使用。这些函数提供了基本的数据处理和操作功能。
print() - 输出函数
print(*objects, sep=' ', end='\n', file=sys.stdout, flush=False)
将对象打印到文本流或控制台。可以设置分隔符、结束符、输出位置等参数。
objects - 要打印的对象,可以是多个
sep - 对象之间的分隔符,默认为空格
end - 打印结束后的字符,默认为换行符
示例
# 基本用法 print("Hello, World!") # 打印多个值 name = "Alice" age = 25 print("Name:", name, "Age:", age) # 自定义分隔符和结束符 print("Python", "is", "awesome", sep="-", end="!\n")
len() - 长度函数
len(s)
返回对象的长度(元素个数)。适用于字符串、列表、元组、字典、集合等。
示例
# 字符串长度 text = "Python" print(len(text)) # 输出: 6 # 列表长度 numbers = [1, 2, 3, 4, 5] print(len(numbers)) # 输出: 5 # 字典长度 person = {"name": "Alice", "age": 25, "city": "Beijing"} print(len(person)) # 输出: 3
type() - 类型函数
type(object)
返回对象的类型。通常用于调试和类型检查。
示例
# 检查各种类型 print(type(10)) # <class 'int'> print(type(3.14)) # <class 'float'> print(type("Hello")) # <class 'str'> print(type([1, 2, 3])) # <class 'list'> print(type((1, 2, 3))) # <class 'tuple'> print(type({"a": 1})) # <class 'dict'> print(type({1, 2, 3})) # <class 'set'>
range() - 范围函数
range(stop) range(start, stop[, step])
生成一个不可变的数字序列,常用于循环中。
start - 序列起始值(包含),默认为0
stop - 序列结束值(不包含)
step - 步长,默认为1
示例
# 生成0-4的序列 print(list(range(5))) # [0, 1, 2, 3, 4] # 生成2-6的序列 print(list(range(2, 7))) # [2, 3, 4, 5, 6] # 生成1-10的奇数序列 print(list(range(1, 11, 2))) # [1, 3, 5, 7, 9] # 在循环中使用 for i in range(3): print(f"循环第 {i+1} 次")

math数学库

math - 数学函数库
math模块提供了数学运算函数,包括三角函数、对数函数、幂函数、常数等。
math.pow() - 幂运算
math.pow(x, y)
返回x的y次幂。注意:返回值为浮点数。
示例
import math # 计算2的3次方 result = math.pow(2, 3) print(f"2的3次方: {result}") # 8.0 # 计算平方根 sqrt_result = math.pow(16, 0.5) print(f"16的平方根: {sqrt_result}") # 4.0 # 使用**运算符(返回整数) print(f"使用**运算符: {2 ** 3}") # 8
math.sqrt() - 平方根
math.sqrt(x)
返回x的平方根。x必须为非负数。
示例
import math # 计算平方根 print(f"16的平方根: {math.sqrt(16)}") # 4.0 print(f"2的平方根: {math.sqrt(2)}") # 1.4142135623730951 # 计算直角三角形的斜边 a = 3 b = 4 c = math.sqrt(a**2 + b**2) print(f"直角边为{a}和{b}的斜边: {c}") # 5.0
math.ceil()和math.floor() - 取整函数
math.ceil(x) # 向上取整 math.floor(x) # 向下取整
ceil()返回大于或等于x的最小整数,floor()返回小于或等于x的最大整数。
示例
import math x = 3.7 print(f"原数: {x}") print(f"向上取整: {math.ceil(x)}") # 4 print(f"向下取整: {math.floor(x)}") # 3 y = -2.3 print(f"\n原数: {y}") print(f"向上取整: {math.ceil(y)}") # -2 print(f"向下取整: {math.floor(y)}") # -3
math.pi和math.e - 数学常数
math.pi # 圆周率π ≈ 3.141592653589793 math.e # 自然常数e ≈ 2.718281828459045
提供数学中常用的常数。
示例
import math # 计算圆的面积和周长 radius = 5 area = math.pi * radius ** 2 circumference = 2 * math.pi * radius print(f"半径为{radius}的圆:") print(f"面积: {area:.2f}") print(f"周长: {circumference:.2f}") # 使用自然常数e print(f"\n自然常数e的值: {math.e}") print(f"e的平方: {math.e ** 2:.2f}")

os系统库

os - 操作系统接口
os模块提供了与操作系统交互的函数,包括文件和目录操作、环境变量、进程管理等。
os.listdir() - 列出目录内容
os.listdir(path='.')
返回指定路径下的文件和目录列表。
示例
import os # 列出当前目录内容 print("当前目录内容:") for item in os.listdir('.'): print(f" {item}") # 列出指定目录内容(需要目录存在) try: print("\n家目录内容:") for item in os.listdir(os.path.expanduser('~'))[:5]: # 只显示前5个 print(f" {item}") except FileNotFoundError: print("目录不存在")
os.path.join() - 路径拼接
os.path.join(path1[, path2[, ...]])
智能地拼接一个或多个路径组件,使用正确的路径分隔符。
示例
import os # 拼接路径 path1 = "home" path2 = "user" path3 = "documents" path4 = "file.txt" full_path = os.path.join(path1, path2, path3, path4) print(f"拼接后的路径: {full_path}") # 在Windows上输出: home\user\documents\file.txt # 在Linux/Mac上输出: home/user/documents/file.txt # 实际应用:创建文件路径 base_dir = "data" filename = "config.json" config_path = os.path.join(base_dir, filename) print(f"配置文件路径: {config_path}")
os.path.exists() - 检查路径是否存在
os.path.exists(path)
检查指定路径是否存在(文件或目录)。
示例
import os # 检查当前目录是否存在 print(f"当前目录存在: {os.path.exists('.')}") # 检查文件是否存在 test_file = "test.txt" if os.path.exists(test_file): print(f"文件 '{test_file}' 存在") else: print(f"文件 '{test_file}' 不存在") # 创建文件后检查 with open(test_file, 'w') as f: f.write("测试文件") print(f"创建后文件存在: {os.path.exists(test_file)}") # 清理 os.remove(test_file)
os.environ - 环境变量
os.environ
包含环境变量的字典。可以读取和设置环境变量。
示例
import os # 获取环境变量 print("当前用户:", os.environ.get('USER', os.environ.get('USERNAME', '未知'))) print("家目录:", os.environ.get('HOME', os.environ.get('USERPROFILE', '未知'))) # 获取PATH环境变量 path = os.environ.get('PATH', '') path_dirs = path.split(os.pathsep) print(f"\nPATH包含 {len(path_dirs)} 个目录") print("前5个目录:") for i, dir in enumerate(path_dirs[:5]): print(f" {i+1}. {dir}") # 设置环境变量(仅当前进程有效) os.environ['MY_VAR'] = 'my_value' print(f"\n自定义环境变量: MY_VAR = {os.environ.get('MY_VAR')}")

datetime时间库

datetime - 日期时间处理
datetime模块提供了日期和时间处理的类,包括date、time、datetime、timedelta等。
datetime.datetime.now() - 当前时间
datetime.datetime.now([tz])
返回当前本地日期和时间。可以指定时区。
示例
from datetime import datetime # 获取当前时间 now = datetime.now() print(f"当前时间: {now}") print(f"年份: {now.year}") print(f"月份: {now.month}") print(f"日期: {now.day}") print(f"小时: {now.hour}") print(f"分钟: {now.minute}") print(f"秒: {now.second}") print(f"微秒: {now.microsecond}") # 格式化输出 formatted = now.strftime("%Y-%m-%d %H:%M:%S") print(f"格式化时间: {formatted}") # 获取时间戳 timestamp = now.timestamp() print(f"时间戳: {timestamp}")
datetime.timedelta - 时间间隔
datetime.timedelta([days[, seconds[, microseconds[, milliseconds[, minutes[, hours[, weeks]]]]]]])
表示两个日期或时间之间的差。可用于日期时间的加减运算。
示例
from datetime import datetime, timedelta now = datetime.now() print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}") # 一天后 one_day_later = now + timedelta(days=1) print(f"一天后: {one_day_later.strftime('%Y-%m-%d %H:%M:%S')}") # 一周前 one_week_ago = now - timedelta(weeks=1) print(f"一周前: {one_week_ago.strftime('%Y-%m-%d %H:%M:%S')}") # 2小时30分钟后 two_hours_later = now + timedelta(hours=2, minutes=30) print(f"2小时30分钟后: {two_hours_later.strftime('%Y-%m-%d %H:%M:%S')}") # 计算时间差 date1 = datetime(2023, 1, 1) date2 = datetime(2023, 12, 31) difference = date2 - date1 print(f"\n{date2.date()} 和 {date1.date()} 相差 {difference.days} 天")
datetime.date - 日期处理
datetime.date(year, month, day)
表示日期(不包含时间)。提供日期的各种操作。
示例
from datetime import date # 创建日期 today = date.today() print(f"今天: {today}") print(f"年: {today.year}, 月: {today.month}, 日: {today.day}") # 创建特定日期 christmas = date(2023, 12, 25) print(f"圣诞节: {christmas}") # 计算星期几 (0=周一, 6=周日) weekday = today.weekday() weekdays = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] print(f"今天是: {weekdays[weekday]}") # 日期运算 new_year = date(2024, 1, 1) days_to_new_year = (new_year - today).days print(f"距离2024年元旦还有 {days_to_new_year} 天")

random随机库

random - 随机数生成
random模块提供了生成随机数的函数,包括整数、浮点数、序列选择等。
random.random() - 随机浮点数
random.random()
返回[0.0, 1.0)范围内的随机浮点数。
示例
import random # 生成随机浮点数 print("5个随机浮点数:") for i in range(5): print(f" {random.random():.4f}") # 生成指定范围的随机浮点数 def random_float_range(min_val, max_val): return min_val + random.random() * (max_val - min_val) print("\n10到20之间的随机浮点数:") for i in range(3): print(f" {random_float_range(10, 20):.2f}") # 模拟概率 trials = 10000 count = sum(1 for _ in range(trials) if random.random() < 0.3) probability = count / trials print(f"\n模拟概率 0.3: {probability:.4f} (试验次数: {trials})")
random.randint() - 随机整数
random.randint(a, b)
返回[a, b]范围内的随机整数,包括两端。
示例
import random # 生成1到6的随机整数(模拟骰子) print("掷骰子10次:") for i in range(10): dice = random.randint(1, 6) print(f" 第{i+1}次: {dice}") # 生成随机年龄 min_age = 18 max_age = 60 print(f"\n随机生成年龄 ({min_age}-{max_age}岁):") for i in range(5): age = random.randint(min_age, max_age) print(f" 第{i+1}个: {age}岁") # 抽奖程序 participants = ["Alice", "Bob", "Charlie", "David", "Eva"] winner_index = random.randint(0, len(participants) - 1) print(f"\n抽奖结果: 恭喜 {participants[winner_index]} 中奖!")
random.choice() - 随机选择
random.choice(seq)
从非空序列中随机返回一个元素。
示例
import random # 随机选择颜色 colors = ["红色", "蓝色", "绿色", "黄色", "紫色"] print("随机颜色选择:") for i in range(5): color = random.choice(colors) print(f" 第{i+1}次: {color}") # 随机选择幸运数字 lucky_numbers = [7, 8, 9, 13, 21, 42] print(f"\n幸运数字: {random.choice(lucky_numbers)}") # 随机选择姓名 names = ["张三", "李四", "王五", "赵六", "钱七"] selected = random.choice(names) print(f"随机选择的姓名: {selected}") # 随机选择多个不重复元素 print("\n随机选择3个不重复的幸运儿:") winners = random.sample(names, 3) for i, winner in enumerate(winners): print(f" 第{i+1}名: {winner}")
random.shuffle() - 随机打乱
random.shuffle(x[, random])
将序列x随机打乱(原地操作)。
示例
import random # 打乱列表 cards = ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"] print("原始扑克牌顺序:") print(" ", cards) random.shuffle(cards) print("\n洗牌后的顺序:") print(" ", cards) # 打乱数字列表 numbers = list(range(1, 11)) print(f"\n原始数字: {numbers}") random.shuffle(numbers) print(f"打乱后: {numbers}") # 模拟抽奖顺序 participants = ["选手A", "选手B", "选手C", "选手D", "选手E"] print("\n抽奖顺序:") for i, participant in enumerate(participants): print(f" 第{i+1}位: {participant}")

JSON数据处理

json - JSON编码解码
json模块提供了JSON数据的编码和解码功能,用于Python对象和JSON字符串之间的转换。
json.dumps() - JSON编码
json.dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw)
将Python对象编码成JSON格式字符串。支持多种参数控制输出格式。
obj - 要编码的Python对象
indent - 缩进空格数,美化输出
ensure_ascii - 是否确保ASCII编码,False可输出中文
示例
import json # Python字典 data = { "name": "张三", "age": 25, "city": "北京", "skills": ["Python", "JavaScript", "SQL"], "married": False } # 转换为JSON字符串 json_str = json.dumps(data, ensure_ascii=False, indent=2) print("JSON字符串:") print(json_str) # 紧凑格式 compact_json = json.dumps(data, ensure_ascii=False, separators=(',', ':')) print("\n紧凑格式:") print(compact_json)
json.loads() - JSON解码
json.loads(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)
将JSON格式字符串解码为Python对象。
示例
import json # JSON字符串 json_str = '{"name": "李四", "age": 30, "city": "上海"}' # 解析为Python对象 data = json.loads(json_str) print("解析后的Python对象:") print(f"类型: {type(data)}") print(f"姓名: {data['name']}") print(f"年龄: {data['age']}") print(f"城市: {data['city']}") # 从文件读取JSON json_from_file = ''' { "students": [ {"name": "张三", "score": 85}, {"name": "李四", "score": 92}, {"name": "王五", "score": 78} ] } ''' students_data = json.loads(json_from_file) print("\n学生数据:") for student in students_data['students']: print(f" {student['name']}: {student['score']}分")
json.dump() - 写入JSON文件
json.dump(obj, fp, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw)
将Python对象序列化为JSON格式并写入文件。
示例
import json import os # 要保存的数据 config = { "app_name": "数据管理系统", "version": "1.0.0", "database": { "host": "localhost", "port": 3306, "username": "admin" }, "features": ["用户管理", "数据导入", "报表生成"] } # 写入JSON文件 filename = "config.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=2) print(f"配置文件已保存到 {filename}") # 验证文件内容 if os.path.exists(filename): with open(filename, 'r', encoding='utf-8') as f: content = f.read() print(f"\n文件内容预览:\n{content[:200]}...") # 清理测试文件 os.remove(filename) print(f"\n测试文件 {filename} 已清理")
json.load() - 读取JSON文件
json.load(fp, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)
从文件读取JSON数据并解析为Python对象。
示例
import json import os # 创建测试JSON文件 test_data = { "product": "笔记本电脑", "price": 6999.99, "in_stock": True, "specs": { "cpu": "Intel i7", "ram": "16GB", "storage": "512GB SSD" } } filename = "product.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(test_data, f, ensure_ascii=False, indent=2) # 从文件读取JSON with open(filename, 'r', encoding='utf-8') as f: loaded_data = json.load(f) print("从文件加载的数据:") print(f"产品: {loaded_data['product']}") print(f"价格: ¥{loaded_data['price']}") print(f"库存: {'有货' if loaded_data['in_stock'] else '缺货'}") print("规格:") for key, value in loaded_data['specs'].items(): print(f" {key}: {value}") # 清理 os.remove(filename) print(f"\n测试文件 {filename} 已清理")

re正则表达式库

re - 正则表达式操作
re模块提供了正则表达式匹配操作,用于字符串的模式匹配、查找和替换。
re.search() - 搜索匹配
re.search(pattern, string, flags=0)
扫描整个字符串,返回第一个匹配的Match对象。如果没有匹配则返回None。
pattern - 正则表达式模式
string - 要搜索的字符串
示例
import re # 搜索邮箱地址 text = "联系我: email@example.com 或 support@company.cn" pattern = r'\b[\w\.-]+@[\w\.-]+\.\w+\b' match = re.search(pattern, text) if match: print(f"找到邮箱: {match.group()}") print(f"匹配位置: {match.start()} - {match.end()}") else: print("未找到邮箱") # 搜索电话号码 phone_text = "电话: 138-1234-5678 或 010-8765-4321" phone_pattern = r'\d{3,4}-\d{4}-\d{4}' phone_match = re.search(phone_pattern, phone_text) if phone_match: print(f"\n找到电话: {phone_match.group()}") # 使用分组 date_text = "今天是2023-12-15,明天是2023-12-16" date_pattern = r'(\d{4})-(\d{2})-(\d{2})' date_match = re.search(date_pattern, date_text) if date_match: print(f"\n找到日期: {date_match.group()}") print(f"年: {date_match.group(1)}") print(f"月: {date_match.group(2)}") print(f"日: {date_match.group(3)}")
re.findall() - 查找所有匹配
re.findall(pattern, string, flags=0)
返回字符串中所有非重叠匹配的列表。如果模式中有分组,返回分组元组的列表。
示例
import re # 查找所有数字 text = "Python 3.10 发布于 2021年10月4日,Python 3.11 发布于 2022年10月24日" numbers = re.findall(r'\d+\.?\d*', text) print(f"所有数字: {numbers}") # 查找所有单词 words = re.findall(r'\b\w+\b', text) print(f"\n所有单词: {words}") # 查找所有版本号 versions = re.findall(r'Python (\d+\.\d+)', text) print(f"Python版本: {versions}") # 查找日期 dates = re.findall(r'(\d{4})年(\d{1,2})月(\d{1,2})日', text) print(f"\n所有日期:") for year, month, day in dates: print(f" {year}-{month.zfill(2)}-{day.zfill(2)}") # 查找金额 money_text = "价格: $19.99, ¥100.50, €25.00, 免费" currencies = re.findall(r'[\$\¥€]\d+\.?\d*', money_text) print(f"\n所有金额: {currencies}")
re.sub() - 替换匹配
re.sub(pattern, repl, string, count=0, flags=0)
使用repl替换string中所有匹配pattern的子串,返回替换后的字符串。
示例
import re # 替换敏感信息 text = "用户电话: 138-1234-5678,身份证: 110101199001011234" # 隐藏电话号码中间4位 hidden_phone = re.sub(r'(\d{3})-(\d{4})-(\d{4})', r'\1-****-\3', text) # 隐藏身份证后4位 hidden_id = re.sub(r'(\d{14})(\d{4})', r'\1****', hidden_phone) print("脱敏后文本:") print(hidden_id) # 格式化日期 date_text = "日期: 20231215,另一个日期: 20230101" formatted = re.sub(r'(\d{4})(\d{2})(\d{2})', r'\1-\2-\3', date_text) print(f"\n格式化日期: {formatted}") # 移除HTML标签 html = "

标题

段落内容

" clean_text = re.sub(r'<[^>]+>', '', html) print(f"\n去除HTML标签: {clean_text}") # 使用函数进行替换 def double_numbers(match): num = int(match.group()) return str(num * 2) text_with_numbers = "数字: 1, 2, 3, 4, 5" doubled = re.sub(r'\d+', double_numbers, text_with_numbers) print(f"\n数字加倍: {doubled}")
re.split() - 分割字符串
re.split(pattern, string, maxsplit=0, flags=0)
使用正则表达式模式分割字符串,返回分割后的列表。
示例
import re # 按多种分隔符分割 text = "apple,banana;orange grape|melon" result = re.split(r'[,;| ]', text) print(f"分割结果: {result}") # 按数字分割 text2 = "Python3.10发布于2021年,Python3.11发布于2022年" result2 = re.split(r'\d+\.?\d*', text2) print(f"\n按数字分割: {result2}") # 保留分隔符 text3 = "100+200-300*400/500" result3 = re.split(r'([+\-*/])', text3) print(f"\n包含分隔符的分割: {result3}") # 复杂的日志分割 log_line = "2023-12-15 14:30:25 INFO [MainThread] User login successful" log_parts = re.split(r'\s+', log_line) print(f"\n日志分割:") for i, part in enumerate(log_parts): print(f" {i}: {part}")

collections容器库

collections - 容器数据类型
collections模块提供了额外的容器数据类型,扩展了Python的内置容器。
Counter - 计数器
collections.Counter([iterable-or-mapping])
Counter是一个字典子类,用于计数可哈希对象。它是一个无序集合,元素存储为字典键,计数存储为字典值。
示例
from collections import Counter # 统计列表元素 words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple'] word_count = Counter(words) print(f"单词统计: {word_count}") print(f"apple出现次数: {word_count['apple']}") print(f"最常见的2个: {word_count.most_common(2)}") # 统计字符串字符 text = "abracadabra" char_count = Counter(text) print(f"\n字符统计: {char_count}") print(f"a出现次数: {char_count['a']}") # Counter运算 c1 = Counter(a=3, b=2, c=1) c2 = Counter(a=1, b=2, c=3) print(f"\nCounter1: {c1}") print(f"Counter2: {c2}") print(f"相加: {c1 + c2}") print(f"相减: {c1 - c2}") print(f"交集: {c1 & c2}") print(f"并集: {c1 | c2}") # 统计句子词频 sentence = "this is a test this is only a test" words = sentence.split() word_counter = Counter(words) print(f"\n句子词频: {word_counter}")
defaultdict - 默认字典
collections.defaultdict(default_factory)
返回一个新的类字典对象。defaultdict是内置dict类的子类,它重写了__missing__方法,为不存在的键提供默认值。
示例
from collections import defaultdict # 默认值为列表 list_dict = defaultdict(list) list_dict['fruits'].append('apple') list_dict['fruits'].append('banana') list_dict['vegetables'].append('carrot') print(f"列表字典: {dict(list_dict)}") # 默认值为整数 int_dict = defaultdict(int) words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple'] for word in words: int_dict[word] += 1 print(f"\n词频统计: {dict(int_dict)}") # 默认值为集合 set_dict = defaultdict(set) pairs = [('a', 1), ('a', 2), ('b', 2), ('b', 3), ('c', 1)] for key, value in pairs: set_dict[key].add(value) print(f"\n集合字典: {dict(set_dict)}") # 嵌套defaultdict nested_dict = lambda: defaultdict(int) dd = defaultdict(nested_dict) dd['group1']['item1'] = 10 dd['group1']['item2'] = 20 dd['group2']['item1'] = 30 print(f"\n嵌套字典: {dict(dd['group1'])}") print(f"访问不存在的键: {dd['group3']['new_item']}") # 返回0
deque - 双端队列
collections.deque([iterable[, maxlen]])
返回一个双向队列对象,支持从两端高效地添加和弹出元素,时间复杂度为O(1)。
示例
from collections import deque # 创建deque d = deque(['b', 'c', 'd']) print(f"初始deque: {d}") # 添加元素 d.append('e') # 右端添加 d.appendleft('a') # 左端添加 print(f"添加后: {d}") # 弹出元素 right_item = d.pop() # 右端弹出 left_item = d.popleft() # 左端弹出 print(f"右端弹出: {right_item}, 左端弹出: {left_item}") print(f"弹出后: {d}") # 旋转 d.rotate(1) # 向右旋转1位 print(f"向右旋转: {d}") d.rotate(-2) # 向左旋转2位 print(f"向左旋转2位: {d}") # 有限长度deque limited_d = deque(maxlen=3) for i in range(5): limited_d.append(i) print(f"添加{i}后: {limited_d}")
OrderedDict - 有序字典
collections.OrderedDict([items])
返回一个字典子类,记住元素插入的顺序。在Python 3.7+中,普通dict也保持插入顺序,但OrderedDict有额外的方法。
示例
from collections import OrderedDict # 创建有序字典 od = OrderedDict() od['z'] = 1 od['y'] = 2 od['x'] = 3 print(f"有序字典: {od}") print(f"键顺序: {list(od.keys())}") # 移动元素到最后 od.move_to_end('z') print(f"\n移动z到最后: {list(od.keys())}") # 移动元素到最前 od.move_to_end('x', last=False) print(f"移动x到最前: {list(od.keys())}") # 保留最后N个元素 def keep_last_n(d, n): """保留最后n个元素""" while len(d) > n: d.popitem(last=False) od2 = OrderedDict([(i, f"item{i}") for i in range(10)]) print(f"\n原始字典: {list(od2.keys())}") keep_last_n(od2, 5) print(f"保留最后5个: {list(od2.keys())}") # 合并有序字典 od3 = OrderedDict([('a', 1), ('b', 2)]) od4 = OrderedDict([('c', 3), ('d', 4)]) od3.update(od4) print(f"\n合并后: {list(od3.keys())}")

string字符串库

string - 字符串操作
string模块提供了字符串常量和模板类,用于常见的字符串操作。
string常量
string模块定义了多个有用的字符串常量。
示例
import string print("ASCII小写字母:") print(f" {string.ascii_lowercase}") print("\nASCII大写字母:") print(f" {string.ascii_uppercase}") print("\nASCII字母:") print(f" {string.ascii_letters}") print("\n数字:") print(f" {string.digits}") print("\n十六进制数字:") print(f" {string.hexdigits}") print("\n八进制数字:") print(f" {string.octdigits}") print("\n标点符号:") print(f" {string.punctuation}") print("\n空白字符:") print(f" {repr(string.whitespace)}") print("\n可打印字符:") print(f" {string.printable[:50]}...") # 使用常量生成随机字符串 import random random_str = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) print(f"\n随机字符串: {random_str}") # 检查字符串类型 test_str = "Hello123" print(f"\n'{test_str}' 是否全为字母: {test_str.isalpha()}") print(f"'{test_str}' 是否全为数字: {test_str.isdigit()}") print(f"'{test_str}' 是否全为字母数字: {test_str.isalnum()}")
string.Template - 字符串模板
string.Template(template)
提供简单的字符串替换功能,使用$作为占位符,比%格式化和str.format()更安全。
示例
import string # 创建模板 template = string.Template('$name 的年龄是 $age 岁,住在 $city。') # 替换值 data = {'name': '张三', 'age': 25, 'city': '北京'} result = template.substitute(data) print(f"模板替换: {result}") # 安全替换(缺少值时不会报错) safe_data = {'name': '李四', 'age': 30} safe_result = template.safe_substitute(safe_data) print(f"安全替换: {safe_result}") # 复杂模板 invoice_template = string.Template(''' ========== 发票 ========== 客户: $customer 订单号: $order_id 日期: $date 商品 单价 数量 小计 $item1 $price1 $qty1 $${subtotal1:.2f} $item2 $price2 $qty2 $${subtotal2:.2f} ---------------------------------- 总计: $${total:.2f} ========================== ''') invoice_data = { 'customer': 'ABC公司', 'order_id': 'ORD-2023-001', 'date': '2023-12-15', 'item1': '笔记本电脑', 'price1': 6999, 'qty1': 2, 'subtotal1': 6999 * 2, 'item2': '鼠标', 'price2': 99, 'qty2': 5, 'subtotal2': 99 * 5, 'total': 6999 * 2 + 99 * 5 } invoice = invoice_template.safe_substitute(invoice_data) print(f"\n发票模板:\n{invoice}")
string函数
string模块还提供了一些有用的字符串处理函数。
示例
import string # capwords: 将字符串中每个单词的首字母大写 text = "hello world! this is python programming." capitalized = string.capwords(text) print(f"首字母大写: {capitalized}") # 自定义翻译表 # maketrans: 创建字符映射转换表 # translate: 根据映射表转换字符串 # 创建转换表 trans_table = str.maketrans('aeiou', '12345') text2 = "hello world" translated = text2.translate(trans_table) print(f"\n字符替换: {text2} -> {translated}") # 删除特定字符 remove_table = str.maketrans('', '', 'aeiou') removed = text2.translate(remove_table) print(f"删除元音: {text2} -> {removed}") # 凯撒密码 def caesar_cipher(text, shift): """简单的凯撒密码""" alphabet = string.ascii_lowercase shifted_alphabet = alphabet[shift:] + alphabet[:shift] table = str.maketrans(alphabet + alphabet.upper(), shifted_alphabet + shifted_alphabet.upper()) return text.translate(table) original = "Hello, World!" encrypted = caesar_cipher(original, 3) decrypted = caesar_cipher(encrypted, -3) print(f"\n凯撒密码 (偏移3):") print(f" 原文: {original}") print(f" 加密: {encrypted}") print(f" 解密: {decrypted}")

numpy数值计算库

numpy - 数值计算
NumPy是Python科学计算的基础库,提供高性能的多维数组对象和数组操作工具。
numpy.array() - 创建数组
numpy.array(object, dtype=None, *, copy=True, order='K', subok=False, ndmin=0, like=None)
创建NumPy数组,这是NumPy中最基本的数据结构。
示例
import numpy as np # 从列表创建数组 arr1 = np.array([1, 2, 3, 4, 5]) print(f"一维数组: {arr1}") print(f"形状: {arr1.shape}") print(f"数据类型: {arr1.dtype}") # 二维数组 arr2 = np.array([[1, 2, 3], [4, 5, 6]]) print(f"\n二维数组:\n{arr2}") print(f"形状: {arr2.shape}") print(f"维度: {arr2.ndim}") # 指定数据类型 arr3 = np.array([1.5, 2.7, 3.1], dtype=np.int32) print(f"\n指定类型为int32: {arr3}") # 特殊数组 zeros = np.zeros((3, 4)) print(f"\n零数组 (3x4):\n{zeros}") ones = np.ones((2, 3)) print(f"\n1数组 (2x3):\n{ones}") identity = np.eye(3) print(f"\n单位矩阵 (3x3):\n{identity}") range_arr = np.arange(0, 10, 2) print(f"\n范围数组 (0-10,步长2): {range_arr}") linspace_arr = np.linspace(0, 1, 5) print(f"\n线性间隔数组 (0-1,5个点): {linspace_arr}")
数组操作
NumPy数组支持各种数学运算和操作。
示例
import numpy as np # 创建数组 a = np.array([1, 2, 3, 4]) b = np.array([5, 6, 7, 8]) # 算术运算 print(f"数组a: {a}") print(f"数组b: {b}") print(f"\na + b = {a + b}") print(f"a - b = {a - b}") print(f"a * b = {a * b}") print(f"a / b = {a / b}") print(f"a ** 2 = {a ** 2}") # 矩阵乘法 matrix1 = np.array([[1, 2], [3, 4]]) matrix2 = np.array([[5, 6], [7, 8]]) print(f"\n矩阵1:\n{matrix1}") print(f"矩阵2:\n{matrix2}") print(f"矩阵乘法:\n{np.dot(matrix1, matrix2)}") # 统计函数 arr = np.array([[1, 2, 3], [4, 5, 6]]) print(f"\n数组:\n{arr}") print(f"平均值: {np.mean(arr)}") print(f"全局平均值: {arr.mean()}") print(f"每列平均值: {arr.mean(axis=0)}") print(f"每行平均值: {arr.mean(axis=1)}") print(f"总和: {np.sum(arr)}") print(f"最大值: {np.max(arr)}") print(f"最小值: {np.min(arr)}") print(f"标准差: {np.std(arr):.2f}") # 数组变形 arr_flat = arr.flatten() print(f"\n扁平化: {arr_flat}") arr_reshaped = arr.reshape(3, 2) print(f"重塑为3x2:\n{arr_reshaped}") # 数组索引和切片 print(f"\n原始数组:\n{arr}") print(f"arr[0, 1] = {arr[0, 1]}") print(f"第一行: {arr[0, :]}") print(f"第二列: {arr[:, 1]}")

pandas数据分析库

pandas - 数据分析
pandas是Python的数据分析库,提供DataFrame和Series数据结构,用于数据处理和分析。
pandas.DataFrame - 数据框
pandas.DataFrame(data=None, index=None, columns=None, dtype=None, copy=None)
DataFrame是pandas的主要数据结构,是一个二维标签数组,类似于Excel表格或SQL表。
示例
import pandas as pd # 从字典创建DataFrame data = { '姓名': ['张三', '李四', '王五', '赵六'], '年龄': [25, 30, 35, 28], '城市': ['北京', '上海', '广州', '深圳'], '工资': [5000, 7000, 8000, 6500] } df = pd.DataFrame(data) print("DataFrame:") print(df) print(f"\n形状: {df.shape}") print(f"列名: {df.columns.tolist()}") print(f"索引: {df.index.tolist()}") # 查看数据 print(f"\n前2行:\n{df.head(2)}") print(f"\n后2行:\n{df.tail(2)}") print(f"\n基本信息:\n{df.info()}") print(f"\n描述性统计:\n{df.describe()}") # 选择数据 print(f"\n选择单列 - 姓名:\n{df['姓名']}") print(f"\n选择多列:\n{df[['姓名', '工资']]}") print(f"\n选择行 (iloc):\n{df.iloc[1:3]}") print(f"\n条件筛选 (工资>6000):\n{df[df['工资'] > 6000]}") # 添加新列 df['奖金'] = df['工资'] * 0.1 df['总收入'] = df['工资'] + df['奖金'] print(f"\n添加新列后:\n{df}") # 分组聚合 grouped = df.groupby('城市')['工资'].agg(['mean', 'sum', 'count']) print(f"\n按城市分组统计:\n{grouped}")
数据读取与保存
pandas支持多种格式的数据读写,包括CSV、Excel、JSON、SQL等。
示例
import pandas as pd import numpy as np import os # 创建示例数据 data = { '日期': pd.date_range('2023-01-01', periods=5), '产品': ['A', 'B', 'A', 'C', 'B'], '销量': [100, 150, 120, 80, 200], '单价': [10.5, 20.0, 10.5, 15.0, 20.0] } df = pd.DataFrame(data) df['销售额'] = df['销量'] * df['单价'] print("原始数据:") print(df) # 保存到CSV csv_file = 'sales_data.csv' df.to_csv(csv_file, index=False, encoding='utf-8') print(f"\n数据已保存到 {csv_file}") # 从CSV读取 df_from_csv = pd.read_csv(csv_file) print(f"\n从CSV读取的数据:\n{df_from_csv.head()}") # 保存到Excel excel_file = 'sales_data.xlsx' df.to_excel(excel_file, index=False, sheet_name='销售数据') print(f"\n数据已保存到Excel文件 {excel_file}") # 从Excel读取 df_from_excel = pd.read_excel(excel_file, sheet_name='销售数据') print(f"\n从Excel读取的数据:\n{df_from_excel.head()}") # 保存到JSON json_file = 'sales_data.json' df.to_json(json_file, orient='records', force_ascii=False) print(f"\n数据已保存到JSON文件 {json_file}") # 清理测试文件 for file in [csv_file, excel_file, json_file]: if os.path.exists(file): os.remove(file) print(f"已删除测试文件: {file}")

requests网络请求库

requests - HTTP请求
requests是一个简单易用的HTTP库,用于发送HTTP请求和处理响应。
requests.get() - GET请求
requests.get(url, params=None, **kwargs)
发送HTTP GET请求,获取指定URL的内容。
示例
import requests # 基本GET请求 url = "https://httpbin.org/get" response = requests.get(url) print(f"状态码: {response.status_code}") print(f"响应头:") for key, value in response.headers.items(): print(f" {key}: {value}") print(f"\n响应内容 (前500字符):") print(response.text[:500]) # 带参数的GET请求 params = { 'page': 1, 'limit': 10, 'search': 'python' } response_with_params = requests.get("https://httpbin.org/get", params=params) print(f"\n带参数的请求URL: {response_with_params.url}") # 解析JSON响应 json_response = response_with_params.json() print(f"\nJSON响应中的参数:") print(f" page: {json_response['args'].get('page')}") print(f" limit: {json_response['args'].get('limit')}") # 设置请求头 headers = { 'User-Agent': 'MyPythonApp/1.0', 'Accept': 'application/json' } response_with_headers = requests.get("https://httpbin.org/headers", headers=headers) print(f"\n自定义请求头:") print(response_with_headers.json()['headers'])
requests.post() - POST请求
requests.post(url, data=None, json=None, **kwargs)
发送HTTP POST请求,向服务器提交数据。
示例
import requests # 发送表单数据 url = "https://httpbin.org/post" form_data = { 'username': 'testuser', 'password': 'testpass123', 'email': 'test@example.com' } response = requests.post(url, data=form_data) print("发送表单数据:") print(f"状态码: {response.status_code}") print(f"响应JSON:") print(response.json()['form']) # 发送JSON数据 json_data = { 'name': '张三', 'age': 25, 'skills': ['Python', 'JavaScript', 'SQL'] } json_response = requests.post(url, json=json_data) print(f"\n发送JSON数据:") print(json_response.json()['json']) # 上传文件 files = {'file': ('test.txt', '这是文件内容', 'text/plain')} file_response = requests.post(url, files=files) print(f"\n上传文件:") print(file_response.json()['files']) # 带认证的请求 auth_response = requests.post( "https://httpbin.org/basic-auth/user/passwd", auth=('user', 'passwd') ) print(f"\n基本认证请求:") print(f"状态码: {auth_response.status_code}") print(f"响应: {auth_response.json()}")

BeautifulSoup解析库

BeautifulSoup - HTML/XML解析
BeautifulSoup是一个用于解析HTML和XML文档的Python库,可以方便地提取网页数据。它提供简单易用的API来导航、搜索和修改解析树。
BeautifulSoup() - 创建解析对象
BeautifulSoup(markup, features)
创建一个BeautifulSoup对象,用于解析HTML或XML文档。
markup - 要解析的HTML/XML字符串或文件对象
features - 解析器名称,如'html.parser', 'lxml', 'html5lib'
示例
from bs4 import BeautifulSoup # 解析HTML字符串 html_doc = """ 测试页面

欢迎来到Python世界

这是一个测试页面。

学习BeautifulSoup解析HTML。

  • 项目1
  • 项目2
  • 项目3
Python官网
""" # 创建BeautifulSoup对象 soup = BeautifulSoup(html_doc, 'html.parser') print("文档标题:", soup.title.string) print("整个HTML:") print(soup.prettify()) # 使用不同解析器 # soup_lxml = BeautifulSoup(html_doc, 'lxml') # 需要安装lxml # soup_html5lib = BeautifulSoup(html_doc, 'html5lib') # 需要安装html5lib
查找元素方法
BeautifulSoup提供多种方法查找文档中的元素。
示例
from bs4 import BeautifulSoup html = """

网页标题

文章1

这是第一篇文章的内容。

作者: 张三

文章2

这是第二篇文章的内容。

作者: 李四

链接1 链接2
""" soup = BeautifulSoup(html, 'html.parser') # find() - 查找第一个匹配的元素 first_h2 = soup.find('h2') print(f"第一个h2标签: {first_h2.text}") # find_all() - 查找所有匹配的元素 all_h2 = soup.find_all('h2') print("\n所有h2标签:") for i, h2 in enumerate(all_h2, 1): print(f" {i}. {h2.text}") # 通过属性查找 main_title = soup.find(id='main-title') print(f"\n通过id查找: {main_title.text}") articles = soup.find_all(class_='article') print(f"\n找到 {len(articles)} 篇文章") # 通过CSS选择器查找 authors = soup.select('.author') print("\n所有作者:") for author in authors: print(f" {author.text}") # 查找特定属性的标签 links = soup.find_all('a', href=True) print("\n所有链接:") for link in links: print(f" 文本: {link.text}, 地址: {link['href']}")
导航文档树
通过父子、兄弟关系在文档树中导航。
示例
from bs4 import BeautifulSoup html = """

主标题

段落1

嵌套段落

链接

段落2

  • 项目1
  • 项目2
  • 项目3
""" soup = BeautifulSoup(html, 'html.parser') div_main = soup.find('div', class_='main') # 获取父元素 parent = div_main.parent print(f"div.main的父元素: {parent.name}") # 获取子元素 children = list(div_main.children) print(f"\ndiv.main有 {len(children)} 个子元素:") for i, child in enumerate(children): if child.name: print(f" {i}. {child.name}") else: print(f" {i}. 文本节点") # 获取所有后代 descendants = list(div_main.descendants) print(f"\ndiv.main有 {len(descendants)} 个后代元素") # 获取兄弟元素 first_p = div_main.find('p') print(f"\n第一个p标签: {first_p.text}") next_sibling = first_p.find_next_sibling() print(f"下一个兄弟: {next_sibling.name if next_sibling.name else '文本'}") # 查找前一个和后一个元素 nested_div = soup.find('div', class_='nested') previous = nested_div.find_previous_sibling() next = nested_div.find_next_sibling() print(f"\n嵌套div的前一个兄弟: {previous.name}") print(f"嵌套div的后一个兄弟: {next.name}")
提取和修改数据
从元素中提取内容和属性,以及修改文档。
示例
from bs4 import BeautifulSoup html = """

笔记本电脑

¥6999

高性能笔记本电脑,适合编程和设计。

CPU: i7 内存: 16GB 硬盘: 512GB SSD
立即购买
""" soup = BeautifulSoup(html, 'html.parser') product = soup.find('div', class_='product') # 提取文本 print("产品信息:") print(f"名称: {product.find('h2').get_text(strip=True)}") print(f"价格: {product.find(class_='price').get_text(strip=True)}") # 提取属性 buy_link = product.find('a', class_='buy-btn') print(f"购买链接: {buy_link['href']}") print(f"链接文本: {buy_link.text}") print(f"链接类名: {buy_link.get('class', [])}") # 提取所有规格 specs = product.find('div', class_='specs') print("\n规格:") for spec in specs.find_all('span'): print(f" {spec.text}") # 修改内容 product.find('h2').string = "游戏笔记本电脑" product.find(class_='price').string = "¥7999" print(f"\n修改后的名称: {product.find('h2').text}") print(f"修改后的价格: {product.find(class_='price').text}") # 添加新元素 new_tag = soup.new_tag('p') new_tag.string = "库存: 10台" new_tag['class'] = 'stock' product.append(new_tag) print(f"\n添加库存信息: {product.find(class_='stock').text}") # 删除元素 if product.find(class_='description'): product.find(class_='description').decompose() print("已删除描述信息") # 获取修改后的HTML print(f"\n修改后的HTML:\n{product.prettify()}")

matplotlib可视化库

matplotlib - 数据可视化
matplotlib是Python最流行的绘图库,可以创建各种静态、动态、交互式的图表,包括线图、柱状图、散点图、饼图等。
基本绘图 - plot()
plt.plot(x, y, format_string, **kwargs)
绘制线图,是最常用的绘图函数之一。可以绘制单条或多条线。
示例
import matplotlib.pyplot as plt import numpy as np # 设置中文字体(如果需要显示中文) plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 # 创建数据 x = np.linspace(0, 10, 100) y1 = np.sin(x) y2 = np.cos(x) # 创建图形和坐标轴 fig, ax = plt.subplots(figsize=(10, 6)) # 绘制线图 ax.plot(x, y1, 'r-', label='正弦曲线', linewidth=2) # 红色实线 ax.plot(x, y2, 'b--', label='余弦曲线', linewidth=2) # 蓝色虚线 # 设置图表属性 ax.set_title('三角函数曲线', fontsize=16, fontweight='bold') ax.set_xlabel('X轴', fontsize=12) ax.set_ylabel('Y轴', fontsize=12) ax.legend(loc='upper right') ax.grid(True, alpha=0.3) # 设置坐标轴范围 ax.set_xlim(0, 10) ax.set_ylim(-1.5, 1.5) # 添加文本标注 ax.text(5, 0.8, '正弦波峰值', fontsize=10, bbox=dict(boxstyle="round,pad=0.3", facecolor="yellow", alpha=0.5)) # 显示图表 plt.tight_layout() print("图表已创建,使用plt.show()显示") # plt.show() # 在实际环境中取消注释 # 保存图表 # plt.savefig('sine_cosine.png', dpi=300, bbox_inches='tight') # print("图表已保存为 sine_cosine.png")
柱状图 - bar()
plt.bar(x, height, width=0.8, bottom=None, *, align='center', **kwargs)
绘制垂直柱状图,用于比较不同类别的数据。
示例
import matplotlib.pyplot as plt import numpy as np # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 创建数据 categories = ['苹果', '香蕉', '橙子', '葡萄', '西瓜'] sales_2022 = [120, 85, 110, 65, 95] sales_2023 = [135, 90, 125, 70, 110] x = np.arange(len(categories)) # 类别位置 width = 0.35 # 柱状图宽度 # 创建图形 fig, ax = plt.subplots(figsize=(10, 6)) # 绘制柱状图 bars1 = ax.bar(x - width/2, sales_2022, width, label='2022年', color='skyblue', edgecolor='black') bars2 = ax.bar(x + width/2, sales_2023, width, label='2023年', color='lightcoral', edgecolor='black') # 设置图表属性 ax.set_title('水果销售额对比', fontsize=16, fontweight='bold') ax.set_xlabel('水果种类', fontsize=12) ax.set_ylabel('销售额 (万元)', fontsize=12) ax.set_xticks(x) ax.set_xticklabels(categories) ax.legend() # 添加数值标签 def autolabel(bars): """在柱状图上显示数值""" for bar in bars: height = bar.get_height() ax.annotate(f'{height}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), # 偏移量 textcoords="offset points", ha='center', va='bottom', fontsize=10) autolabel(bars1) autolabel(bars2) # 添加网格 ax.grid(True, axis='y', alpha=0.3) # 调整布局 plt.tight_layout() print("柱状图已创建") # 水平柱状图示例 fig2, ax2 = plt.subplots(figsize=(8, 6)) bars_h = ax2.barh(categories, sales_2023, color='lightgreen', edgecolor='black') ax2.set_title('2023年水果销售额', fontsize=14) ax2.set_xlabel('销售额 (万元)') for bar in bars_h: width = bar.get_width() ax2.annotate(f'{width}', xy=(width, bar.get_y() + bar.get_height()/2), xytext=(5, 0), textcoords="offset points", ha='left', va='center') print("水平柱状图已创建") # plt.show()
散点图 - scatter()
plt.scatter(x, y, s=None, c=None, marker=None, **kwargs)
绘制散点图,用于显示两个变量之间的关系,或数据点的分布。
示例
import matplotlib.pyplot as plt import numpy as np # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 创建模拟数据 np.random.seed(42) # 设置随机种子以便重现结果 n_points = 100 # 创建三组数据 x1 = np.random.normal(50, 15, n_points) y1 = np.random.normal(60, 10, n_points) sizes1 = np.random.uniform(20, 200, n_points) x2 = np.random.normal(80, 12, n_points) y2 = np.random.normal(40, 8, n_points) sizes2 = np.random.uniform(20, 200, n_points) x3 = np.random.normal(30, 10, n_points) y3 = np.random.normal(30, 12, n_points) sizes3 = np.random.uniform(20, 200, n_points) # 创建图形 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) # 基本散点图 scatter1 = ax1.scatter(x1, y1, s=sizes1, alpha=0.6, edgecolors='w', linewidth=0.5) ax1.set_title('基本散点图', fontsize=14) ax1.set_xlabel('X值') ax1.set_ylabel('Y值') ax1.grid(True, alpha=0.3) # 带颜色映射的散点图 all_x = np.concatenate([x1, x2, x3]) all_y = np.concatenate([y1, y2, y3]) all_sizes = np.concatenate([sizes1, sizes2, sizes3]) categories = np.array(['A']*n_points + ['B']*n_points + ['C']*n_points) # 为不同类别分配颜色 colors = {'A': 'red', 'B': 'blue', 'C': 'green'} color_list = [colors[cat] for cat in categories] scatter2 = ax2.scatter(all_x, all_y, s=all_sizes, c=color_list, alpha=0.6, edgecolors='w', linewidth=0.5) ax2.set_title('多类别散点图', fontsize=14) ax2.set_xlabel('X值') ax2.set_ylabel('Y值') ax2.grid(True, alpha=0.3) # 添加图例 import matplotlib.patches as mpatches legend_elements = [mpatches.Patch(color='red', alpha=0.6, label='类别A'), mpatches.Patch(color='blue', alpha=0.6, label='类别B'), mpatches.Patch(color='green', alpha=0.6, label='类别C')] ax2.legend(handles=legend_elements, loc='upper right') # 添加颜色条(colorbar) # 如果需要连续颜色映射: # plt.colorbar(scatter2, ax=ax2, label='数值大小') plt.tight_layout() print("散点图已创建") # plt.show()
子图和多图布局
创建多个子图,实现复杂的图表布局。
示例
import matplotlib.pyplot as plt import numpy as np # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 创建数据 np.random.seed(42) x = np.linspace(0, 10, 100) y1 = np.sin(x) y2 = np.cos(x) y3 = np.tan(x) / 10 # 缩小tan值以便显示 # 饼图数据 categories = ['技术', '销售', '市场', '行政', '研发'] sizes = [30, 25, 20, 15, 10] colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#c2c2f0'] # 创建2x2的子图网格 fig, axs = plt.subplots(2, 2, figsize=(12, 10)) fig.suptitle('多图表展示', fontsize=16, fontweight='bold') # 子图1: 线图 axs[0, 0].plot(x, y1, 'r-', label='sin(x)') axs[0, 0].plot(x, y2, 'b--', label='cos(x)') axs[0, 0].set_title('三角函数线图') axs[0, 0].set_xlabel('x') axs[0, 0].set_ylabel('y') axs[0, 0].legend() axs[0, 0].grid(True, alpha=0.3) # 子图2: 柱状图 bar_categories = ['Q1', 'Q2', 'Q3', 'Q4'] bar_values = [120, 135, 150, 130] bars = axs[0, 1].bar(bar_categories, bar_values, color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']) axs[0, 1].set_title('季度销售额') axs[0, 1].set_ylabel('销售额 (万)') # 添加数值标签 for bar in bars: height = bar.get_height() axs[0, 1].text(bar.get_x() + bar.get_width()/2., height + 3, f'{height}', ha='center', va='bottom') # 子图3: 散点图 scatter_x = np.random.rand(50) * 100 scatter_y = np.random.rand(50) * 100 scatter_sizes = np.random.rand(50) * 500 scatter_colors = np.random.rand(50) scatter = axs[1, 0].scatter(scatter_x, scatter_y, s=scatter_sizes, c=scatter_colors, alpha=0.6, cmap='viridis') axs[1, 0].set_title('随机散点图') axs[1, 0].set_xlabel('X') axs[1, 0].set_ylabel('Y') # 添加颜色条 plt.colorbar(scatter, ax=axs[1, 0]) # 子图4: 饼图 wedges, texts, autotexts = axs[1, 1].pie(sizes, labels=categories, colors=colors, autopct='%1.1f%%', startangle=90, explode=(0.05, 0, 0, 0, 0)) axs[1, 1].set_title('部门分布饼图') # 美化文本 for autotext in autotexts: autotext.set_color('white') autotext.set_fontsize(10) for text in texts: text.set_fontsize(11) # 调整布局 plt.tight_layout() print("多图布局已创建") # plt.show() # 保存整个图形 # fig.savefig('multiple_plots.png', dpi=300, bbox_inches='tight') # print("图表已保存为 multiple_plots.png")

flask Web框架

flask - 轻量级Web框架
Flask是一个轻量级的Python Web框架,简单易用,适合快速开发Web应用。它基于Werkzeug WSGI工具包和Jinja2模板引擎。
基本Flask应用
from flask import Flask app = Flask(__name__)
创建Flask应用实例,这是所有Flask应用的起点。
示例:最简单的Flask应用
# app.py from flask import Flask # 创建Flask应用实例 app = Flask(__name__) # 定义路由和视图函数 @app.route('/') def home(): return '<h1>欢迎来到Flask世界!</h1><p>这是一个简单的Flask应用。</p>' @app.route('/hello') def hello(): return 'Hello, World!' @app.route('/user/<username>') def show_user(username): return f'用户: {username}' @app.route('/post/<int:post_id>') def show_post(post_id): return f'文章ID: {post_id}' # 运行应用 if __name__ == '__main__': # 调试模式,代码修改后自动重启 app.run(debug=True, host='0.0.0.0', port=5000) # 运行命令: # python app.py # 然后在浏览器中访问: # http://localhost:5000/ # http://localhost:5000/hello # http://localhost:5000/user/张三 # http://localhost:5000/post/123
模板渲染
render_template(template_name, **context)
渲染Jinja2模板,将动态数据传递给HTML模板。
示例:使用模板
# app_with_templates.py from flask import Flask, render_template, request from datetime import datetime app = Flask(__name__) # 模拟数据库 users = { 'zhangsan': {'name': '张三', 'age': 25, 'city': '北京', 'skills': ['Python', 'Java', 'SQL'], 'join_date': '2022-01-15'}, 'lisi': {'name': '李四', 'age': 30, 'city': '上海', 'skills': ['JavaScript', 'HTML/CSS', 'React'], 'join_date': '2021-08-22'} } visit_count = 0 @app.route('/') def index(): global visit_count visit_count += 1 return render_template('index.html', current_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), visit_count=visit_count) @app.route('/user/<username>') def user_profile(username): user = users.get(username) if user: return render_template('user.html', user=user) else: return render_template('404.html'), 404 @app.route('/about') def about(): return render_template('about.html', title='关于我们') @app.route('/greet', methods=['POST']) def greet(): name = request.form.get('name', '访客') return render_template('greet.html', name=name, title='问候') if __name__ == '__main__': app.run(debug=True) # templates/index.html 示例 """ <!DOCTYPE html> <html> <head> <title>我的网站</title> </head> <body> <h1>欢迎来到我的网站</h1> <p>当前时间: {{ current_time }}</p> <p>访问次数: {{ visit_count }}</p> <h2>用户信息</h2> <form action="/greet" method="post"> <input type="text" name="name" placeholder="请输入您的姓名" required> <button type="submit">提交</button> </form> </body> </html> """
表单处理和数据库
处理Web表单和连接数据库,创建完整的CRUD应用。
示例:任务管理应用
# task_app.py from flask import Flask, render_template, request, redirect, url_for, flash from flask_sqlalchemy import SQLAlchemy from datetime import datetime app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-here' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tasks.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 定义任务模型 class Task(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) description = db.Column(db.Text) status = db.Column(db.String(20), default='待办') # 待办, 进行中, 已完成 created_at = db.Column(db.DateTime, default=datetime.utcnow) def __repr__(self): return f'<Task {self.title}>' # 创建数据库表 with app.app_context(): db.create_all() # 主页 - 显示所有任务 @app.route('/') def index(): tasks = Task.query.order_by(Task.created_at.desc()).all() return render_template('tasks/index.html', tasks=tasks) # 创建新任务 @app.route('/task/new', methods=['GET', 'POST']) def new_task(): if request.method == 'POST': title = request.form['title'] description = request.form['description'] status = request.form.get('status', '待办') task = Task(title=title, description=description, status=status) db.session.add(task) db.session.commit() flash('任务创建成功!', 'success') return redirect(url_for('index')) return render_template('tasks/new.html') # 查看任务详情 @app.route('/task/<int:task_id>') def view_task(task_id): task = Task.query.get_or_404(task_id) return render_template('tasks/view.html', task=task) # 编辑任务 @app.route('/task/<int:task_id>/edit', methods=['GET', 'POST']) def edit_task(task_id): task = Task.query.get_or_404(task_id) if request.method == 'POST': task.title = request.form['title'] task.description = request.form['description'] task.status = request.form['status'] db.session.commit() flash('任务更新成功!', 'success') return redirect(url_for('view_task', task_id=task.id)) return render_template('tasks/edit.html', task=task) # 删除任务 @app.route('/task/<int:task_id>/delete', methods=['POST']) def delete_task(task_id): task = Task.query.get_or_404(task_id) db.session.delete(task) db.session.commit() flash('任务删除成功!', 'success') return redirect(url_for('index')) # 完成任务 @app.route('/task/<int:task_id>/complete', methods=['POST']) def complete_task(task_id): task = Task.query.get_or_404(task_id) task.status = '已完成' db.session.commit() flash('任务已完成!', 'success') return redirect(url_for('index')) if __name__ == '__main__': app.run(debug=True) # templates/tasks/index.html 示例结构 """ <!DOCTYPE html> <html> <head> <title>任务管理</title> </head> <body> <h1>任务管理</h1> <a href="{{ url_for('new_task') }}">新建任务</a> <table> <thead> <tr> <th>标题</th> <th>状态</th> <th>创建时间</th> <th>操作</th> </tr> </thead> <tbody> {% for task in tasks %} <tr> <td>{{ task.title }}</td> <td>{{ task.status }}</td> <td>{{ task.created_at.strftime('%Y-%m-%d') }}</td> <td> <a href="{{ url_for('view_task', task_id=task.id) }}">查看</a> <a href="{{ url_for('edit_task', task_id=task.id) }}">编辑</a> <form action="{{ url_for('delete_task', task_id=task.id) }}" method="POST" style="display: inline;"> <button type="submit" onclick="return confirm('确定删除吗?')">删除</button> </form> </td> </tr> {% endfor %} </tbody> </table> </body> </html> """
用户认证和会话
实现用户注册、登录、会话管理和权限控制。
示例:用户认证系统
# auth_app.py from flask import Flask, render_template, request, redirect, url_for, flash, session from werkzeug.security import generate_password_hash, check_password_hash from functools import wraps app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-here-change-in-production' # 模拟用户数据库 users_db = { 'admin': { 'username': 'admin', 'password_hash': generate_password_hash('admin123'), 'email': 'admin@example.com' } } # 登录装饰器 def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'username' not in session: flash('请先登录!', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function @app.route('/') def index(): return render_template('index.html') @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form['username'] email = request.form['email'] password = request.form['password'] confirm_password = request.form['confirm_password'] # 验证 if password != confirm_password: flash('两次输入的密码不一致!', 'danger') return redirect(url_for('register')) if username in users_db: flash('用户名已存在!', 'danger') return redirect(url_for('register')) # 创建用户 users_db[username] = { 'username': username, 'password_hash': generate_password_hash(password), 'email': email } flash('注册成功! 请登录。', 'success') return redirect(url_for('login')) return render_template('auth/register.html') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = users_db.get(username) if user and check_password_hash(user['password_hash'], password): session['username'] = user['username'] flash('登录成功!', 'success') return redirect(url_for('dashboard')) else: flash('用户名或密码错误!', 'danger') return render_template('auth/login.html') @app.route('/logout') def logout(): session.clear() flash('已退出登录!', 'info') return redirect(url_for('index')) @app.route('/dashboard') @login_required def dashboard(): return render_template('dashboard.html', username=session['username']) @app.route('/profile') @login_required def profile(): user = users_db.get(session['username']) return render_template('auth/profile.html', user=user) # 错误处理 @app.errorhandler(404) def not_found_error(error): return render_template('errors/404.html'), 404 if __name__ == '__main__': app.run(debug=True)

sqlite3数据库库

sqlite3 - 嵌入式数据库
sqlite3是Python内置的SQLite数据库接口,用于操作SQLite数据库文件。SQLite是一个轻量级的嵌入式数据库,不需要独立的服务器进程。
连接和创建数据库
sqlite3.connect(database[, timeout, detect_types, isolation_level, ...])
连接到SQLite数据库文件。如果文件不存在,会自动创建。
示例:数据库连接和基本操作
import sqlite3 import os from datetime import datetime # 连接到数据库(如果不存在则创建) def connect_db(db_name='example.db'): """连接到SQLite数据库""" conn = sqlite3.connect(db_name) print(f"已连接到数据库: {db_name}") return conn def create_tables(conn): """创建数据表""" cursor = conn.cursor() # 创建用户表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT 1 ) ''') # 创建文章表 cursor.execute(''' CREATE TABLE IF NOT EXISTS posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, content TEXT NOT NULL, user_id INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) ''') # 创建评论表 cursor.execute(''' CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, post_id INTEGER NOT NULL, user_id INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (post_id) REFERENCES posts (id) ON DELETE CASCADE, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) ''') conn.commit() print("数据表创建完成") def insert_sample_data(conn): """插入示例数据""" cursor = conn.cursor() # 插入用户数据 users = [ ('张三', 'zhangsan@example.com', 'password123'), ('李四', 'lisi@example.com', 'password456'), ('王五', 'wangwu@example.com', 'password789') ] cursor.executemany(''' INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?) ''', users) # 插入文章数据 posts = [ ('Python入门教程', 'Python是一种高级编程语言...', 1), ('Flask Web开发', 'Flask是一个轻量级Web框架...', 1), ('SQLite数据库操作', 'SQLite是一个嵌入式数据库...', 2), ('数据可视化技巧', '使用matplotlib创建图表...', 3) ] cursor.executemany(''' INSERT INTO posts (title, content, user_id) VALUES (?, ?, ?) ''', posts) # 插入评论数据 comments = [ ('好文章,学习了!', 1, 2), ('很有帮助,谢谢分享', 1, 3), ('期待更多教程', 2, 2), ('例子很实用', 3, 1), ('内容很详细', 4, 2) ] cursor.executemany(''' INSERT INTO comments (content, post_id, user_id) VALUES (?, ?, ?) ''', comments) conn.commit() print("示例数据插入完成") def query_data(conn): """查询数据""" cursor = conn.cursor() # 查询所有用户 print("所有用户:") cursor.execute('SELECT id, username, email, created_at FROM users') users = cursor.fetchall() for user in users: print(f" ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}, 注册时间: {user[3]}") # 查询所有文章 print("\n所有文章:") cursor.execute(''' SELECT p.id, p.title, u.username, p.created_at FROM posts p JOIN users u ON p.user_id = u.id ORDER BY p.created_at DESC ''') posts = cursor.fetchall() for post in posts: print(f" ID: {post[0]}, 标题: {post[1]}, 作者: {post[2]}, 发布时间: {post[3]}") # 查询带评论数量的文章 print("\n文章评论统计:") cursor.execute(''' SELECT p.title, u.username, COUNT(c.id) as comment_count FROM posts p JOIN users u ON p.user_id = u.id LEFT JOIN comments c ON p.id = c.post_id GROUP BY p.id ORDER BY comment_count DESC ''') stats = cursor.fetchall() for stat in stats: print(f" 文章: {stat[0]}, 作者: {stat[1]}, 评论数: {stat[2]}") def update_data(conn): """更新数据""" cursor = conn.cursor() # 更新用户信息 cursor.execute(''' UPDATE users SET username = ? WHERE id = ? ''', ('张老三', 1)) # 更新文章内容 cursor.execute(''' UPDATE posts SET content = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', ('更新后的Python教程内容...', 1)) conn.commit() print("数据更新完成") def delete_data(conn): """删除数据""" cursor = conn.cursor() # 删除特定评论 cursor.execute('DELETE FROM comments WHERE id = ?', (1,)) # 删除不活跃用户 cursor.execute('DELETE FROM users WHERE is_active = 0') conn.commit() print("数据删除完成") def transaction_example(conn): """事务处理示例""" try: cursor = conn.cursor() # 开始事务 cursor.execute('BEGIN TRANSACTION') # 执行多个操作 cursor.execute('INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)', ('赵六', 'zhaoliu@example.com', 'password999')) user_id = cursor.lastrowid cursor.execute('INSERT INTO posts (title, content, user_id) VALUES (?, ?, ?)', ('新文章标题', '新文章内容...', user_id)) # 提交事务 conn.commit() print("事务执行成功") except sqlite3.Error as e: # 回滚事务 conn.rollback() print(f"事务执行失败,已回滚: {e}") def backup_database(conn, backup_name='backup.db'): """备份数据库""" try: # 使用SQLite的备份API import sqlite3.backup as backup # 创建备份数据库连接 backup_conn = sqlite3.connect(backup_name) # 执行备份 with backup_conn: conn.backup(backup_conn, pages=1) backup_conn.close() print(f"数据库已备份到: {backup_name}") except Exception as e: print(f"备份失败: {e}") # 主程序 def main(): db_name = 'example.db' # 如果数据库已存在,先删除(仅用于演示) if os.path.exists(db_name): os.remove(db_name) print(f"已删除旧数据库: {db_name}") # 连接数据库 conn = connect_db(db_name) try: # 创建表 create_tables(conn) # 插入示例数据 insert_sample_data(conn) # 查询数据 query_data(conn) # 更新数据 update_data(conn) # 删除数据 delete_data(conn) # 事务示例 transaction_example(conn) # 备份数据库 backup_database(conn) # 再次查询验证 print("\n最终数据状态:") query_data(conn) finally: # 关闭连接 conn.close() print(f"\n数据库连接已关闭") if __name__ == '__main__': main()
高级查询和参数化查询
使用参数化查询防止SQL注入,执行复杂的SQL查询。
示例:高级查询技巧
import sqlite3 import json def advanced_queries(): """高级查询示例""" conn = sqlite3.connect(':memory:') # 使用内存数据库 conn.row_factory = sqlite3.Row # 使用Row对象,可以通过列名访问 cursor = conn.cursor() # 创建示例表 cursor.execute(''' CREATE TABLE employees ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, department TEXT, salary REAL, hire_date DATE, skills TEXT -- JSON格式存储技能 ) ''') # 插入示例数据 employees = [ (1, '张三', '技术部', 15000, '2020-01-15', '["Python", "Linux", "Docker"]'), (2, '李四', '市场部', 12000, '2019-05-20', '["Marketing", "Excel", "PPT"]'), (3, '王五', '技术部', 18000, '2018-03-10', '["Java", "Spring", "MySQL"]'), (4, '赵六', '销售部', 10000, '2021-08-05', '["Sales", "Communication"]'), (5, '钱七', '技术部', 16000, '2020-11-30', '["Python", "Flask", "SQLite"]'), (6, '孙八', '市场部', 13000, '2019-09-15', '["SEO", "Google Analytics"]') ] cursor.executemany(''' INSERT INTO employees (id, name, department, salary, hire_date, skills) VALUES (?, ?, ?, ?, ?, ?) ''', employees) conn.commit() print("=== 高级查询示例 ===\n") # 1. 参数化查询(防止SQL注入) def search_employees_by_department(dept): """安全的参数化查询""" cursor.execute('SELECT * FROM employees WHERE department = ?', (dept,)) return cursor.fetchall() print("1. 参数化查询 - 技术部员工:") tech_employees = search_employees_by_department('技术部') for emp in tech_employees: print(f" 姓名: {emp['name']}, 薪资: {emp['salary']}") # 2. 使用IN子句的参数化查询 def search_employees_in_departments(departments): """多个值的参数化查询""" placeholders = ','.join('?' * len(departments)) query = f'SELECT * FROM employees WHERE department IN ({placeholders})' cursor.execute(query, departments) return cursor.fetchall() print("\n2. 多部门查询:") multi_dept_emps = search_employees_in_departments(['技术部', '市场部']) for emp in multi_dept_emps: print(f" 姓名: {emp['name']}, 部门: {emp['department']}") # 3. 聚合函数和分组 print("\n3. 部门薪资统计:") cursor.execute(''' SELECT department, COUNT(*) as employee_count, AVG(salary) as avg_salary, MIN(salary) as min_salary, MAX(salary) as max_salary, SUM(salary) as total_salary FROM employees GROUP BY department ORDER BY avg_salary DESC ''') dept_stats = cursor.fetchall() for stat in dept_stats: print(f" 部门: {stat['department']:8} 人数: {stat['employee_count']:2} " f"平均薪资: {stat['avg_salary']:8.2f} 总薪资: {stat['total_salary']:8.2f}") # 4. 子查询 print("\n4. 高于平均薪资的员工:") cursor.execute(''' SELECT name, department, salary FROM employees WHERE salary > (SELECT AVG(salary) FROM employees) ORDER BY salary DESC ''') above_avg = cursor.fetchall() for emp in above_avg: print(f" 姓名: {emp['name']}, 部门: {emp['department']}, 薪资: {emp['salary']}") # 5. 窗口函数(SQLite 3.25.0+) print("\n5. 部门内薪资排名:") try: cursor.execute(''' SELECT name, department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank_in_dept, RANK() OVER (ORDER BY salary DESC) as overall_rank FROM employees ORDER BY department, salary DESC ''') ranked = cursor.fetchall() for emp in ranked: print(f" 姓名: {emp['name']:6} 部门: {emp['department']:6} " f"薪资: {emp['salary']:8.2f} 部门内排名: {emp['rank_in_dept']}") except sqlite3.OperationalError: print(" (需要SQLite 3.25.0+版本支持窗口函数)") # 6. JSON函数处理(SQLite 3.38.0+) print("\n6. 处理JSON数据:") try: cursor.execute(''' SELECT name, json_extract(skills, '$[0]') as first_skill, json_array_length(skills) as skill_count FROM employees WHERE json_extract(skills, '$') LIKE '%Python%' ''') python_developers = cursor.fetchall() for dev in python_developers: skills_json = json.loads(dev['skills']) print(f" 姓名: {dev['name']}, 技能数: {dev['skill_count']}, 技能: {', '.join(skills_json)}") except sqlite3.OperationalError: print(" (需要SQLite 3.38.0+版本支持JSON函数)") # 7. 复杂条件查询 print("\n7. 复杂条件组合查询:") cursor.execute(''' SELECT name, department, salary, hire_date FROM employees WHERE (department = '技术部' AND salary > 15000) OR (department = '市场部' AND salary > 12000) OR hire_date < '2020-01-01' ORDER BY department, salary DESC ''') complex_results = cursor.fetchall() for result in complex_results: print(f" 姓名: {result['name']}, 部门: {result['department']}, " f"薪资: {result['salary']}, 入职: {result['hire_date']}") # 8. 分页查询 print("\n8. 分页查询(第1页,每页3条):") page = 1 page_size = 3 offset = (page - 1) * page_size cursor.execute(''' SELECT name, department, salary FROM employees ORDER BY salary DESC LIMIT ? OFFSET ? ''', (page_size, offset)) page_results = cursor.fetchall() for i, result in enumerate(page_results, 1): print(f" {i}. 姓名: {result['name']}, 部门: {result['department']}, 薪资: {result['salary']}") # 获取总记录数(用于分页) cursor.execute('SELECT COUNT(*) as total FROM employees') total = cursor.fetchone()['total'] total_pages = (total + page_size - 1) // page_size print(f" 总记录数: {total}, 总页数: {total_pages}") conn.close() if __name__ == '__main__': advanced_queries()
实用工具函数
创建实用的数据库操作工具函数。
示例:数据库工具类
import sqlite3 import contextlib from typing import List, Dict, Any, Optional, Union from datetime import datetime import json class SQLiteDatabase: """SQLite数据库工具类""" def __init__(self, db_path: str): """ 初始化数据库连接 Args: db_path: 数据库文件路径,使用':memory:'表示内存数据库 """ self.db_path = db_path self.conn = None def connect(self): """连接到数据库""" self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row # 返回字典样式的行 return self.conn def close(self): """关闭数据库连接""" if self.conn: self.conn.close() self.conn = None @contextlib.contextmanager def get_cursor(self): """获取游标的上下文管理器""" if not self.conn: self.connect() cursor = self.conn.cursor() try: yield cursor self.conn.commit() except Exception as e: self.conn.rollback() raise e finally: cursor.close() def execute(self, sql: str, params: tuple = None): """执行SQL语句""" with self.get_cursor() as cursor: if params: cursor.execute(sql, params) else: cursor.execute(sql) return cursor def executemany(self, sql: str, params_list: List[tuple]): """执行多条SQL语句""" with self.get_cursor() as cursor: cursor.executemany(sql, params_list) return cursor def fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict]: """获取单条记录""" cursor = self.execute(sql, params) row = cursor.fetchone() return dict(row) if row else None def fetch_all(self, sql: str, params: tuple = None) -> List[Dict]: """获取所有记录""" cursor = self.execute(sql, params) rows = cursor.fetchall() return [dict(row) for row in rows] def insert(self, table: str, data: Dict) -> int: """ 插入数据 Args: table: 表名 data: 数据字典 Returns: 插入行的ID """ columns = ', '.join(data.keys()) placeholders = ', '.join(['?'] * len(data)) sql = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})' with self.get_cursor() as cursor: cursor.execute(sql, tuple(data.values())) return cursor.lastrowid def update(self, table: str, data: Dict, where: str, where_params: tuple = None) -> int: """ 更新数据 Args: table: 表名 data: 要更新的数据字典 where: WHERE条件 where_params: WHERE条件参数 Returns: 影响的行数 """ set_clause = ', '.join([f'{k} = ?' for k in data.keys()]) sql = f'UPDATE {table} SET {set_clause} WHERE {where}' params = tuple(data.values()) if where_params: params = params + where_params with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.rowcount def delete(self, table: str, where: str, params: tuple = None) -> int: """ 删除数据 Args: table: 表名 where: WHERE条件 params: 条件参数 Returns: 删除的行数 """ sql = f'DELETE FROM {table} WHERE {where}' with self.get_cursor() as cursor: cursor.execute(sql, params or ()) return cursor.rowcount def table_exists(self, table_name: str) -> bool: """检查表是否存在""" sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?" result = self.fetch_one(sql, (table_name,)) return result is not None def get_table_info(self, table_name: str) -> List[Dict]: """获取表结构信息""" sql = f"PRAGMA table_info({table_name})" return self.fetch_all(sql) def backup(self, backup_path: str): """备份数据库""" import shutil shutil.copy2(self.db_path, backup_path) print(f"数据库已备份到: {backup_path}") def export_to_json(self, table: str, output_file: str): """导出表数据到JSON文件""" data = self.fetch_all(f'SELECT * FROM {table}') # 处理Row对象和日期时间 def serialize(obj): if isinstance(obj, datetime): return obj.isoformat() return str(obj) with open(output_file, 'w', encoding='utf-8') as f: json.dump(data, f, default=serialize, ensure_ascii=False, indent=2) print(f"表 {table} 数据已导出到: {output_file}") def import_from_json(self, table: str, input_file: str): """从JSON文件导入数据""" with open(input_file, 'r', encoding='utf-8') as f: data = json.load(f) if not data: print("没有数据可导入") return # 获取列名(使用第一条数据) columns = list(data[0].keys()) placeholders = ', '.join(['?'] * len(columns)) sql = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})' # 准备数据 rows = [tuple(row[col] for col in columns) for row in data] with self.get_cursor() as cursor: cursor.executemany(sql, rows) print(f"已导入 {len(rows)} 条数据到表 {table}") # 使用示例 def demo_database_tool(): """演示数据库工具类的使用""" # 创建数据库实例 db = SQLiteDatabase(':memory:') try: # 创建表 db.execute(''' CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, age INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') db.execute(''' CREATE TABLE posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, content TEXT NOT NULL, user_id INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') print("1. 插入数据:") # 插入用户 user_data = { 'username': '张三', 'email': 'zhangsan@example.com', 'age': 25 } user_id = db.insert('users', user_data) print(f" 插入用户,ID: {user_id}") # 插入文章 post_data = { 'title': 'Python数据库操作', 'content': 'SQLite是Python内置的数据库...', 'user_id': user_id } post_id = db.insert('posts', post_data) print(f" 插入文章,ID: {post_id}") print("\n2. 查询数据:") # 查询用户 user = db.fetch_one('SELECT * FROM users WHERE id = ?', (user_id,)) print(f" 用户信息: {user}") # 查询所有用户 users = db.fetch_all('SELECT * FROM users ORDER BY created_at DESC') print(f" 所有用户: {len(users)} 条记录") print("\n3. 更新数据:") updated = db.update('users', {'age': 26}, 'id = ?', (user_id,)) print(f" 更新了 {updated} 条记录") # 验证更新 updated_user = db.fetch_one('SELECT * FROM users WHERE id = ?', (user_id,)) print(f" 更新后的年龄: {updated_user['age']}") print("\n4. 删除数据:") deleted = db.delete('posts', 'id = ?', (post_id,)) print(f" 删除了 {deleted} 条记录") print("\n5. 表信息:") table_info = db.get_table_info('users') print(f" users表结构:") for col in table_info: print(f" 字段: {col['name']}, 类型: {col['type']}, 主键: {col['pk']}") print("\n6. 批量插入:") # 批量插入用户 users_batch = [ ('李四', 'lisi@example.com', 30), ('王五', 'wangwu@example.com', 28), ('赵六', 'zhaoliu@example.com', 35) ] db.executemany( 'INSERT INTO users (username, email, age) VALUES (?, ?, ?)', users_batch ) # 验证批量插入 all_users = db.fetch_all('SELECT username, age FROM users') print(f" 所有用户列表:") for u in all_users: print(f" {u['username']} - {u['age']}岁") print("\n7. 复杂查询(连接查询):") # 重新插入文章数据用于演示连接查询 db.execute("INSERT INTO posts (title, content, user_id) VALUES ('测试文章', '内容...', ?)", (user_id,)) query = ''' SELECT u.username, p.title, p.content FROM users u JOIN posts p ON u.id = p.user_id ORDER BY u.username ''' results = db.fetch_all(query) print(f" 用户文章列表:") for r in results: print(f" 用户: {r['username']}, 文章: {r['title']}") finally: db.close() if __name__ == '__main__': demo_database_tool()