Python 学习教程 - 第五篇:文件操作进阶
欢迎继续 Python 学习之旅!今天我们将深入学习文件操作的高级技巧,包括 JSON、CSV、Excel 处理以及常用标准库。
目录
JSON 数据处理
什么是 JSON?
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也易于机器解析。
JSON 基本结构
# JSON 对象(字典)
{
"name": "张三",
"age": 25,
"city": "北京"
}
# JSON 数组(列表)
[1, 2, 3, 4, 5]
# 嵌套结构
{
"users": [
{"name": "张三", "age": 25},
{"name": "李四", "age": 30}
]
}
Python 中的 JSON 类型映射
| JSON 类型 | Python 类型 |
|---|---|
| 对象 | dict |
| 数组 | list |
| 字符串 | str |
| 数字 | int / float |
| true/false | True / False |
| null | None |
JSON 序列化(转换为 JSON)
import json
# Sample record to serialize; a Python dict maps to a JSON object.
data = dict(name="张三", age=25, city="北京", is_student=True)

# ensure_ascii=False keeps non-ASCII characters readable instead of emitting
# \uXXXX escapes; indent=4 pretty-prints the result.
json_str = json.dumps(data, indent=4, ensure_ascii=False)
print(json_str)
输出:
{
"name": "张三",
"age": 25,
"city": "北京",
"is_student": true
}
JSON 反序列化(从 JSON 转换为 Python)
import json
# Raw JSON text, as it might arrive from a file or a web API.
json_str = '{"name": "张三", "age": 25, "city": "北京"}'

# json.loads turns the text into the matching Python object (a dict here).
data = json.loads(json_str)

# Prints the dict, then <class 'dict'>, then 张三.
for shown in (data, type(data), data["name"]):
    print(shown)
保存 JSON 到文件
import json
# Sample document: a list of user records plus a total count.
users = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"},
    {"name": "王五", "age": 28, "city": "广州"},
]
data = {"users": users, "total": 3}

# Serialize to text first, then write the whole document in one call.
serialized = json.dumps(data, ensure_ascii=False, indent=4)
with open("users.json", "w", encoding="utf-8") as f:
    f.write(serialized)
print("JSON 文件已保存!")
从 JSON 文件读取
import json
# Read the whole file as text, then parse it into Python objects.
with open("users.json", encoding="utf-8") as f:
    data = json.loads(f.read())
print(data)

# Nested values are reached by chaining dict keys and list indexes.
first_user = data["users"][0]
print(first_user["name"])  # 张三
JSON 高级用法
import json
# Custom JSON encoder: teaches the json module how to serialize sets.
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that writes ``set`` / ``frozenset`` values as arrays."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        Sets have no JSON equivalent, so they are emitted as lists. The
        elements are sorted when possible so the output is the same on every
        run (set iteration order is not stable between runs for strings).
        Any other unsupported type is passed to the base class, which raises
        ``TypeError``.
        """
        if isinstance(obj, (set, frozenset)):
            try:
                return sorted(obj)
            except TypeError:
                # Elements of mixed, non-comparable types: keep set order.
                return list(obj)
        return super().default(obj)


# Use the custom encoder through the ``cls`` argument.
data = {
    "name": "张三",
    "tags": {"python", "programming", "ai"},
}
# ensure_ascii=False keeps the Chinese text readable instead of \uXXXX escapes.
json_str = json.dumps(data, cls=CustomEncoder, ensure_ascii=False, indent=2)
print(json_str)
CSV 文件处理
什么是 CSV?
CSV(Comma-Separated Values)是一种简单的文件格式,用逗号分隔值。
读取 CSV 文件
import csv
# Option 1: csv.reader yields every row as a list of strings.
# newline="" lets the csv module handle line endings itself, which keeps
# quoted fields that contain embedded newlines intact.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# Option 2 (recommended): csv.DictReader uses the first row as the header and
# yields every following row as a dict keyed by column name. The file must
# therefore have "name", "age" and "city" columns for the lookups below.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["age"], row["city"])
写入 CSV 文件
import csv
# Option 1: csv.writer takes each row as a plain list.
header = ["姓名", "年龄", "城市"]
records = [
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"],
]
data = [header, *records]

# newline="" stops Python from translating the line endings the csv module
# writes on its own.
with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    for line in data:
        writer.writerow(line)
print("CSV 文件已创建!")
使用 DictWriter 写入 CSV
import csv
# Each record is a dict keyed by column name.
data = [
    dict(name="张三", age=25, city="北京"),
    dict(name="李四", age=30, city="上海"),
    dict(name="王五", age=28, city="广州"),
]

# Column order of the output file.
fieldnames = ["name", "age", "city"]

with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    # Header row first, then one line per record.
    writer.writeheader()
    for record in data:
        writer.writerow(record)
print("CSV 文件已创建!")
处理使用其他分隔符的 CSV
import csv
# Semicolon-separated values: pass the separator through ``delimiter``.
# newline="" lets the csv module handle line endings itself, which keeps
# quoted fields that contain embedded newlines intact.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter=";")
    for row in reader:
        print(row)

# Tab-separated values (TSV).
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter="\t")
    for row in reader:
        print(row)
Excel 文件处理
使用 openpyxl 库
首先安装 openpyxl:
pip install openpyxl
读取 Excel 文件
from openpyxl import load_workbook
# Open the workbook from disk.
wb = load_workbook("data.xlsx")

# Names of every worksheet in the workbook.
sheet_names = wb.sheetnames
print(sheet_names)

# Work with the currently active worksheet.
sheet = wb.active

# Value of the top-left cell (rows and columns are 1-indexed).
first_cell = sheet.cell(row=1, column=1)
value = first_cell.value
print(value)

# values_only=True yields plain values instead of cell objects.
for row in sheet.iter_rows(values_only=True):
    print(row)
写入 Excel 文件
from openpyxl import Workbook
# A new workbook starts with one active worksheet.
wb = Workbook()
sheet = wb.active
sheet.title = "学生信息"

# Header row followed by one row per student.
header = ["姓名", "年龄", "城市"]
students = [
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"],
]
data = [header] + students

# append() adds each list as the next row of the sheet.
for row in data:
    sheet.append(row)

# Write the workbook to disk.
wb.save("students.xlsx")
print("Excel 文件已创建!")
格式化 Excel
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
wb = Workbook()
sheet = wb.active

# Header look: bold white text on a solid blue background, centered.
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4F81BD", end_color="4F81BD", fill_type="solid")
header_alignment = Alignment(horizontal="center")

sheet.append(["姓名", "年龄", "城市"])

# The first append() on a new sheet fills row 1 (rows are 1-indexed), so the
# header styles belong on row 1, and on every cell of it, not only the first.
for cell in sheet[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = header_alignment

# Center every data row below the header. This sample has none yet, so the
# loop only takes effect once rows are appended before this point.
for row in sheet.iter_rows(min_row=2):
    for cell in row:
        cell.alignment = Alignment(horizontal="center")

wb.save("formatted.xlsx")
使用 pandas 处理 Excel
import pandas as pd
# Load the first worksheet of the workbook into a DataFrame.
df = pd.read_excel("students.xlsx")

# Show the data.
print(df)

# Basic information. DataFrame.info() prints its report itself and returns
# None, so it is called directly; wrapping it in print() would add a stray
# "None" line to the output.
df.info()
print(df.describe())

# Sort by the age column (ascending).
sorted_df = df.sort_values("年龄")
print(sorted_df)
常用标准库
1. os - 操作系统接口
import os
# Current working directory.
current_dir = os.getcwd()
print(f"当前目录:{current_dir}")

# Create a directory; exist_ok=True makes this a no-op if it already exists.
os.makedirs("new_folder", exist_ok=True)

# Remove an (empty) directory:
# os.rmdir("new_folder")

# List the entries of the current directory.
files = os.listdir(".")
print(files)

# Check that the file exists before asking for its size: os.path.getsize
# raises FileNotFoundError for a missing path.
if os.path.exists("data.csv"):
    print("文件存在")
    file_size = os.path.getsize("data.csv")
    print(f"文件大小:{file_size} 字节")

# Tell files and directories apart.
print(os.path.isfile("data.csv"))  # True when data.csv exists and is a file
print(os.path.isdir("new_folder"))  # True

# Path manipulation: pure string operations, nothing touches the disk.
path = "/home/user/data.csv"
print(os.path.basename(path))  # data.csv
print(os.path.dirname(path))  # /home/user
# The separator is platform-specific: home/user/data.csv on POSIX,
# home\user\data.csv on Windows.
print(os.path.join("home", "user", "data.csv"))
2. sys - 系统相关
import sys
# Interpreter version string and the arguments the script was started with.
version_text, cli_args = sys.version, sys.argv
print(f"Python 版本:{version_text}")
print(f"命令行参数:{cli_args}")

# Platform identifier and the current recursion limit.
platform_name = sys.platform
recursion_limit = sys.getrecursionlimit()
print(f"平台:{platform_name}")
print(f"最大递归深度:{recursion_limit}")

# Ending the program: status 0 means a normal exit.
# sys.exit(0)
3. datetime - 日期时间
import datetime
# Current local date and time.
now = datetime.datetime.now()
print(f"当前时间:{now}")

# Render the moment with an explicit format string.
time_format = "%Y-%m-%d %H:%M:%S"
formatted = now.strftime(time_format)
print(f"格式化时间:{formatted}")

# Seconds since the Unix epoch, as a float.
timestamp = now.timestamp()
print(f"时间戳:{timestamp}")

# Build a specific point in time from its components.
target_time = datetime.datetime(
    year=2024, month=12, day=25, hour=15, minute=30, second=0
)
print(f"目标时间:{target_time}")

# Subtracting two datetimes gives a timedelta.
diff = now - target_time
print(f"时间差:{diff}")

# Adding a timedelta shifts a datetime.
one_day = datetime.timedelta(days=1)
tomorrow = now + one_day
print(f"明天:{tomorrow}")
4. random - 随机数
import random
# Random integers.
print(random.randint(1, 10))  # integer from 1 to 10, both ends included
print(random.randrange(0, 10, 2))  # one of 0, 2, 4, 6, 8

# Random floats.
print(random.random())  # float in [0.0, 1.0)
print(random.uniform(1, 10))  # float between 1.0 and 10.0

# Picking from a sequence.
fruits = ["苹果", "香蕉", "橙子", "葡萄"]
print(random.choice(fruits))  # one random element
print(random.sample(fruits, 2))  # two distinct random elements

# Shuffling rearranges the list in place.
numbers = list(range(1, 6))
random.shuffle(numbers)
print(numbers)
5. json - JSON 处理(已介绍)
6. re - 正则表达式
import re
# Find every match.
text = "我的邮箱是 test@example.com,另一个是 test2@example.com"
pattern = r'\b\w+@\w+\.\w+\b'
matches = re.findall(pattern, text)
print(matches)  # ['test@example.com', 'test2@example.com']

# Replace every match.
new_text = re.sub(pattern, "邮箱已隐藏", text)
print(new_text)

# Split on a separator.
text = "苹果,香蕉,橙子,葡萄"
fruits = re.split(",", text)
print(fruits)

# Validate a specific format: mainland-China mobile number, 11 digits.
phone_pattern = r'1[3-9]\d{9}'


def is_valid_phone(number):
    """Return True when *number* is, in its entirety, a valid mobile number.

    re.fullmatch requires the whole string to match. re.match only anchors
    the start, so it would also accept a valid prefix followed by extra
    characters, such as a 15-digit string.
    """
    return re.fullmatch(phone_pattern, number) is not None


phone = "13812345678"
if is_valid_phone(phone):
    print("手机号格式正确")
7. pathlib - 面向对象的路径操作(Python 3.4+)
from pathlib import Path
# Build a path object; nothing is created on disk.
path = Path("home/user/documents/file.txt")

# Inspect the parts of the path.
print(path.name)  # file.txt
print(path.stem)  # file
print(path.suffix)  # .txt
print(path.parent)  # home/user/documents (backslashes on Windows)
print(path.exists())  # False unless that file really exists

# Join paths with the / operator.
new_path = path.parent / "backup" / "file.txt"
print(new_path)  # home/user/documents/backup/file.txt

# Read and write a file. The encoding is passed explicitly so the result does
# not depend on the platform's default encoding.
path = Path("test.txt")
path.write_text("Hello, World!", encoding="utf-8")
content = path.read_text(encoding="utf-8")
print(content)

# List the entries of the current directory.
for p in Path(".").iterdir():
    print(p.name)
文件系统操作
批量文件操作
import os
from pathlib import Path
def batch_rename_jpgs(folder):
    """Rename every .jpg in *folder* to image_001.jpg, image_002.jpg, ...

    Files are numbered in sorted name order and only .jpg files consume a
    number, so the sequence has no gaps. Returns a list of
    ``(old_path, new_path)`` pairs in numbering order.
    """
    jpg_names = sorted(
        name for name in os.listdir(folder) if name.endswith(".jpg")
    )

    # Phase 1: move every file to a temporary name. Renaming straight to the
    # final name could overwrite a file that is already called image_00N.jpg
    # but has not been renamed yet.
    staged = []
    for index, name in enumerate(jpg_names, 1):
        old_path = os.path.join(folder, name)
        temp_path = os.path.join(folder, f".rename_tmp_{index:03d}_{name}")
        new_path = os.path.join(folder, f"image_{index:03d}.jpg")
        os.rename(old_path, temp_path)
        staged.append((old_path, temp_path, new_path))

    # Phase 2: every final name is free now.
    renamed = []
    for old_path, temp_path, new_path in staged:
        os.rename(temp_path, new_path)
        renamed.append((old_path, new_path))
    return renamed


def plan_video_conversions(folder):
    """Return ``(source, target)`` Path pairs for each .mp4 file in *folder*.

    Both paths include *folder*, so they can be handed to a converter as-is.
    """
    pairs = []
    for name in sorted(os.listdir(folder)):
        if name.endswith(".mp4"):
            source = Path(folder) / name
            pairs.append((source, source.with_suffix(".avi")))
    return pairs


# Batch-rename the image files (skipped when the folder does not exist).
folder = "images"
if os.path.isdir(folder):
    batch_rename_jpgs(folder)

# Batch-convert file formats (skipped when the folder does not exist).
if os.path.isdir("videos"):
    for old_path, new_path in plan_video_conversions("videos"):
        # Add the actual conversion code here.
        print(f"转换:{old_path.name} -> {new_path}")
文件监控
import os
import time
def monitor_folder(folder_path, interval=1, max_checks=None):
    """Poll a folder and print the entries that appear or disappear.

    Only the immediate entries of the folder are compared (by name); changes
    to the contents of a file are not detected.

    Args:
        folder_path: Directory to watch.
        interval: Seconds to sleep after each poll. Defaults to 1.
        max_checks: Number of polls to run before returning. ``None`` (the
            default) keeps polling until the program is interrupted.
    """
    last_files = set(os.listdir(folder_path))
    checks_done = 0
    while max_checks is None or checks_done < max_checks:
        current_files = set(os.listdir(folder_path))

        # Entries present now but not in the previous snapshot. Sorted so the
        # report order does not depend on set iteration order.
        for file in sorted(current_files - last_files):
            print(f"新增文件:{file}")

        # Entries from the previous snapshot that are gone now.
        for file in sorted(last_files - current_files):
            print(f"删除文件:{file}")

        last_files = current_files
        checks_done += 1
        time.sleep(interval)


# Example (runs until interrupted, e.g. with Ctrl+C):
# monitor_folder("documents")
递归文件搜索
import os
def search_files(keyword, folder, recursive=True):
    """Find files whose name contains *keyword*.

    Args:
        keyword: Substring to look for in each file name (case-sensitive).
        folder: Directory to search.
        recursive: When True (default) sub-directories are searched too;
            when False only the entries directly inside *folder* are checked.

    Returns:
        A list of matching paths, each prefixed with the directory it was
        found in. Directories are never reported, in either mode.
    """
    if recursive:
        return [
            os.path.join(root, name)
            for root, _dirs, names in os.walk(folder)
            for name in names
            if keyword in name
        ]

    # os.listdir returns sub-directories as well; drop them so this mode
    # reports the same kind of entry as the recursive one.
    candidates = (
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if keyword in name
    )
    return [path for path in candidates if not os.path.isdir(path)]


# Usage example
results = search_files("data", "documents")
for result in results:
    print(result)
综合练习
练习1:日志分析工具
import json
from datetime import datetime
class LogAnalyzer:
    """Analyze a log file that stores one JSON object per line.

    The entries read by the methods below use the keys ``level``,
    ``message`` and ``timestamp`` (seconds since the Unix epoch).
    """

    def __init__(self, log_file="app.log"):
        self.log_file = log_file
        self.logs = []

    def load_logs(self):
        """Read the log file into ``self.logs`` and return that list.

        Blank lines are ignored. Lines that are not valid JSON, or that are
        valid JSON but not an object, are reported and skipped instead of
        aborting the whole load. A missing file is reported and leaves the
        list empty.
        """
        # Start from a clean slate so calling load_logs() twice does not
        # duplicate every entry.
        self.logs = []
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                for line_number, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        log_data = json.loads(line)
                    except json.JSONDecodeError as exc:
                        print(f"跳过第 {line_number} 行(无效的 JSON):{exc}")
                        continue
                    if not isinstance(log_data, dict):
                        print(f"跳过第 {line_number} 行(不是 JSON 对象)")
                        continue
                    self.logs.append(log_data)
        except FileNotFoundError:
            print(f"日志文件 {self.log_file} 不存在")
        return self.logs

    def count_errors(self):
        """Print and return the number of entries whose level is ERROR."""
        error_count = sum(1 for log in self.logs if log.get("level") == "ERROR")
        print(f"错误数量:{error_count}")
        return error_count

    def get_top_errors(self, top_n=5):
        """Print and return the *top_n* most frequent ERROR messages.

        Returns:
            A list of ``(message, count)`` tuples, most frequent first.
            Messages with equal counts keep the order in which they first
            appear in the log.
        """
        from collections import Counter

        error_counts = Counter(
            log.get("message", "")
            for log in self.logs
            if log.get("level") == "ERROR"
        )
        top_errors = error_counts.most_common()[:top_n]
        print(f"Top {top_n} 错误:")
        for i, (message, count) in enumerate(top_errors, 1):
            print(f"{i}. {message} - {count} 次")
        return top_errors

    @staticmethod
    def _log_date(log):
        """Return the local calendar date of *log*, or None if it has no usable timestamp."""
        try:
            return datetime.fromtimestamp(log["timestamp"]).date()
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            return None

    def get_logs_by_date(self, date_str):
        """Print the count of, and return, the entries logged on *date_str*.

        Args:
            date_str: Date in ``YYYY-MM-DD`` form.

        Timestamps are converted with ``datetime.fromtimestamp``, i.e. in the
        machine's local time zone. Entries without a usable timestamp never
        match.

        Raises:
            ValueError: If *date_str* is not in ``YYYY-MM-DD`` form.
        """
        target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
        filtered_logs = [
            log for log in self.logs if self._log_date(log) == target_date
        ]
        print(f"{date_str} 的日志数量:{len(filtered_logs)}")
        return filtered_logs


# Usage example
analyzer = LogAnalyzer("app.log")
analyzer.load_logs()
analyzer.count_errors()
analyzer.get_top_errors()
analyzer.get_logs_by_date("2024-01-15")
练习2:数据转换工具
import csv
import json
from pathlib import Path
class DataConverter:
    """Convert tabular data between CSV and JSON files."""

    def csv_to_json(self, csv_file, json_file):
        """Write the rows of *csv_file* to *json_file* as a JSON array of objects.

        The first CSV row is used as the header; every value is kept as a
        string, exactly as csv.DictReader returns it.
        """
        # newline="" lets the csv module handle line endings itself, which
        # keeps quoted fields with embedded newlines intact.
        with open(csv_file, "r", encoding="utf-8", newline="") as f:
            data = list(csv.DictReader(f))
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        print(f"已转换:{csv_file} -> {json_file}")

    def json_to_csv(self, json_file, csv_file):
        """Write the JSON array of objects in *json_file* to *csv_file*.

        The header is the union of the keys of all objects, in first-seen
        order; an object that lacks a key gets an empty cell. Empty JSON data
        is reported and nothing is written.

        Raises:
            ValueError: If the JSON document is not an array of objects.
        """
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not data:
            print("JSON 数据为空")
            return
        if not isinstance(data, list) or not all(isinstance(row, dict) for row in data):
            raise ValueError(f"{json_file} 的内容必须是由对象组成的 JSON 数组")

        # Collect keys from every row, not only the first one: DictWriter
        # rejects a row that has a key missing from fieldnames.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        with open(csv_file, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        print(f"已转换:{json_file} -> {csv_file}")

    def convert_all(self, folder):
        """Convert every .csv in *folder* to .json and every .json to .csv.

        Both file lists are taken before anything is written, and a JSON file
        whose CSV counterpart already existed is not converted back:
        otherwise the JSON just produced from ``a.csv`` would immediately
        overwrite ``a.csv``. When both ``a.csv`` and ``a.json`` exist, the CSV
        is the source and ``a.json`` is replaced. JSON files that are not an
        array of objects are reported and skipped.
        """
        folder = Path(folder)
        csv_files = sorted(folder.glob("*.csv"))
        json_files = sorted(folder.glob("*.json"))
        csv_stems = {csv_file.stem for csv_file in csv_files}

        for csv_file in csv_files:
            self.csv_to_json(csv_file, csv_file.with_suffix(".json"))

        for json_file in json_files:
            if json_file.stem in csv_stems:
                continue
            try:
                self.json_to_csv(json_file, json_file.with_suffix(".csv"))
            except ValueError as exc:
                print(f"跳过 {json_file}:{exc}")
# Usage example: convert data.csv to data.json, then convert that JSON back,
# which rewrites data.csv.
converter = DataConverter()
converter.csv_to_json("data.csv", "data.json")
converter.json_to_csv("data.json", "data.csv")
练习3:文件备份系统
import os
import shutil
from datetime import datetime
from pathlib import Path
class FileBackup:
    """Copy files from a source directory into a backup directory."""

    def __init__(self, source_dir, backup_dir="backup"):
        self.source_dir = Path(source_dir)
        self.backup_dir = Path(backup_dir)

    def backup_file(self, filename):
        """Back up one file and return True on success, False on failure.

        Args:
            filename: Path of the file relative to the source directory. It
                may contain sub-directories (``"sub/report.txt"``); the same
                sub-directories are recreated inside the backup directory so
                files with the same name in different folders do not collide.

        The copy is named ``<name>_<YYYYmmdd_HHMMSS>.bak``. Failures are
        printed, not raised.
        """
        relative = Path(filename)
        # An absolute path has no meaningful location below the backup
        # directory, so such a copy goes to the top level.
        subdir = Path() if relative.is_absolute() else relative.parent
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / subdir / f"{relative.name}_{timestamp}.bak"
        try:
            # Also covers calling backup_file() directly, before backup_all()
            # has created the backup directory.
            backup_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(self.source_dir / relative, backup_path)
        except OSError as e:
            print(f"✗ 备份 {filename} 失败:{e}")
            return False
        print(f"✓ 已备份:{filename}")
        return True

    def backup_all(self, recursive=False):
        """Back up every file in the source directory.

        Args:
            recursive: When True, files in sub-directories are included.

        Returns:
            A ``(success_count, fail_count)`` tuple.
        """
        # Create the backup directory.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

        # Take the full listing before copying anything, so files created by
        # this run are never picked up as new sources.
        if recursive:
            candidates = [
                Path(root) / name
                for root, _, names in os.walk(self.source_dir)
                for name in names
            ]
        else:
            candidates = list(self.source_dir.iterdir())

        backup_root = self.backup_dir.resolve()
        success_count = 0
        fail_count = 0
        for file in candidates:
            if not file.is_file():
                continue
            # Skip earlier backups when the backup directory lives inside the
            # source directory.
            if backup_root in file.resolve().parents:
                continue
            # Pass the path relative to the source directory, not just the
            # bare name, so files in sub-directories can be located.
            if self.backup_file(file.relative_to(self.source_dir)):
                success_count += 1
            else:
                fail_count += 1
        print(f"\n备份完成:成功 {success_count} 个,失败 {fail_count} 个")
        return success_count, fail_count

    def create_archive(self, archive_name="backup.zip"):
        """Zip every file in the backup directory and return the archive path.

        The archive is written inside the backup directory. Files in
        sub-directories are included under their relative path; the archive
        itself is left out.
        """
        import zipfile

        self.backup_dir.mkdir(parents=True, exist_ok=True)
        archive_path = self.backup_dir / archive_name
        # Collect the members first: the archive is created in the directory
        # being scanned and must not be added to itself.
        members = sorted(
            path
            for path in self.backup_dir.rglob("*")
            if path.is_file() and path != archive_path
        )
        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for member in members:
                zipf.write(member, member.relative_to(self.backup_dir).as_posix())
        print(f"已创建压缩包:{archive_path}")
        return archive_path
# Usage example: back up the files under "documents", sub-folders included,
# into the default "backup" directory, then zip the backup directory.
backup = FileBackup("documents")
backup.backup_all(recursive=True)
backup.create_archive()
总结
今天我们学习了:
✅ JSON 数据处理:序列化、反序列化、文件读写
✅ CSV 文件处理:reader、writer、DictReader、DictWriter
✅ Excel 文件处理:openpyxl、pandas
✅ 常用标准库:os、sys、datetime、random、re、pathlib
✅ 文件系统操作:批量处理、文件监控、递归搜索
重要概念
- 编码问题:文件操作时始终指定 encoding="utf-8"
- 路径处理:使用 pathlib 或 os.path 处理路径
- 异常处理:文件操作可能抛出异常,需要 try-except
- 上下文管理器:使用 with 语句自动管理文件资源
常用库总结
| 库 | 用途 |
|---|---|
| json | JSON 数据处理 |
| csv | CSV 文件读写 |
| openpyxl | Excel 文件处理 |
| pandas | 数据分析 |
| os | 操作系统接口 |
| sys | 系统相关 |
| datetime | 日期时间 |
| random | 随机数 |
| re | 正则表达式 |
| pathlib | 面向对象路径操作 |
下次学习预告
下一篇我们将学习:
- 装饰器(Decorators)
- 生成器和迭代器
- 上下文管理器(Context Managers)
- 常用第三方库
你已经掌握了文件操作的核心技能!可以开始处理真实的数据文件了!