python常用模块的常用方法

一、文件系统操作包

1. os模块 - 操作系统交互

python

import os

# === 路径操作 ===
print(os.getcwd()) # 获取当前工作目录
os.chdir('/tmp') # 改变当前目录
print(os.listdir('.')) # 列出目录内容

# === 路径判断 ===
print(os.path.exists('file.txt')) # 路径是否存在
print(os.path.isfile('file.txt')) # 是否是文件
print(os.path.isdir('dir')) # 是否是目录
print(os.path.getsize('file.txt')) # 文件大小(字节)

# === 文件/目录操作 ===
os.mkdir('new_dir') # 创建目录(单层)
os.makedirs('a/b/c', exist_ok=True) # 递归创建目录
os.rename('old.txt', 'new.txt') # 重命名/移动
os.remove('file.txt') # 删除文件
os.rmdir('empty_dir') # 删除空目录

# === 环境变量 ===
print(os.environ.get('PATH')) # 获取环境变量
os.environ['MY_VAR'] = 'value' # 设置环境变量(临时)

2. shutil模块 - 高级文件操作

python

import shutil

# 复制文件
shutil.copy('source.txt', 'dest.txt') # 复制文件
shutil.copy2('source.txt', 'dest.txt') # 复制文件+元数据
shutil.copytree('src_dir', 'dst_dir') # 递归复制目录

# 移动文件/目录
shutil.move('source.txt', 'new_location/') # 移动文件或目录

# 删除目录(非空)
shutil.rmtree('directory') # 删除整个目录树

# 磁盘使用情况
total, used, free = shutil.disk_usage('.') # 返回(总计,已用,空闲)字节数
print(f"可用空间: {free / 1024**3:.2f} GB")

3. tempfile模块 - 临时文件/目录

python

import tempfile

# 创建临时文件(自动删除)
with tempfile.TemporaryFile(mode='w+') as tmp:
tmp.write('临时数据')
tmp.seek(0)
print(tmp.read()) # 读取内容

# 创建有名称的临时文件
with tempfile.NamedTemporaryFile(
mode='w+',
suffix='.txt',
delete=False # 设置为True则关闭后自动删除
) as tmp:
print(f"临时文件路径: {tmp.name}")
tmp.write('数据')

# 创建临时目录
with tempfile.TemporaryDirectory() as tmpdir:
print(f"临时目录: {tmpdir}")
# 在目录中创建文件...
# 离开with块后目录自动删除

4. pathlib模块 - 面向对象的路径操作(Python 3.4+推荐)

python

from pathlib import Path

# 创建Path对象
p = Path('.') # 当前目录
p = Path('/home/user/doc.txt') # 具体路径

# === 常用属性和方法 ===
print(p.resolve()) # 获取绝对路径
print(p.parent) # 父目录
print(p.name) # 文件名(含后缀)
print(p.stem) # 文件名(不含后缀)
print(p.suffix) # 文件后缀(如 .txt)
print(p.parts) # 路径各部分元组

# === 路径判断 ===
print(p.exists()) # 是否存在
print(p.is_file()) # 是否是文件
print(p.is_dir()) # 是否是目录

# === 文件操作 ===
# 创建目录
p.mkdir(exist_ok=True) # 创建目录,exist_ok=True避免目录已存在时报错
p.mkdir(parents=True) # 递归创建父目录

# 文件读写
p.write_text('内容') # 写入文本
content = p.read_text() # 读取文本
p.write_bytes(b'二进制') # 写入二进制
data = p.read_bytes() # 读取二进制

# 遍历目录
for file in Path('.').glob('*.py'): # 当前目录所有.py文件
print(file)
for file in Path('.').rglob('*.txt'): # 递归所有.txt文件
print(file)

# 路径拼接(推荐方式)
new_path = Path('dir') / 'subdir' / 'file.txt'

二、数据序列化包

1. json模块 - JSON数据交换

python

import json

data = {
"name": "张三",
"age": 30,
"skills": ["Python", "Java"],
"married": False
}

# === 序列化(Python对象 → JSON字符串/文件)===
json_str = json.dumps(data, ensure_ascii=False, indent=2)
# ensure_ascii=False: 允许中文正常显示
# indent=2: 缩进格式化,便于阅读

with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)

# === 反序列化(JSON字符串/文件 → Python对象)===
parsed_data = json.loads(json_str)
print(parsed_data['name'])

with open('data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)

2. pickle模块 - Python对象序列化

python

import pickle

# 复杂Python对象
data = {
'func': lambda x: x*2, # 可序列化函数!
'set': {1, 2, 3},
'nested': [{'a': 1}, {'b': 2}]
}

# 序列化到文件
with open('data.pkl', 'wb') as f: # 注意:二进制模式
pickle.dump(data, f)

# 从文件反序列化
with open('data.pkl', 'rb') as f:
loaded = pickle.load(f)
print(loaded['set']) # {1, 2, 3}
# 序列化到字节
bytes_data = pickle.dumps(data)
new_data = pickle.loads(bytes_data)

# ⚠️ 安全警告:不要反序列化不受信任的来源!

三、时间处理包

1. time模块 - 基础时间操作

python

import time

# === 时间戳(1970.1.1至今的秒数)===
timestamp = time.time() # 当前时间戳(浮点数)
print(f"时间戳: {timestamp}")

# === 时间格式化 ===
# 结构化时间(struct_time)
struct_time = time.localtime() # 本地时间
print(f"年: {struct_time.tm_year}")
print(f"月: {struct_time.tm_mon}")
print(f"日: {struct_time.tm_mday}")

# 格式化为字符串
formatted = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
print(formatted) # 2024-01-15 14:30:25

# 字符串解析为struct_time
parsed = time.strptime("2024-01-15", "%Y-%m-%d")

# === 延时 ===
print("开始等待...")
time.sleep(2.5) # 休眠2.5秒
print("等待结束")

# === 性能计时 ===
start = time.perf_counter() # 高精度计时器
# 执行一些操作...
end = time.perf_counter()
print(f"耗时: {end - start:.6f}秒")

2. datetime模块(补充) - 高级日期时间

python

from datetime import datetime, date, timedelta

# 当前时间
now = datetime.now()
print(f"当前时间: {now}")
print(f"日期部分: {now.date()}")
print(f"时间部分: {now.time()}")

# 特定时间
dt = datetime(2024, 1, 15, 14, 30, 0)
print(f"指定时间: {dt}")

# 时间运算
tomorrow = now + timedelta(days=1)
last_week = now - timedelta(weeks=1)
print(f"明天: {tomorrow}")

# 时间格式化
formatted = now.strftime("%Y年%m月%d日 %H时%M分")
print(formatted)

# 字符串转datetime
dt = datetime.strptime("2024-01-15 14:30", "%Y-%m-%d %H:%M")

四、并发编程包

1. threading模块 - 线程操作

python

import threading
import time

def worker(name, delay):
"""线程任务函数"""
print(f"线程 {name} 开始")
time.sleep(delay)
print(f"线程 {name} 结束")
return f"结果-{name}"

# === 创建线程 ===
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

t1.start() # 启动线程
t2.start()

t1.join() # 等待线程结束
t2.join()

print("所有线程完成")

# === 线程锁(解决资源竞争)===
counter = 0
lock = threading.Lock()

def safe_increment():
global counter
for _ in range(100000):
with lock: # 自动获取和释放锁
counter += 1

threads = []
for i in range(5):
t = threading.Thread(target=safe_increment)
threads.append(t)
t.start()

for t in threads:
t.join()

print(f"最终计数: {counter}") # 应该是500000

2. multiprocessing模块 - 进程操作

python

import multiprocessing
import time

def cpu_intensive_task(n):
"""CPU密集型任务"""
result = sum(i*i for i in range(n))
print(f"进程 {multiprocessing.current_process().name} 完成")
return result

if __name__ == '__main__': # Windows必须加这句
# 创建进程
p1 = multiprocessing.Process(target=cpu_intensive_task, args=(1000000,))
p2 = multiprocessing.Process(target=cpu_intensive_task, args=(2000000,))
p1.start()
p2.start()
p1.join()
p2.join()
# 使用进程池
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, [1000000, 2000000, 3000000])
print(f"结果: {results}")

3. concurrent.futures模块 - 高级并发

python

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def task(n):
time.sleep(1)
return n * n

# === 线程池(I/O密集型)===
print("=== 线程池示例 ===")
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交单个任务
future = executor.submit(task, 5)
print(f"单个任务结果: {future.result()}")
# 提交多个任务
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures): # 按完成顺序获取
print(f"完成的任务结果: {future.result()}")
# 使用map(保持顺序)
results = executor.map(task, range(5))
for result in results:
print(f"按顺序结果: {result}")

# === 进程池(CPU密集型)===
print("\n=== 进程池示例 ===")
with ProcessPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures):
print(f"进程结果: {future.result()}")

五、实际应用示例

场景:批量处理日志文件

python

import os
import shutil
from pathlib import Path
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import time

def process_log_file(log_path):
"""处理单个日志文件"""
print(f"处理: {log_path.name}")
# 模拟处理时间
time.sleep(0.5)
# 读取和分析(示例:统计行数)
try:
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
result = {
'filename': log_path.name,
'line_count': len(lines),
'size_kb': os.path.getsize(log_path) / 1024,
'processed_at': datetime.now().isoformat()
}
# 保存结果
output_path = Path('processed') / f"{log_path.stem}_result.json"
with open(output_path, 'w') as f:
json.dump(result, f, indent=2)
return result
except Exception as e:
return {'filename': log_path.name, 'error': str(e)}

def main():
# 创建目录结构
Path('logs').mkdir(exist_ok=True)
Path('processed').mkdir(exist_ok=True)
Path('archive').mkdir(exist_ok=True)
# 假设有5个日志文件
log_files = list(Path('logs').glob('*.log'))
if not log_files:
print("没有找到日志文件")
return
print(f"找到 {len(log_files)} 个日志文件")
# 使用线程池并行处理
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交所有处理任务
futures = {executor.submit(process_log_file, log_file): log_file
for log_file in log_files}
# 收集结果
results = []
for future in futures:
result = future.result()
results.append(result)
print(f"完成: {result.get('filename', '未知')}")
# 处理完成后移动日志文件到存档
for log_file in log_files:
shutil.move(str(log_file), f"archive/{log_file.name}")
# 汇总结果
total_lines = sum(r.get('line_count', 0) for r in results)
print(f"\n处理完成!")
print(f"总文件数: {len(results)}")
print(f"总行数: {total_lines}")
print(f"总耗时: {time.time() - start_time:.2f}秒")
# 保存汇总结果
summary = {
'total_files': len(results),
'total_lines': total_lines,
'processing_time': time.time() - start_time,
'details': results
}
with open('processing_summary.json', 'w') as f:
json.dump(summary, f, indent=2, default=str)

if __name__ == "__main__":
main()

六、其他常用包

1. sys模块:系统相关,如命令行参数、标准输入输出等。

  1. sys.argv:命令行参数列表。
  2. sys.exit():退出程序。

2. re模块:正则表达式。

  1. re.match(pattern, string):从字符串开头匹配。
  2. re.search(pattern, string):搜索字符串中第一个匹配。
  3. re.findall(pattern, string):返回所有匹配的列表。
  4. re.sub(pattern, repl, string):替换匹配项。

3. collections模块:提供额外的数据结构,如defaultdict, deque, Counter, OrderedDict等。

4. itertools模块:提供迭代器工具,如排列组合、无限迭代器等。

5. math模块:数学函数。

6. random模块:随机数生成。