05-文件操作进阶

judgingly
3
2026-02-14

Python 学习教程 - 第五篇:文件操作进阶

欢迎继续 Python 学习之旅!今天我们将深入学习文件操作的高级技巧,包括 JSON、CSV、Excel 处理以及常用标准库。

目录

  1. JSON 数据处理
  2. CSV 文件处理
  3. Excel 文件处理
  4. 常用标准库
  5. 文件系统操作
  6. 综合练习
  7. 总结

JSON 数据处理

什么是 JSON?

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也易于机器解析。

JSON 基本结构

# JSON 对象(字典)
{
    "name": "张三",
    "age": 25,
    "city": "北京"
}

# JSON 数组(列表)
[1, 2, 3, 4, 5]

# 嵌套结构
{
    "users": [
        {"name": "张三", "age": 25},
        {"name": "李四", "age": 30}
    ]
}

Python 中的 JSON 类型映射

JSON 类型 Python 类型
对象 dict
数组 list
字符串 str
数字 int / float
true/false True / False
null None

JSON 序列化(转换为 JSON)

import json

# 序列化字典
data = dict(name="张三", age=25, city="北京", is_student=True)

# Render the dict as indented JSON text; ensure_ascii=False keeps the
# Chinese characters readable instead of emitting \uXXXX escapes
json_str = json.dumps(data, indent=4, ensure_ascii=False)
print(json_str)

输出:

{
    "name": "张三",
    "age": 25,
    "city": "北京",
    "is_student": true
}

JSON 反序列化(从 JSON 转换为 Python)

import json

json_str = '{"name": "张三", "age": 25, "city": "北京"}'

# Decode the JSON text into the matching Python object (a dict here)
data = json.loads(json_str)

# Prints the dict, then <class 'dict'>, then 张三
for shown in (data, type(data), data["name"]):
    print(shown)

保存 JSON 到文件

import json

users = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"},
    {"name": "王五", "age": 28, "city": "广州"},
]
data = {"users": users, "total": 3}

# Serialize to text first, then write the text to the file in one call
with open("users.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(data, ensure_ascii=False, indent=4))

print("JSON 文件已保存!")

从 JSON 文件读取

import json

# 读取 JSON 文件
# Read the whole file as text and decode it ("r" is open()'s default mode)
with open("users.json", encoding="utf-8") as f:
    data = json.loads(f.read())

print(data)
first_user = data["users"][0]
print(first_user["name"])  # 张三

JSON 高级用法

import json

# 自定义 JSON 转换器
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes ``set`` and ``frozenset``.

    Sets have no JSON equivalent, so they are emitted as JSON arrays.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for *obj*.

        Sets are converted to a sorted list so the output is the same on
        every run; any other unsupported type is delegated to the base
        class, which raises ``TypeError``.
        """
        if isinstance(obj, (set, frozenset)):
            try:
                return sorted(obj)
            except TypeError:
                # Elements cannot be ordered (e.g. mixed str/int):
                # fall back to plain iteration order
                return list(obj)
        return super().default(obj)

# Use the custom encoder
data = {
    "name": "张三",
    "tags": {"python", "programming", "ai"},
}

# ensure_ascii=False keeps the Chinese text readable, matching the
# earlier json.dumps / json.dump examples
json_str = json.dumps(data, cls=CustomEncoder, ensure_ascii=False, indent=2)
print(json_str)

CSV 文件处理

什么是 CSV?

CSV(Comma-Separated Values,逗号分隔值)是一种简单的纯文本表格格式:每行是一条记录,字段之间用逗号分隔。

读取 CSV 文件

import csv

# Approach 1: csv.reader yields every row as a list of strings.
# newline="" hands line-ending handling to the csv module, so quoted
# fields that contain line breaks are parsed correctly.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# Approach 2 (recommended): csv.DictReader uses the first row as field
# names and yields every following row as a dict. The file's header must
# contain the columns name, age and city for the lookups below to work.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["age"], row["city"])

写入 CSV 文件

import csv

# 方法1:使用 csv.writer
header = ["姓名", "年龄", "城市"]
rows = [
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"],
]
data = [header, *rows]

# newline="" stops Python from translating the line endings the csv
# module writes itself
with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    for record in data:
        writer.writerow(record)

print("CSV 文件已创建!")

使用 DictWriter 写入 CSV

import csv

# Column order of the output file
fieldnames = ["name", "age", "city"]

data = [
    dict(zip(fieldnames, values))
    for values in (
        ("张三", 25, "北京"),
        ("李四", 30, "上海"),
        ("王五", 28, "广州"),
    )
]

with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()  # first line: the field names
    for record in data:
        writer.writerow(record)

print("CSV 文件已创建!")

处理使用其他分隔符的 CSV

import csv

# Semicolon-separated file.
# newline="" hands line-ending handling to the csv module, so quoted
# fields that contain line breaks are parsed correctly.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter=";")
    for row in reader:
        print(row)

# Tab-separated file (TSV)
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter="\t")
    for row in reader:
        print(row)

Excel 文件处理

使用 openpyxl 库

首先安装 openpyxl:

pip install openpyxl

读取 Excel 文件

from openpyxl import load_workbook

# 加载工作簿
wb = load_workbook("data.xlsx")

# 获取所有工作表名称
sheet_names = wb.sheetnames
print(sheet_names)

# 选择工作表
sheet = wb.active

# 读取单元格值
value = sheet.cell(row=1, column=1).value
print(value)

# 读取整个工作表
for row in sheet.iter_rows(values_only=True):
    print(row)

写入 Excel 文件

from openpyxl import Workbook

# 创建新的工作簿
wb = Workbook()
sheet = wb.active
sheet.title = "学生信息"

# 写入数据
data = [
    ["姓名", "年龄", "城市"],
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"]
]

for row in data:
    sheet.append(row)

# 保存文件
wb.save("students.xlsx")
print("Excel 文件已创建!")

格式化 Excel

from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill

wb = Workbook()
sheet = wb.active

# Styles for the header row
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4F81BD", end_color="4F81BD", fill_type="solid")
header_alignment = Alignment(horizontal="center")

# append() on an empty sheet writes to row 1, so the header lives in
# row 1; style every header cell, not just the first column
sheet.append(["姓名", "年龄", "城市"])
for cell in sheet[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = header_alignment

# Add one data row (row 2) and centre its cells
sheet.append(["张三", 25, "北京"])
for cell in sheet[2]:
    cell.alignment = Alignment(horizontal="center")

wb.save("formatted.xlsx")

使用 pandas 处理 Excel

import pandas as pd

# 读取 Excel 文件
df = pd.read_excel("students.xlsx")

# 显示数据
print(df)

# 获取基本信息
print(df.info())
print(df.describe())

# 按年龄排序
sorted_df = df.sort_values("年龄")
print(sorted_df)

常用标准库

1. os - 操作系统接口

import os

# Current working directory
current_dir = os.getcwd()
print(f"当前目录:{current_dir}")

# Create a directory (exist_ok=True: no error if it already exists)
os.makedirs("new_folder", exist_ok=True)

# Remove an (empty) directory
# os.rmdir("new_folder")

# List the entries of a directory
files = os.listdir(".")
print(files)

# Check that the file exists before asking for its size:
# os.path.getsize raises OSError for a missing path
if os.path.exists("data.csv"):
    print("文件存在")
    file_size = os.path.getsize("data.csv")
    print(f"文件大小:{file_size} 字节")

# Distinguish regular files from directories
print(os.path.isfile("data.csv"))  # True if data.csv exists and is a file
print(os.path.isdir("new_folder"))  # True

# Path manipulation (pure string operations, nothing is accessed on disk)
path = "/home/user/data.csv"
print(os.path.basename(path))  # data.csv
print(os.path.dirname(path))  # /home/user
print(os.path.join("home", "user", "data.csv"))  # home/user/data.csv

2. sys - 系统相关

import sys

# 获取 Python 版本
print(f"Python 版本:{sys.version}")

# 获取命令行参数
print(f"命令行参数:{sys.argv}")

# 系统信息
print(f"平台:{sys.platform}")
print(f"最大递归深度:{sys.getrecursionlimit()}")

# 退出程序
# sys.exit(0)  # 正常退出

3. datetime - 日期时间

import datetime

# 获取当前时间
now = datetime.datetime.now()
print(f"当前时间:{now}")

# 格式化时间
formatted = now.strftime("%Y-%m-%d %H:%M:%S")
print(f"格式化时间:{formatted}")

# 时间戳
timestamp = now.timestamp()
print(f"时间戳:{timestamp}")

# 创建指定时间
target_time = datetime.datetime(2024, 12, 25, 15, 30, 0)
print(f"目标时间:{target_time}")

# 计算时间差
diff = now - target_time
print(f"时间差:{diff}")

# 时间计算
tomorrow = now + datetime.timedelta(days=1)
print(f"明天:{tomorrow}")

4. random - 随机数

import random

# 随机整数
print(random.randint(1, 10))  # 1 到 10 之间的整数
print(random.randrange(0, 10, 2))  # 0, 2, 4, 6, 8

# 随机浮点数
print(random.random())  # 0.0 到 1.0
print(random.uniform(1, 10))  # 1.0 到 10.0 之间的浮点数

# 随机选择
fruits = ["苹果", "香蕉", "橙子", "葡萄"]
print(random.choice(fruits))  # 随机选择一个
print(random.sample(fruits, 2))  # 随机选择2个(不重复)

# 打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(numbers)

5. json - JSON 处理(已介绍)

6. re - 正则表达式

import re

# Find every match
text = "我的邮箱是 test@example.com,另一个是 test2@example.com"
pattern = r'\b\w+@\w+\.\w+\b'

matches = re.findall(pattern, text)
print(matches)  # ['test@example.com', 'test2@example.com']

# Replace every match
new_text = re.sub(pattern, "邮箱已隐藏", text)
print(new_text)

# Split on a pattern
text = "苹果,香蕉,橙子,葡萄"
fruits = re.split(",", text)
print(fruits)

# Validate a whole string against a format.
# re.match only anchors at the start, so a number with extra trailing
# digits would still pass; re.fullmatch requires the entire string to match.
phone_pattern = r'1[3-9]\d{9}'
phone = "13812345678"
if re.fullmatch(phone_pattern, phone):
    print("手机号格式正确")

7. pathlib - 面向对象的路径操作(Python 3.4+)

from pathlib import Path

# Build a path object (the path does not have to exist)
path = Path("home/user/documents/file.txt")

# Inspect the parts of the path
print(path.name)  # file.txt
print(path.stem)  # file
print(path.suffix)  # .txt
print(path.parent)  # home/user/documents
print(path.exists())  # False

# Join paths with the / operator
new_path = path.parent / "backup" / "file.txt"
print(new_path)  # home/user/documents/backup/file.txt

# Read and write text. Pass the encoding explicitly so the result does
# not depend on the platform's default encoding.
path = Path("test.txt")
path.write_text("Hello, World!", encoding="utf-8")
content = path.read_text(encoding="utf-8")
print(content)

# List the entries of a directory
for p in Path(".").iterdir():
    print(p.name)

文件系统操作

批量文件操作

import os
from pathlib import Path

# Batch-rename the .jpg files to image_001.jpg, image_002.jpg, ...
folder = "images"
# Filter and sort first: the counter then only advances for .jpg files,
# and the numbering is reproducible (os.listdir order is arbitrary)
jpg_names = sorted(name for name in os.listdir(folder) if name.endswith(".jpg"))
for i, filename in enumerate(jpg_names, 1):
    old_path = os.path.join(folder, filename)
    new_name = f"image_{i:03d}.jpg"
    new_path = os.path.join(folder, new_name)
    if old_path == new_path:
        continue  # already has its target name
    if os.path.exists(new_path):
        # os.rename can silently replace an existing file on Unix;
        # skip instead of destroying another image
        print(f"跳过:{new_name} 已存在")
        continue
    os.rename(old_path, new_path)

# Batch-convert file formats
video_folder = Path("videos")
for filename in os.listdir(video_folder):
    if filename.endswith(".mp4"):
        # Keep the folder in the path; a bare file name would point
        # into the current working directory instead
        old_path = video_folder / filename
        new_path = old_path.with_suffix(".avi")
        # Add the actual conversion code here
        print(f"转换:{filename} -> {new_path}")

文件监控

import os
import time

def monitor_folder(folder_path, interval=1, max_checks=None):
    """Poll a folder and print the names of added and removed entries.

    Only the direct entries of *folder_path* are compared (no recursion),
    and only by name, so modified files are not reported.

    Args:
        folder_path: Directory to watch.
        interval: Seconds to sleep between two scans.
        max_checks: Stop after this many scans. ``None`` (the default)
            keeps polling until interrupted with Ctrl+C.
    """
    last_files = set(os.listdir(folder_path))
    checks_done = 0

    try:
        while max_checks is None or checks_done < max_checks:
            current_files = set(os.listdir(folder_path))

            # Entries present now but not in the previous scan
            for file in sorted(current_files - last_files):
                print(f"新增文件:{file}")

            # Entries present in the previous scan but gone now
            for file in sorted(last_files - current_files):
                print(f"删除文件:{file}")

            last_files = current_files
            checks_done += 1
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end an open-ended watch
        print("监控已停止")

# monitor_folder("documents")

递归文件搜索

import os

def search_files(keyword, folder, recursive=True):
    """Return the paths of entries whose name contains *keyword*.

    Args:
        keyword: Substring to look for in each name.
        folder: Directory to search.
        recursive: When true, walk the whole tree below *folder* and
            match file names only; when false, match the direct entries
            of *folder* (files and sub-directories alike).

    Returns:
        A list of matching paths, each prefixed with its directory.
    """
    if not recursive:
        # Flat scan of the folder's own entries
        return [
            os.path.join(folder, entry)
            for entry in os.listdir(folder)
            if keyword in entry
        ]

    # Deep scan: visit every directory below the starting folder
    matches = []
    for current_dir, _subdirs, filenames in os.walk(folder):
        matches.extend(
            os.path.join(current_dir, name)
            for name in filenames
            if keyword in name
        )
    return matches

# 使用示例
results = search_files("data", "documents")
for result in results:
    print(result)

综合练习

练习1:日志分析工具

import json
from datetime import datetime

class LogAnalyzer:
    """Analyze a log file that stores one JSON object per line.

    The methods read the ``level``, ``message`` and ``timestamp`` keys of
    each record; ``timestamp`` is passed to ``datetime.fromtimestamp``,
    i.e. it is treated as seconds since the epoch.
    """

    def __init__(self, log_file="app.log"):
        """Remember the log file path; nothing is read until load_logs()."""
        self.log_file = log_file
        self.logs = []

    def load_logs(self):
        """Load the log file into ``self.logs`` and return that list.

        Blank lines are ignored. Lines that are not valid JSON, or whose
        JSON value is not an object, are reported and skipped so a single
        bad line does not abort the whole load. A missing file is reported
        and leaves the list empty.
        """
        # Start from scratch so calling load_logs() twice does not
        # duplicate every record
        self.logs = []
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                for line_no, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        log_data = json.loads(line)
                    except json.JSONDecodeError as e:
                        print(f"第 {line_no} 行不是有效的 JSON,已跳过:{e}")
                        continue
                    if not isinstance(log_data, dict):
                        # The analysis methods call .get() on every record
                        print(f"第 {line_no} 行不是 JSON 对象,已跳过")
                        continue
                    self.logs.append(log_data)
        except FileNotFoundError:
            print(f"日志文件 {self.log_file} 不存在")
        return self.logs

    def count_errors(self):
        """Print and return the number of records whose level is ERROR."""
        error_count = sum(1 for log in self.logs if log.get("level") == "ERROR")
        print(f"错误数量:{error_count}")
        return error_count

    def get_top_errors(self, top_n=5):
        """Print and return the most frequent ERROR messages.

        Args:
            top_n: Maximum number of messages to report.

        Returns:
            A list of ``(message, count)`` tuples, most frequent first;
            messages with equal counts keep their first-seen order.
        """
        error_counts = {}
        for log in self.logs:
            if log.get("level") == "ERROR":
                message = log.get("message", "")
                error_counts[message] = error_counts.get(message, 0) + 1

        sorted_errors = sorted(error_counts.items(), key=lambda x: x[1], reverse=True)
        top_errors = sorted_errors[:top_n]
        print(f"Top {top_n} 错误:")
        for i, (message, count) in enumerate(top_errors, 1):
            print(f"{i}. {message} - {count} 次")
        return top_errors

    def get_logs_by_date(self, date_str):
        """Print the count of, and return, the records logged on one day.

        Args:
            date_str: Day to select, formatted ``YYYY-MM-DD``.

        Returns:
            The records whose timestamp falls on that day in local time
            (``datetime.fromtimestamp`` converts to local time). Records
            without a numeric ``timestamp`` are left out.

        Raises:
            ValueError: If *date_str* does not match ``YYYY-MM-DD``.
        """
        # Parse once, outside the loop
        target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
        filtered_logs = []
        for log in self.logs:
            timestamp = log.get("timestamp")
            if not isinstance(timestamp, (int, float)):
                continue  # no usable timestamp on this record
            if datetime.fromtimestamp(timestamp).date() == target_date:
                filtered_logs.append(log)
        print(f"{date_str} 的日志数量:{len(filtered_logs)}")
        return filtered_logs

# 使用示例
analyzer = LogAnalyzer("app.log")
analyzer.load_logs()
analyzer.count_errors()
analyzer.get_top_errors()
analyzer.get_logs_by_date("2024-01-15")

练习2:数据转换工具

import csv
import json
from pathlib import Path

class DataConverter:
    """Convert tabular data between CSV and JSON files (UTF-8)."""

    def csv_to_json(self, csv_file, json_file):
        """Write the rows of *csv_file* to *json_file* as a list of objects.

        The first CSV row supplies the keys; all values stay strings.
        """
        # newline="" lets the csv module handle line endings itself, so
        # quoted fields containing line breaks are read correctly
        with open(csv_file, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            data = list(reader)

        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        print(f"已转换:{csv_file} -> {json_file}")

    def json_to_csv(self, json_file, csv_file):
        """Write a JSON list of objects to *csv_file*.

        The header is the union of the keys of all objects, in first-seen
        order; an object that lacks a key gets an empty cell. Empty JSON
        data is reported and produces no file.

        Raises:
            ValueError: If the JSON top level is not a list of objects.
        """
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        if not data:
            print("JSON 数据为空")
            return

        if not isinstance(data, list) or not all(isinstance(row, dict) for row in data):
            raise ValueError(f"{json_file}: JSON 顶层必须是由对象组成的列表")

        # Collect keys from every row, not just the first one; DictWriter
        # raises ValueError for keys missing from fieldnames
        fieldnames = list(dict.fromkeys(key for row in data for key in row))

        with open(csv_file, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

        print(f"已转换:{json_file} -> {csv_file}")

    def convert_all(self, folder):
        """Convert every CSV in *folder* to JSON and every JSON to CSV.

        A file is skipped when its counterpart already exists, so no
        existing file is overwritten. JSON files that are not a list of
        objects are reported and skipped.
        """
        folder = Path(folder)

        # Snapshot both listings before writing anything; otherwise the
        # JSON files created by the first loop would be converted straight
        # back, overwriting the original CSV files
        csv_files = sorted(folder.glob("*.csv"))
        json_files = sorted(folder.glob("*.json"))

        for csv_file in csv_files:
            json_file = csv_file.with_suffix(".json")
            if json_file.exists():
                print(f"跳过:{json_file} 已存在")
                continue
            self.csv_to_json(csv_file, json_file)

        for json_file in json_files:
            csv_file = json_file.with_suffix(".csv")
            if csv_file.exists():
                print(f"跳过:{csv_file} 已存在")
                continue
            try:
                self.json_to_csv(json_file, csv_file)
            except ValueError as e:
                print(f"跳过:{e}")

# 使用示例
converter = DataConverter()
converter.csv_to_json("data.csv", "data.json")
converter.json_to_csv("data.json", "data.csv")

练习3:文件备份系统

import os
import shutil
from datetime import datetime
from pathlib import Path

class FileBackup:
    """Copy files from a source directory into a flat backup directory."""

    def __init__(self, source_dir, backup_dir="backup"):
        """Store both locations as ``Path`` objects; nothing is created yet."""
        self.source_dir = Path(source_dir)
        self.backup_dir = Path(backup_dir)

    def backup_file(self, filename):
        """Back up one file and report the outcome.

        Args:
            filename: File name or path relative to the source directory
                (for example ``"sub/report.txt"``).

        Returns:
            True when the copy succeeded, False when it failed.
        """
        try:
            relative = Path(filename)
            source_path = self.source_dir / relative
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Flatten sub-directories into the backup name so files with
            # the same name in different folders do not overwrite each
            # other; a top-level file keeps the plain "<name>_<time>.bak"
            if relative.is_absolute():
                label = relative.name
            else:
                label = "__".join(relative.parts)
            backup_name = f"{label}_{timestamp}.bak"
            backup_path = self.backup_dir / backup_name

            # Also works when backup_file() is called directly
            self.backup_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_path, backup_path)
            print(f"✓ 已备份:{filename}")
            return True
        except Exception as e:
            # Best effort: one failing file must not stop a batch run
            print(f"✗ 备份 {filename} 失败:{e}")
            return False

    def backup_all(self, recursive=False):
        """Back up every file in the source directory.

        Args:
            recursive: Also back up files in sub-directories.

        Returns:
            A ``(success_count, fail_count)`` tuple.
        """
        # Create the backup directory
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        backup_root = self.backup_dir.resolve()

        if recursive:
            files = [
                Path(root) / filename
                for root, _, filenames in os.walk(self.source_dir)
                for filename in filenames
            ]
        else:
            files = list(self.source_dir.iterdir())

        success_count = 0
        fail_count = 0

        for file in files:
            if not file.is_file():
                continue
            if backup_root in file.resolve().parents:
                # The backup directory lies inside the source tree:
                # do not back up earlier backups
                continue
            # Pass the path relative to the source directory so nested
            # files are found where they actually are
            if self.backup_file(file.relative_to(self.source_dir)):
                success_count += 1
            else:
                fail_count += 1

        print(f"\n备份完成:成功 {success_count} 个,失败 {fail_count} 个")
        return success_count, fail_count

    def create_archive(self, archive_name="backup.zip"):
        """Zip the files of the backup directory into one archive.

        Args:
            archive_name: File name of the archive, created inside the
                backup directory (an existing archive is replaced).

        Returns:
            The path of the archive.
        """
        import zipfile

        self.backup_dir.mkdir(parents=True, exist_ok=True)
        archive_path = self.backup_dir / archive_name

        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file in self.backup_dir.iterdir():
                # The archive itself lives in the same directory; adding
                # it would store a half-written copy of itself
                if file.is_file() and file.name != archive_name:
                    zipf.write(file, file.name)

        print(f"已创建压缩包:{archive_path}")
        return archive_path

# 使用示例
backup = FileBackup("documents")
backup.backup_all(recursive=True)
backup.create_archive()

总结

今天我们学习了:

JSON 数据处理:序列化、反序列化、文件读写
CSV 文件处理:reader、writer、DictReader、DictWriter
Excel 文件处理:openpyxl、pandas
常用标准库:os、sys、datetime、random、re、pathlib
文件系统操作:批量处理、文件监控、递归搜索

重要概念

  1. 编码问题:文件操作时始终指定 encoding="utf-8"
  2. 路径处理:使用 pathlib 或 os.path 处理路径
  3. 异常处理:文件操作可能抛出异常,需要 try-except
  4. 上下文管理器:使用 with 语句自动管理文件资源

常用库总结

库 用途
json JSON 数据处理
csv CSV 文件读写
openpyxl Excel 文件处理
pandas 数据分析
os 操作系统接口
sys 系统相关
datetime 日期时间
random 随机数
re 正则表达式
pathlib 面向对象路径操作

下次学习预告

下一篇我们将学习:

  • 装饰器(Decorators)
  • 生成器和迭代器
  • 上下文管理器(Context Managers)
  • 常用第三方库

你已经掌握了文件操作的核心技能!可以开始处理真实的数据文件了!

动物装饰