2024-11-30

Python3 asyncio — 异步 I/O,事件循环和并发工具

在 Python 编程中,asyncio 是用于编写并发代码的标准库。它使得 Python 程序能够高效地处理 I/O 密集型任务,如网络请求、文件读取等,特别是在需要同时处理大量并发任务时。asyncio 的核心思想是异步 I/O,基于事件循环机制,让多个任务在同一个线程中并发执行,而不需要多线程或多进程的复杂处理。

本文将详细介绍 Python3 中的 asyncio 模块,包括如何使用事件循环、异步 I/O 任务和并发工具,以及通过代码示例帮助你掌握其核心概念和应用场景。


一、什么是 asyncio

asyncio 是 Python 3.3 引入的标准库,用于支持异步编程。它提供了一个事件循环机制,可以在同一个线程中调度和执行多个 I/O 密集型任务。asyncio 使得 Python 能够以非阻塞的方式运行多个任务,从而提高了处理并发任务的效率。

1.1 主要概念

  • 事件循环(Event Loop):是 asyncio 的核心,负责调度任务和执行异步操作。它会不断地检查哪些任务已经完成,哪些任务需要等待,并在适当的时机运行它们。
  • 协程(Coroutines):是 Python 中定义的异步函数,它通过 async 关键字声明,await 关键字用于暂停协程的执行,等待某个异步操作完成。
  • 任务(Tasks):协程的封装,可以将多个协程调度并行执行。
  • Future:表示将来某个时刻完成的异步操作的结果。Future 是一个特殊的对象,它用来表示一个尚未完成的操作的结果。

二、事件循环(Event Loop)

事件循环是 asyncio 的基础,控制着异步操作的执行流程。在 asyncio 中,程序通过事件循环来执行多个协程。通常情况下,你不需要手动创建事件循环,asyncio.run() 函数会自动为你创建并运行事件循环。

2.1 使用 asyncio.run() 启动事件循环

import asyncio

async def say_hello():
    print("Hello, world!")

# 启动事件循环
asyncio.run(say_hello())

解释

  • 使用 async def 定义一个异步协程 say_hello()
  • asyncio.run() 启动事件循环,执行 say_hello() 协程。

2.2 事件循环中的异步任务

在事件循环中,多个异步任务可以并发执行。asyncio.create_task() 用于将协程包装成任务,并将其调度到事件循环中执行。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # 创建并启动任务
    task1_obj = asyncio.create_task(task1())
    task2_obj = asyncio.create_task(task2())
    
    # 等待任务完成
    await task1_obj
    await task2_obj

# 启动事件循环
asyncio.run(main())

解释

  • asyncio.create_task()task1()task2() 协程创建为任务,并交给事件循环。
  • await 等待任务完成,确保事件循环在所有任务完成后才结束。

输出

Task 1 started
Task 2 started
Task 2 finished
Task 1 finished

从输出可以看出,两个任务并发执行,task2() 完成后 task1() 继续执行。


三、异步 I/O 操作

异步 I/O 操作允许我们在等待 I/O 操作完成时不阻塞整个程序。asyncio 提供了异步版本的 I/O 操作,例如 asyncio.sleep()asyncio.gather() 等。

3.1 asyncio.sleep():模拟 I/O 操作

asyncio.sleep() 是一个模拟延时的异步操作。它不会阻塞事件循环,在等待过程中可以执行其他任务。

import asyncio

async def async_task(name, seconds):
    print(f"Task {name} started")
    await asyncio.sleep(seconds)
    print(f"Task {name} finished after {seconds} seconds")

async def main():
    # 启动多个异步任务
    await asyncio.gather(
        async_task("A", 2),
        async_task("B", 1),
        async_task("C", 3)
    )

# 启动事件循环
asyncio.run(main())

解释

  • asyncio.gather() 用来并发执行多个异步任务,并等待所有任务完成。
  • asyncio.sleep() 使当前协程暂停一段时间,模拟 I/O 操作。

输出

Task A started
Task B started
Task C started
Task B finished after 1 seconds
Task A finished after 2 seconds
Task C finished after 3 seconds

四、并发工具:asyncio.gather()await

4.1 asyncio.gather():并发执行多个协程

asyncio.gather() 可以并发执行多个协程,并等待它们的结果。它非常适用于需要并行执行多个任务的场景。

import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # 模拟 I/O 操作
    return f"Data from {url}"

async def main():
    urls = ["https://example.com", "https://google.com", "https://github.com"]
    # 同时请求多个网址
    results = await asyncio.gather(*(fetch_data(url) for url in urls))
    print(results)

# 启动事件循环
asyncio.run(main())

解释

  • asyncio.gather() 用于并行执行多个协程,返回它们的结果。
  • *(fetch_data(url) for url in urls) 使用生成器表达式创建多个协程。

输出

Fetching data from https://example.com
Fetching data from https://google.com
Fetching data from https://github.com
Data from https://example.com
Data from https://google.com
Data from https://github.com

4.2 任务的结果处理

asyncio.gather() 会返回所有任务的结果,可以直接对结果进行处理。

import asyncio

async def task(name, seconds):
    await asyncio.sleep(seconds)
    return f"Task {name} finished after {seconds} seconds"

async def main():
    tasks = [
        asyncio.create_task(task("A", 1)),
        asyncio.create_task(task("B", 2)),
        asyncio.create_task(task("C", 3))
    ]
    
    # 获取任务结果
    results = await asyncio.gather(*tasks)
    print(results)

# 启动事件循环
asyncio.run(main())

解释

  • asyncio.create_task() 创建任务并并发执行。
  • asyncio.gather() 返回任务的执行结果。

输出

['Task A finished after 1 seconds', 'Task B finished after 2 seconds', 'Task C finished after 3 seconds']

五、总结

asyncio 是 Python3 提供的一个强大工具,用于简化异步 I/O 操作和并发编程。通过事件循环和协程,asyncio 能够高效地执行多个任务而不阻塞程序的执行。主要功能包括:

  • 异步 I/O 操作,避免阻塞。
  • 使用 asyncawait 定义协程。
  • 使用 asyncio.create_task() 调度任务。
  • 使用 asyncio.gather() 并发执行多个协程。

理解并掌握 asyncio 的基本概念和用法,可以帮助你高效地处理 I/O 密集型任务,提升程序的并发能力。

希望这篇教程能帮助你更好地理解 Python 中的异步编程,并应用到实际项目中!

2024-11-30

Python3 io — 文本、二进制和原生流的 I/O 工具

Python 提供了多种工具来处理输入输出(I/O)操作,其中 io 模块是一个非常重要的模块。它提供了对文本、二进制文件以及原生流操作的强大支持。本文将详细介绍 io 模块的使用,包括文本与二进制文件的读写、内存中的流操作以及其他常见应用场景,并通过代码示例帮助你更好地理解其功能。


一、什么是 io 模块?

io 模块是 Python 3 中用于处理 I/O 操作的标准库,支持文本流和二进制流的操作。io 提供了对文件、内存、管道等数据流的操作接口,涵盖了对各种流的读取、写入等常见操作。

1.1 主要的流类型

  • 文本流(Text I/O):用于处理字符数据,Python 使用 Unicode 编码对文本进行处理。
  • 二进制流(Binary I/O):用于处理原始字节数据。
  • 内存流(Memory I/O):允许在内存中进行 I/O 操作。

io 模块提供了这些流的类和方法,常见的类有:

  • io.TextIOWrapper:文本流
  • io.BytesIO:二进制流
  • io.StringIO:文本内存流

二、文本流操作

2.1 使用 TextIOWrapper 处理文本文件

文本文件用于处理字符数据。我们可以通过 open() 函数来创建文本文件的文件对象,或者使用 io 模块中的 TextIOWrapper 来进行流式处理。

示例:文本文件的读写操作

import io

# 写入文本文件
with open("example.txt", "w", encoding="utf-8") as file:
    file.write("Hello, Python I/O!")
    
# 读取文本文件
with open("example.txt", "r", encoding="utf-8") as file:
    content = file.read()
    print(content)

解释

  • 使用 open() 函数时,指定 "w" 模式表示写入模式,"r" 模式表示读取模式。
  • encoding="utf-8" 确保文本文件使用 UTF-8 编码。
  • file.read() 用于读取文件中的内容。

2.2 使用 TextIOWrapper 操作内存中的文本流

除了文件 I/O,我们也可以使用 StringIO 类来模拟内存中的文本文件。

示例:内存中的文本流

from io import StringIO

# 创建内存中文本流
text_stream = StringIO("Hello, Memory Stream!")

# 读取文本流
content = text_stream.read()
print(content)

# 向流中写入数据
text_stream.write("\nNew data added to memory stream.")

# 重置流位置到开始
text_stream.seek(0)
print(text_stream.read())

解释

  • StringIO 创建了一个内存中的文本流,我们可以像文件一样进行读写操作。
  • seek(0) 将流的位置指针重新设置到开始,以便再次读取。

三、二进制流操作

二进制流操作用于处理非字符数据(如图片、音频文件等)。io 模块通过 BytesIO 类提供了对内存中二进制数据流的支持。

3.1 使用 BytesIO 操作二进制数据

示例:操作二进制数据流

from io import BytesIO

# 创建内存中的二进制流
binary_stream = BytesIO(b"Hello, Binary Stream!")

# 读取二进制流
content = binary_stream.read()
print(content)

# 向二进制流写入数据
binary_stream.write(b"\nNew data added to binary stream.")

# 重置流位置到开始
binary_stream.seek(0)
print(binary_stream.read())

解释

  • BytesIO 创建了一个内存中的二进制流。
  • b"" 表示字节数据,read() 方法读取二进制内容。

3.2 处理二进制文件

对于二进制文件(如图片、音频文件等),我们也可以使用 open() 函数,并指定二进制模式来进行操作。

示例:读写二进制文件

# 写入二进制文件
with open("example.jpg", "wb") as file:
    file.write(b"Binary data content")

# 读取二进制文件
with open("example.jpg", "rb") as file:
    content = file.read()
    print(content)

解释

  • "wb""rb" 模式分别表示写入二进制文件和读取二进制文件。
  • 使用 file.write()file.read() 操作二进制数据。

四、原生流操作

原生流是处理系统级别 I/O 操作的一种方式,它不依赖于文件,而是直接与操作系统交互。通常原生流用于处理管道、套接字等低级 I/O。

示例:使用原生流

import os

# 获取系统的标准输入流
input_stream = os.fdopen(0, 'r')  # 0 表示标准输入

# 从标准输入读取数据
data = input_stream.read()
print(f"从标准输入读取到的数据: {data}")

解释

  • os.fdopen() 可以打开一个原生流,0 表示标准输入流。
  • read() 从原生流中读取数据。

五、io 模块的其他常见功能

5.1 open() 函数

Python 的内建 open() 函数支持文本和二进制文件的读写,它底层使用了 io 模块的流操作。可以通过设置不同的模式来控制文件操作:

  • "r": 读取文本文件
  • "w": 写入文本文件
  • "rb": 读取二进制文件
  • "wb": 写入二进制文件

示例:不同模式的文件操作

# 打开文本文件并读取
with open("example.txt", "r") as file:
    content = file.read()
    print(content)

# 打开二进制文件并读取
with open("example.jpg", "rb") as file:
    content = file.read()
    print(content)

六、总结

Python3 的 io 模块为我们提供了处理文本流、二进制流以及内存中的 I/O 操作的工具。通过 TextIOWrapperBytesIOStringIO 等类,Python 使得对流的操作变得更加简洁易用。掌握 io 模块的基本用法,将帮助你高效地进行文件、内存及其他低级数据流的处理。

在处理文件和数据时,合理选择流类型(文本流或二进制流)是关键。理解不同流的使用场景和操作方法,将极大提升你在 Python 中进行 I/O 操作的能力。

希望本文能帮助你更好地理解和掌握 io 模块的用法,提升你的 Python 编程技巧!

2024-11-30

Python3 contextlib — 上下文管理器工具

在 Python 中,上下文管理器(Context Manager)是一种对象,它定义了代码块的“入口”和“退出”行为,通常与 with 语句一起使用。Python 的 contextlib 模块提供了一些工具,帮助我们更简便地创建和管理上下文管理器。本文将深入探讨 Python3 中的 contextlib 模块,包括它的使用方法、常用工具、代码示例和详细说明。


一、什么是上下文管理器?

上下文管理器是用于管理资源的工具,它确保在某些特定代码块执行之前和之后进行资源的正确配置和释放。最常见的上下文管理器应用场景是文件操作,在打开文件后,我们需要确保文件被正确关闭,即使在文件操作过程中发生异常。

通常我们使用 with 语句来处理上下文管理器:

with open("file.txt", "r") as file:
    content = file.read()
# 文件在代码块执行完毕后自动关闭

在上面的例子中,open() 返回的文件对象是一个上下文管理器,它自动在 with 语句块结束后关闭文件。


二、contextlib 模块

contextlib 是 Python 中提供的一个标准库模块,专门用于支持和简化上下文管理器的创建和使用。它包含了若干非常有用的工具和函数,帮助我们更方便地管理代码中的资源。

2.1 使用 contextlibcontextmanager 装饰器

contextmanager 装饰器是 contextlib 中一个非常有用的工具,可以让我们通过简单的生成器函数来创建上下文管理器。它使得上下文管理器的创建变得简单而直观。

示例:使用 contextmanager 装饰器创建一个简单的上下文管理器

假设我们需要创建一个上下文管理器,它能帮助我们跟踪开始和结束时间。

from contextlib import contextmanager
import time

@contextmanager
def timing_context():
    start_time = time.time()
    print("开始计时")
    yield  # 在此处可以进行上下文中的代码块操作
    end_time = time.time()
    print(f"计时结束,总耗时:{end_time - start_time:.4f}秒")

# 使用上下文管理器
with timing_context():
    time.sleep(2)  # 模拟需要计时的操作

输出

开始计时
计时结束,总耗时:2.0001秒

解释

  • @contextmanager 装饰器将 timing_context 函数转换为上下文管理器。
  • yield 语句标记了上下文代码块的开始和结束。yield 前的代码在进入上下文时执行,而 yield 后的代码在退出上下文时执行。
  • 使用 with 语句时,timing_context() 上下文管理器会在执行块代码前后执行开始和结束的时间记录。

三、contextlib 中的其他实用功能

除了 contextmanagercontextlib 还提供了其他一些常用工具,下面我们来看几个常用的功能。

3.1 使用 closing 函数

closingcontextlib 提供的一个上下文管理器,它能够确保对象的 close() 方法被自动调用,常用于需要关闭的对象,如网络连接、数据库连接等。

示例:使用 closing 管理对象的关闭

from contextlib import closing
import sqlite3

# 创建一个简单的 SQLite 连接
with closing(sqlite3.connect("example.db")) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    print(cursor.fetchall())
# 连接自动关闭

解释

  • closing 确保无论上下文代码块是否正常执行结束,都能调用 conn.close() 来关闭数据库连接。

3.2 使用 suppress 函数

suppress 是一个上下文管理器,用于抑制指定的异常。它允许我们在某些情况下忽略特定的异常,而不需要显式的 try-except 块。

示例:使用 suppress 抑制异常

from contextlib import suppress

with suppress(FileNotFoundError):
    # 尝试打开一个不存在的文件,抑制 FileNotFoundError 异常
    open("nonexistent_file.txt")
print("继续执行后面的代码")

解释

  • with 语句块中,FileNotFoundError 异常被抑制。程序不会中断,而是继续执行后续的代码。

3.3 使用 nested 函数

nestedcontextlib 中的一个函数,它允许我们在同一个 with 语句块中使用多个上下文管理器,而不需要嵌套多个 with 语句。

示例:使用 nested 同时处理多个上下文

from contextlib import nested

with nested(open("file1.txt", "r"), open("file2.txt", "r")) as (file1, file2):
    content1 = file1.read()
    content2 = file2.read()
    print(content1, content2)

解释

  • nested 允许我们在同一个 with 语句块中同时打开多个文件。这里文件 file1.txtfile2.txt 同时被打开,代码块结束后文件会自动关闭。

四、总结

Python 的 contextlib 模块极大地简化了上下文管理器的创建和使用。通过 contextmanager 装饰器、closingsuppress 等工具,我们可以轻松地管理资源的获取和释放,避免冗长的 try-finally 语句,让代码更简洁易读。

通过本文的讲解,你已经掌握了 contextlib 模块的基本用法。无论是计时、资源管理还是异常处理,contextlib 都能为你提供强大的支持,帮助你高效地编写 Python 程序。

希望这篇教程能够帮助你更好地理解和应用 Python 中的上下文管理器工具!

2024-11-30

Python之科学计数法

在 Python 编程中,科学计数法是一种表示非常大或非常小的数字的方法。科学计数法使用基数与指数的形式,将数字转换为更简洁的表达方式。本文将详细介绍 Python 中科学计数法的使用,包括基本语法、代码示例、图解和详细说明,帮助你更好地理解并掌握这一概念。


一、什么是科学计数法?

科学计数法(Scientific Notation)是一种简洁的数字表示方式,通常用于表示非常大或非常小的数字。科学计数法的形式为:

a × 10^b

其中,a 是一个常数,b 是指数,表示 a 乘以 10 的 b 次方。在 Python 中,我们可以直接使用科学计数法来表示数字。

例如:

  • 123456789 可以表示为 1.23456789 × 10^8,在科学计数法中为 1.23456789e8
  • 0.0000123 可以表示为 1.23 × 10^-5,在科学计数法中为 1.23e-5

二、Python 中的科学计数法

2.1 使用科学计数法表示数字

在 Python 中,数字使用科学计数法时,我们只需要在数字和指数之间加上字母 eE。例如:

# 使用科学计数法表示数字
a = 1.23e5   # 1.23 × 10^5
b = 4.56E-3  # 4.56 × 10^-3

print(a)  # 输出:123000.0
print(b)  # 输出:0.00456

解释

  • 1.23e5 表示 1.23 乘以 10^5,即 123000.0
  • 4.56E-3 表示 4.56 乘以 10^-3,即 0.00456

2.2 科学计数法的运算

Python 允许直接对科学计数法中的数字进行运算,包括加法、减法、乘法、除法等。

x = 2.5e3  # 2.5 × 10^3 = 2500
y = 3.0e2  # 3.0 × 10^2 = 300

# 加法
result_add = x + y
print(f"{x} + {y} = {result_add}")  # 输出:2500.0 + 300.0 = 2800.0

# 乘法
result_multiply = x * y
print(f"{x} * {y} = {result_multiply}")  # 输出:2500.0 * 300.0 = 750000.0

# 除法
result_divide = x / y
print(f"{x} / {y} = {result_divide}")  # 输出:2500.0 / 300.0 = 8.333333333333334

2.3 格式化输出科学计数法

在输出时,如果需要将数字以科学计数法的格式显示,可以使用 Python 的字符串格式化功能。常用的格式符号包括 %e%.nf

# 使用%e格式符输出科学计数法
num = 1234567890
print(f"科学计数法表示:{num:e}")  # 输出:1.234568e+09

# 使用%.2e格式符输出保留两位小数的科学计数法
print(f"保留两位小数:{num:.2e}")  # 输出:1.23e+09

解释

  • %e 表示使用科学计数法格式化数字。
  • %.2e 表示使用科学计数法并保留两位小数。

三、科学计数法与浮点数

科学计数法通常用于表示浮点数。在 Python 中,浮点数是通过底层的双精度浮点表示来处理的。这意味着当数字较大或较小时,Python 会自动使用科学计数法来存储和表示浮点数。

# 自动转换为科学计数法
large_num = 12345678901234567890.0
small_num = 0.000000123456789

print(large_num)  # 输出:1.2345678901234568e+19
print(small_num)  # 输出:1.23456789e-07

四、科学计数法的优势与应用

4.1 优势

  • 简洁:科学计数法能够将非常大或非常小的数字表示得更加简洁,便于查看和处理。
  • 减少误差:对于浮点数来说,使用科学计数法能够避免精度损失和溢出问题,尤其是在处理非常大或非常小的数字时。
  • 计算效率:在数值计算中,使用科学计数法有时能提高计算效率,特别是在高精度计算时。

4.2 应用

  • 科学计算:例如物理学、天文学中的数字表示。
  • 机器学习:在处理大型数据集时,科学计数法用于存储和处理浮点数可以减少内存占用。
  • 金融领域:处理利率、货币、股市波动等数据时,常用科学计数法进行表示。

五、图解科学计数法

以下图解展示了科学计数法的基本概念:

科学计数法:a × 10^b
      |        |
      |        |
  基数a——> 数字的有效位数
      |        |
      |        |
    指数b——> 数字乘以10的多少次方
  • 例如 2.5e3,其中 a = 2.5b = 3,表示 2.5 × 10^3,即 2500

六、总结

科学计数法在 Python 中是一种非常实用的数字表示方法,尤其是在处理极大或极小的数值时。通过 Python 提供的格式化功能,我们可以轻松地使用科学计数法进行输入、输出和运算。掌握科学计数法的应用,不仅可以提高代码的可读性,还能帮助我们更高效地处理数值计算问题。

希望通过本文的讲解,你能够更加清楚地理解 Python 中科学计数法的使用方式,并能够在实际项目中灵活应用。

2024-11-30

一文弄懂 Python 中的缓存机制

在 Python 编程中,缓存机制是优化性能的重要手段之一。通过缓存,Python 可以避免重复计算,节省时间和计算资源。本文将深入讲解 Python 中的缓存机制,包括内存缓存、文件缓存等,结合代码示例、图解以及详细说明,让你轻松掌握 Python 中缓存的使用方法。


一、什么是缓存?

缓存是指在数据请求时,将计算结果或数据存储在易于访问的位置,以提高后续请求的处理速度。缓存可以是内存中的数据结构,也可以是文件系统中的文件。通过缓存,Python 可以避免重复的计算、查询操作,从而提高程序的运行效率。

缓存的常见应用场景:

  • 函数计算缓存:避免函数对相同输入值重复计算。
  • 数据库查询缓存:避免频繁查询相同的数据库记录。
  • Web 页面缓存:加速频繁访问的网页加载。

二、Python 中的缓存机制

2.1 Python 内置的缓存机制

Python 提供了一些内置的缓存机制,如 functools.lru_cache@property 等,用来提高性能。

2.1.1 使用 functools.lru_cache 进行函数缓存

functools.lru_cache 是一个装饰器,用来缓存函数的结果。当函数被调用时,若传入的参数已经被缓存过,则直接返回缓存值,避免了重复计算。

示例:lru_cache 用法
import functools

@functools.lru_cache(maxsize=128)
def expensive_function(x, y):
    print("计算中...")
    return x * y

# 第一次调用,计算结果
print(expensive_function(3, 5))  # 输出:计算中... 15

# 第二次调用,直接返回缓存结果
print(expensive_function(3, 5))  # 输出:15

解释

  • maxsize=128:指定缓存的最大数量,超过此数量的缓存会被删除(LRU:Least Recently Used,最近最少使用的缓存会被清除)。
  • 第一次调用时,expensive_function 会计算结果并返回。
  • 第二次调用时,lru_cache 会直接返回缓存的结果,避免了重复计算。

2.1.2 缓存失效策略

缓存有时需要失效,特别是在数据更新后。functools.lru_cache 提供了 cache_clear() 方法来手动清空缓存:

expensive_function.cache_clear()  # 清空缓存

2.2 自定义缓存机制

有时我们需要自定义缓存,特别是在特定的存储媒介上(如文件系统、数据库等)。下面是如何使用 Python 的字典和文件实现简单的缓存机制。

2.2.1 使用字典作为缓存存储

字典是一种高效的缓存存储方式,可以直接将函数的结果存储在字典中。

class SimpleCache:
    def __init__(self):
        self.cache = {}

    def get(self, key):
        return self.cache.get(key)

    def set(self, key, value):
        self.cache[key] = value

cache = SimpleCache()
cache.set("result", 42)

# 获取缓存数据
print(cache.get("result"))  # 输出:42

2.2.2 使用文件作为缓存存储

如果数据较大或需要持久化,可以将缓存数据存储到文件中。

import os
import pickle

class FileCache:
    def __init__(self, filename="cache.pkl"):
        self.filename = filename
        self.load_cache()

    def load_cache(self):
        if os.path.exists(self.filename):
            with open(self.filename, 'rb') as f:
                self.cache = pickle.load(f)
        else:
            self.cache = {}

    def save_cache(self):
        with open(self.filename, 'wb') as f:
            pickle.dump(self.cache, f)

    def get(self, key):
        return self.cache.get(key)

    def set(self, key, value):
        self.cache[key] = value
        self.save_cache()

cache = FileCache()
cache.set("user_data", {"name": "John", "age": 30})

# 获取缓存数据
print(cache.get("user_data"))  # 输出:{'name': 'John', 'age': 30}

三、缓存的优缺点

3.1 优点

  • 提升性能:缓存机制能够大幅度提升程序的运行效率,尤其是对于频繁计算的函数或查询操作。
  • 减少资源消耗:通过减少不必要的计算,降低了 CPU 和内存的消耗。

3.2 缺点

  • 内存消耗:缓存需要占用一定的内存空间。如果缓存管理不当,可能导致内存占用过高。
  • 缓存失效:缓存数据可能变得过时,需要有合适的失效机制。
  • 复杂度增加:在一些场景下,缓存机制可能会增加程序的复杂度,尤其是在处理缓存一致性时。

四、图解 Python 缓存机制

下图展示了 Python 缓存机制的基本流程:

  上次计算结果 -----------------------> 缓存
         |                                    |
         V                                    |
  请求数据 -> 检查缓存是否存在 -> 存在返回缓存值 -> 否则计算结果并缓存

五、常见的缓存策略

  • LRU(Least Recently Used):最近最少使用的缓存会被清除。
  • FIFO(First In, First Out):最早的缓存会被清除。
  • TTL(Time To Live):缓存具有有效期,过期后自动失效。
  • LFU(Least Frequently Used):最少使用的缓存会被清除。

六、总结

Python 中的缓存机制可以有效地提高程序的性能,尤其是在处理重复计算或查询时。通过 functools.lru_cache,字典,文件缓存等方法,Python 提供了多种缓存策略,帮助开发者根据需求优化程序性能。

在实际应用中,缓存机制需要合理设计,例如设置合适的缓存大小、清除过时数据等。理解并运用缓存机制,将为你的 Python 项目带来显著的性能提升。

2024-11-30

手把手一起完成 Python 上位机与下位机 USB 通信

USB 通信是嵌入式开发中常见的功能之一,上位机(通常是电脑)可以通过 USB 接口与下位机(通常是 MCU 开发板或其他硬件设备)进行数据交换。本文将带你一步步完成基于 Python 的上位机与下位机 USB 通信示例,包括代码实现、图解和详细说明。


一、USB 通信的基础概念

1.1 什么是 USB 通信?

USB(Universal Serial Bus)是计算机与外部设备之间通信的标准接口。通过 USB 通信,上位机和下位机可以交换指令和数据。

1.2 常见的 USB 通信模式

  • HID(Human Interface Device):用于鼠标、键盘等设备。
  • CDC(Communications Device Class):用于串口通信。
  • Bulk(大容量传输):用于文件传输。
  • ISO(同步传输):用于音频或视频设备。

1.3 本文通信模型

我们将采用 虚拟串口通信(CDC 模式) 的方式。即:

  • 下位机通过 USB 转换为虚拟串口。
  • 上位机通过 Python 访问该虚拟串口进行通信。

二、环境准备

2.1 硬件

  • 一块支持 USB 串口通信的开发板(如 Arduino、ESP32、STM32)。
  • USB 数据线。

2.2 软件

  • Python(建议使用 Python 3.7+)。
  • 必要的 Python 库:pyserial

安装 pyserial

pip install pyserial

三、下位机程序示例

以 Arduino 为例,实现一个简单的 USB 通信功能。

3.1 Arduino 下位机代码

void setup() {
  Serial.begin(9600);  // 初始化串口,波特率为 9600
}

void loop() {
  if (Serial.available() > 0) {  // 检测是否有数据从上位机发送过来
    String data = Serial.readString();  // 读取上位机发送的数据
    Serial.print("收到数据: ");        // 返回收到的数据
    Serial.println(data);
  }
}
  • 说明

    • 使用 Serial 类与上位机通信。
    • 上位机发送任意数据,下位机将其接收并回复。

3.2 上传代码到开发板

  • 使用 Arduino IDE 或其他编程工具,将代码上传到开发板。

四、上位机 Python 程序

使用 pyserial 库与下位机通信。

4.1 代码示例

import serial
import time

# 配置串口
ser = serial.Serial(
    port="COM3",      # 替换为你的设备端口
    baudrate=9600,    # 波特率,与下位机一致
    timeout=1         # 超时时间(秒)
)

# 等待设备初始化
time.sleep(2)

print("开始通信...")

try:
    while True:
        # 发送数据到下位机
        data_to_send = input("输入要发送的数据:")
        ser.write(data_to_send.encode("utf-8"))  # 编码后发送

        # 接收下位机返回的数据
        response = ser.readline().decode("utf-8").strip()
        if response:
            print(f"下位机回复: {response}")
except KeyboardInterrupt:
    print("通信结束!")
    ser.close()

五、运行结果

  1. 硬件连接

    • 使用 USB 数据线将开发板连接到电脑。
    • 在设备管理器中找到开发板的虚拟串口号(如 COM3)。
  2. 运行程序

    • 启动上位机程序,输入需要发送的数据:

      输入要发送的数据:Hello
      下位机回复: 收到数据: Hello
  3. 下位机输出

    • 通过串口监视器可以看到下位机的输出:

      收到数据: Hello

六、图解 USB 通信流程

上位机 (Python)                  下位机 (Arduino)
   输入数据:Hello  ------------------>
                              接收并解析数据
   <-- 回复数据:收到数据: Hello

七、常见问题及解决方法

7.1 端口占用

现象:运行程序时报错 Access is deniedPort is already in use
解决

  • 检查是否有其他程序占用了该串口(如串口监视器)。
  • 确保关闭其他占用串口的程序。

7.2 数据乱码

现象:收到的数据显示为乱码。
解决

  • 检查上下位机的波特率是否一致。
  • 确保使用正确的字符编码(如 UTF-8)。

7.3 下位机未响应

现象:下位机无返回数据。
解决

  • 确保 USB 数据线支持数据传输。
  • 检查下位机代码是否正常运行。

八、扩展功能

8.1 多线程实现异步通信

在实际应用中,可以使用多线程分别处理发送和接收数据,提高通信效率。

8.2 增加校验机制

为了保证通信的准确性,可以在传输的数据中添加校验位,例如 CRC 校验。

8.3 图形化界面

使用 Python 的 tkinterPyQt 库开发一个图形化界面,提升用户体验。


九、总结

通过本文,我们实现了一个基于 Python 的上位机和 Arduino 下位机之间的 USB 通信功能。希望这篇教程能够帮助你轻松掌握 USB 通信的基础知识,并为你的项目开发提供参考!

动手试试吧,你会发现 USB 通信并没有想象中那么难!

2024-11-30

Socket TCP 和 UDP 编程基础(Python)

网络编程是现代计算机科学的重要组成部分,而 socket 是 Python 提供的一个模块,用于实现网络通信。本文将详细介绍 TCP 和 UDP 两种通信协议的基础知识,配合 Python 代码示例和图解,让你轻松上手。


一、TCP 和 UDP 概述

1.1 TCP 协议

  • 特点:面向连接、可靠传输、适合数据量大且顺序重要的场景。
  • 应用:网页浏览、文件传输。
  • 通信模型

    • 客户端发起连接。
    • 服务端接受连接。
    • 双方建立可靠的数据传输通道。

1.2 UDP 协议

  • 特点:无连接、快速、不保证可靠传输。
  • 应用:视频直播、在线游戏。
  • 通信模型

    • 无需建立连接。
    • 数据直接发送到目标。

二、Python Socket 基础

socket 模块是 Python 提供的网络通信接口,可以实现 TCP 和 UDP 通信。

2.1 常用方法

方法说明
socket()创建一个新的 socket 对象
bind(address)绑定到指定的 IP 和端口
listen(backlog)在 TCP 中监听连接
accept()接受客户端连接
connect(address)客户端连接到服务器
send(data)发送数据
recv(buffer_size)接收数据
sendto(data, addr)向指定地址发送数据(UDP)
recvfrom(buffer_size)接收数据(UDP)
close()关闭 socket

三、TCP 编程

3.1 TCP 服务端代码

import socket

# 创建 TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 65432))  # 绑定 IP 和端口
server_socket.listen(5)  # 最大连接数

print("服务端已启动,等待连接...")
conn, addr = server_socket.accept()  # 等待客户端连接
print(f"连接来自: {addr}")

# 接收并发送数据
data = conn.recv(1024).decode("utf-8")
print(f"收到客户端消息: {data}")
conn.send("你好,客户端!".encode("utf-8"))

# 关闭连接
conn.close()
server_socket.close()

3.2 TCP 客户端代码

import socket

# 创建 TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("127.0.0.1", 65432))  # 连接服务器

# 发送数据
client_socket.send("你好,服务端!".encode("utf-8"))

# 接收数据
data = client_socket.recv(1024).decode("utf-8")
print(f"收到服务端消息: {data}")

# 关闭连接
client_socket.close()

运行结果

  • 服务端输出

    服务端已启动,等待连接...
    连接来自: ('127.0.0.1', 50000)
    收到客户端消息: 你好,服务端!
  • 客户端输出

    收到服务端消息: 你好,客户端!

TCP 通信模型图解

客户端               服务端
  |   connect()       |
  | ----------------> |
  |                   |
  |     accept()       |
  | <---------------- |
  |                   |
  |     send()         |
  | ----------------> |
  |                   |
  |     recv()         |
  | <---------------- |
  |                   |

四、UDP 编程

4.1 UDP 服务端代码

import socket

# 创建 UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(("127.0.0.1", 65432))  # 绑定 IP 和端口
print("UDP 服务端已启动,等待数据...")

# 接收数据
data, addr = server_socket.recvfrom(1024)
print(f"收到来自 {addr} 的消息: {data.decode('utf-8')}")

# 发送响应
server_socket.sendto("你好,客户端!".encode("utf-8"), addr)

server_socket.close()

4.2 UDP 客户端代码

import socket

# 创建 UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 发送数据
client_socket.sendto("你好,服务端!".encode("utf-8"), ("127.0.0.1", 65432))

# 接收响应
data, addr = client_socket.recvfrom(1024)
print(f"收到服务端消息: {data.decode('utf-8')}")

client_socket.close()

运行结果

  • 服务端输出

    UDP 服务端已启动,等待数据...
    收到来自 ('127.0.0.1', 50000) 的消息: 你好,服务端!
  • 客户端输出

    收到服务端消息: 你好,客户端!

UDP 通信模型图解

客户端               服务端
  |   sendto()        |
  | ----------------> |
  |                   |
  |     recvfrom()     |
  | <---------------- |

五、TCP 与 UDP 的区别

特性TCPUDP
是否连接面向连接无连接
数据可靠性高,保证数据按顺序到达不可靠,不保证顺序
速度较慢,需建立连接和确认快,无需连接
应用场景文件传输、网页浏览视频流、在线游戏

六、综合练习:多人聊天室

服务端代码

import socket
import threading

# 创建服务端
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 65432))
server_socket.listen(5)
clients = []

def handle_client(conn, addr):
    print(f"新客户端加入: {addr}")
    while True:
        try:
            data = conn.recv(1024).decode("utf-8")
            if not data:
                break
            print(f"{addr}: {data}")
            for client in clients:
                if client != conn:
                    client.send(f"{addr}: {data}".encode("utf-8"))
        except:
            break
    conn.close()
    clients.remove(conn)

print("聊天室启动中...")
while True:
    conn, addr = server_socket.accept()
    clients.append(conn)
    threading.Thread(target=handle_client, args=(conn, addr)).start()

客户端代码

import socket
import threading

def receive_messages(client_socket):
    while True:
        try:
            message = client_socket.recv(1024).decode("utf-8")
            print(message)
        except:
            print("连接已关闭")
            break

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("127.0.0.1", 65432))

threading.Thread(target=receive_messages, args=(client_socket,)).start()

print("加入聊天室!输入消息开始聊天...")
while True:
    msg = input()
    client_socket.send(msg.encode("utf-8"))

七、总结

通过本文,你应该已经掌握了 Python 中基于 socket 模块进行 TCP 和 UDP 编程的基本方法。无论是构建简单的客户端-服务端模型,还是开发复杂的网络应用,都可以从这些基础知识入手。希望你通过练习能够熟练运用这些技术!

2024-11-30

Python Selenium 的安装和教程

Selenium 是一款强大的 Web 自动化测试工具,它可以用来模拟浏览器操作、爬取动态数据或进行自动化任务。本文将全面介绍 Selenium 的安装和基本使用,通过代码示例和图解帮助你快速上手。


一、Selenium 的安装

1.1 安装 Selenium 库

首先,你需要安装 selenium 库:

pip install selenium

1.2 下载 WebDriver

Selenium 需要配合浏览器驱动 (WebDriver) 一起使用,不同浏览器对应的驱动如下:

下载后将驱动程序添加到系统的环境变量 PATH 中。


二、Selenium 的基本使用

2.1 启动浏览器

示例代码

以下代码演示如何启动 Chrome 浏览器并打开百度主页:

from selenium import webdriver

# 设置 WebDriver 路径
driver_path = "path/to/chromedriver"  # 替换为实际路径
driver = webdriver.Chrome(executable_path=driver_path)

# 打开百度
driver.get("https://www.baidu.com")

# 打印页面标题
print("页面标题:", driver.title)

# 关闭浏览器
driver.quit()

输出示例

页面标题: 百度一下,你就知道

2.2 查找页面元素

Selenium 提供了多种方式查找页面元素:

  • ID: find_element_by_id
  • 类名: find_element_by_class_name
  • CSS选择器: find_element_by_css_selector
  • XPath: find_element_by_xpath

示例代码

from selenium import webdriver

driver = webdriver.Chrome(executable_path="path/to/chromedriver")
driver.get("https://www.baidu.com")

# 查找搜索框并输入文字
search_box = driver.find_element_by_id("kw")
search_box.send_keys("Python Selenium")

# 点击“百度一下”按钮
search_button = driver.find_element_by_id("su")
search_button.click()

# 打印当前页面 URL
print("当前页面 URL:", driver.current_url)

# 关闭浏览器
driver.quit()

2.3 模拟用户操作

示例代码:自动登录示例

以自动登录 GitHub 为例:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time

driver = webdriver.Chrome(executable_path="path/to/chromedriver")
driver.get("https://github.com/login")

# 输入用户名和密码
driver.find_element(By.ID, "login_field").send_keys("your_username")
driver.find_element(By.ID, "password").send_keys("your_password")

# 点击登录按钮
driver.find_element(By.NAME, "commit").click()

# 等待加载并打印登录结果
time.sleep(2)
print("登录成功" if "dashboard" in driver.current_url else "登录失败")

driver.quit()

三、常用功能示例

3.1 截屏功能

Selenium 可以截取页面截图:

driver.save_screenshot("screenshot.png")
print("截图已保存")

3.2 动态等待

在加载动态页面时,可以使用显式或隐式等待:

隐式等待

driver.implicitly_wait(10)  # 等待 10 秒

显式等待

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

element = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, "some_id"))
)

3.3 滚动页面

滚动到页面底部:

driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

3.4 处理弹窗

示例代码:关闭弹窗

alert = driver.switch_to.alert
print("弹窗内容:", alert.text)
alert.accept()  # 点击“确定”

3.5 爬取动态网页数据

Selenium 可以用于爬取 JavaScript 动态渲染的内容。例如:

driver.get("https://quotes.toscrape.com/js/")
quotes = driver.find_elements(By.CLASS_NAME, "quote")
for quote in quotes:
    print(quote.text)

四、完整示例:自动化搜索并截图

from selenium import webdriver
from selenium.webdriver.common.by import By
import time

# 设置 WebDriver
driver = webdriver.Chrome(executable_path="path/to/chromedriver")

# 打开百度并搜索
driver.get("https://www.baidu.com")
search_box = driver.find_element(By.ID, "kw")
search_box.send_keys("Python Selenium 教程")
search_button = driver.find_element(By.ID, "su")
search_button.click()

# 等待加载完成并截图
time.sleep(2)
driver.save_screenshot("search_results.png")
print("搜索结果已截图保存")

# 关闭浏览器
driver.quit()

五、注意事项

  1. 浏览器版本匹配:确保 WebDriver 与浏览器的版本匹配,否则会报错。
  2. 反爬策略:很多网站对 Selenium 的行为进行检测,可以通过添加请求头或使用无头模式规避。
  3. 资源管理:使用完浏览器后务必调用 driver.quit() 释放资源。

六、总结

Selenium 是一个功能强大的工具,在 Web 自动化测试和动态数据抓取中有广泛应用。本文通过代码示例详细讲解了 Selenium 的基本用法及常见功能,希望能帮助你更高效地完成自动化任务。

如果想深入学习 Selenium,可以尝试结合 无头浏览器模式集成 pytest 框架 实现更复杂的应用!

2024-11-30

Python神器:psutil库使用详解

psutil 是 Python 中一个功能强大的第三方库,用于获取系统的运行状况和硬件信息,如 CPU、内存、磁盘、网络等资源的使用情况。这使得它在系统监控、资源管理和性能调试等场景中大有用途。

本文将全面介绍 psutil 的使用,配以详细代码示例和图解,助你快速上手这一神器!


一、psutil 的安装与基本概念

1.1 安装

在命令行中运行以下命令安装 psutil

pip install psutil

1.2 psutil 能做什么?

  • CPU 信息:获取 CPU 使用率、逻辑/物理核心数等。
  • 内存信息:包括总内存、可用内存、内存占用率等。
  • 磁盘信息:获取磁盘分区、使用情况和 IO 信息。
  • 网络信息:查看网络接口、连接状态和流量统计。
  • 进程管理:列举和管理系统进程。

二、获取系统资源信息

2.1 获取 CPU 信息

示例代码

import psutil

# CPU 核心数
print(f"物理核心数: {psutil.cpu_count(logical=False)}")
print(f"逻辑核心数: {psutil.cpu_count(logical=True)}")

# CPU 使用率
print(f"CPU 使用率: {psutil.cpu_percent(interval=1)}%")

# 每个核心的使用率
print(f"每个核心的使用率: {psutil.cpu_percent(interval=1, percpu=True)}")

输出示例

物理核心数: 4
逻辑核心数: 8
CPU 使用率: 12.3%
每个核心的使用率: [5.3, 10.1, 20.7, 12.9, 4.8, 7.5, 15.2, 10.6]

图解


2.2 获取内存信息

示例代码

# 获取内存使用情况
memory_info = psutil.virtual_memory()
print(f"总内存: {memory_info.total / 1024**3:.2f} GB")
print(f"已用内存: {memory_info.used / 1024**3:.2f} GB")
print(f"可用内存: {memory_info.available / 1024**3:.2f} GB")
print(f"内存使用率: {memory_info.percent}%")

输出示例

总内存: 16.00 GB
已用内存: 8.24 GB
可用内存: 7.76 GB
内存使用率: 51.5%

图解

一个简单的内存使用率饼图可以清晰展示当前内存的占用情况:


2.3 获取磁盘信息

示例代码

# 获取磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f"分区设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统类型: {partition.fstype}")

# 获取磁盘使用情况
disk_usage = psutil.disk_usage('/')
print(f"磁盘总容量: {disk_usage.total / 1024**3:.2f} GB")
print(f"已用空间: {disk_usage.used / 1024**3:.2f} GB")
print(f"可用空间: {disk_usage.free / 1024**3:.2f} GB")
print(f"磁盘使用率: {disk_usage.percent}%")

输出示例

分区设备: /dev/sda1
挂载点: /
文件系统类型: ext4
磁盘总容量: 512.00 GB
已用空间: 120.23 GB
可用空间: 391.77 GB
磁盘使用率: 23.5%

2.4 获取网络信息

示例代码

# 获取网络接口信息
net_io = psutil.net_io_counters()
print(f"已发送数据: {net_io.bytes_sent / 1024**2:.2f} MB")
print(f"已接收数据: {net_io.bytes_recv / 1024**2:.2f} MB")

# 获取网络连接信息
connections = psutil.net_connections(kind='inet')
for conn in connections[:5]:  # 仅展示前 5 条连接
    print(f"本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}")

输出示例

已发送数据: 50.32 MB
已接收数据: 120.89 MB
本地地址: 127.0.0.1:8080, 远程地址: None, 状态: LISTEN
本地地址: 192.168.1.5:52415, 远程地址: 192.168.1.1:80, 状态: ESTABLISHED

三、进程管理

3.1 列举所有进程

示例代码

# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
    print(f"PID: {proc.info['pid']}, 名称: {proc.info['name']}, CPU 使用率: {proc.info['cpu_percent']}%")

示例输出

PID: 1, 名称: systemd, CPU 使用率: 0.0%
PID: 1234, 名称: python3, CPU 使用率: 10.3%
PID: 5678, 名称: chrome, CPU 使用率: 5.2%

3.2 操作进程

示例代码

# 获取某个进程的信息
pid = 1234  # 替换为实际 PID
try:
    process = psutil.Process(pid)
    print(f"进程名: {process.name()}")
    print(f"进程状态: {process.status()}")
    print(f"内存使用: {process.memory_info().rss / 1024**2:.2f} MB")
    print(f"CPU 使用率: {process.cpu_percent(interval=1)}%")
    
    # 杀死进程
    process.terminate()
    print(f"进程 {pid} 已终止")
except psutil.NoSuchProcess:
    print(f"进程 {pid} 不存在")

四、实时监控示例

一个实时监控系统资源的 Python 程序:

import psutil
import time

while True:
    cpu = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory().percent
    disk = psutil.disk_usage('/').percent
    print(f"CPU: {cpu}%, 内存: {memory}%, 磁盘: {disk}%")
    time.sleep(1)

运行后可以实时查看系统的资源占用情况。


五、总结

psutil 是一个功能全面且易用的 Python 库,适用于多种场景,包括:

  • 开发系统监控工具;
  • 实现资源管理与性能调试;
  • 构建服务器性能监控脚本。

通过本文的详细教程,相信你已经掌握了 psutil 的核心功能,并能灵活运用于实际项目中!

2024-11-30

Python——多线程的共享变量用法

在多线程编程中,共享变量 是一个重要但容易出错的概念。多个线程访问或修改同一个变量时,可能会引发竞态条件(race condition),导致数据错误或不可预测的行为。本教程详细介绍多线程中共享变量的使用方法,并结合代码示例与图解,帮助你更好地理解和避免常见问题。


一、什么是共享变量?

共享变量是指多个线程能够同时访问的变量。例如,多个线程对同一个全局变量或同一个对象的属性进行读写操作。

示例:共享变量引发竞态条件

import threading

# 初始化共享变量
shared_counter = 0

def increment():
    global shared_counter
    for _ in range(100000):
        shared_counter += 1

# 创建线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print(f"Final counter value: {shared_counter}")

输出结果:
最终的 shared_counter 可能不会是预期的 200,000,原因是多个线程在同时修改变量时,操作并不是原子的,导致了竞态条件。


二、解决共享变量问题的方法

Python 提供了几种机制来安全地操作共享变量。

2.1 使用锁(Lock)

锁(threading.Lock)是最常用的方式,用于防止多个线程同时访问共享资源。

示例:使用锁解决竞态条件

import threading

shared_counter = 0
lock = threading.Lock()

def increment_with_lock():
    global shared_counter
    for _ in range(100000):
        with lock:  # 使用锁保护共享变量
            shared_counter += 1

# 创建线程
thread1 = threading.Thread(target=increment_with_lock)
thread2 = threading.Thread(target=increment_with_lock)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print(f"Final counter value: {shared_counter}")

输出结果:
无论运行多少次,shared_counter 的值始终是 200,000。

图解:

  • 未加锁:

    • 线程 1 和线程 2 可能同时读取相同的值,导致操作冲突。
  • 加锁:

    • 线程 1 获取锁后操作共享变量,线程 2 必须等待锁释放。

2.2 使用条件变量(Condition)

条件变量是高级的同步机制,可以让线程在满足特定条件时继续执行。

示例:生产者-消费者模型

import threading
import time
from queue import Queue

queue = Queue(maxsize=5)
condition = threading.Condition()

def producer():
    for i in range(10):
        with condition:
            while queue.full():
                condition.wait()  # 等待队列有空位
            queue.put(i)
            print(f"Produced: {i}")
            condition.notify_all()  # 通知消费者

def consumer():
    for _ in range(10):
        with condition:
            while queue.empty():
                condition.wait()  # 等待队列有数据
            item = queue.get()
            print(f"Consumed: {item}")
            condition.notify_all()  # 通知生产者

# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()

输出结果:
生产者和消费者交替运行,保证了队列的安全操作。


2.3 使用线程安全的数据结构

Python 的 queue 模块提供了线程安全的数据结构(如 QueueLifoQueuePriorityQueue),无需手动加锁。

示例:使用线程安全的队列

from queue import Queue
import threading

queue = Queue()

def producer():
    for i in range(5):
        queue.put(i)
        print(f"Produced: {i}")

def consumer():
    while not queue.empty():
        item = queue.get()
        print(f"Consumed: {item}")

# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
producer_thread.join()  # 等生产者完成后再启动消费者
consumer_thread.start()
consumer_thread.join()

三、避免死锁

在多线程中使用锁时,需要注意死锁问题。死锁通常发生在多个线程同时等待对方释放锁的情况下。

示例:避免死锁的技巧

使用 threading.Lockthreading.RLock 的上下文管理器,确保锁总是被正确释放。

改进的死锁避免代码

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_task():
    with lock1:
        print("Thread 1 acquired lock1")
        with lock2:
            print("Thread 1 acquired lock2")

def thread2_task():
    with lock2:
        print("Thread 2 acquired lock2")
        with lock1:
            print("Thread 2 acquired lock1")

t1 = threading.Thread(target=thread1_task)
t2 = threading.Thread(target=thread2_task)

t1.start()
t2.start()

t1.join()
t2.join()

通过控制加锁顺序或使用 RLock 可有效避免死锁。


四、图解多线程共享变量的流程

示例场景:两个线程共享一个计数器

  • 线程 1 和线程 2 都尝试增加计数器。
  • 加锁后,计数器修改变得有序且安全。
未加锁:       加锁:
线程 1 ---> 读取共享变量        线程 1 ---> 加锁
线程 2 ---> 读取共享变量        线程 2 ---> 等待锁释放
线程 1 ---> 修改变量            线程 1 ---> 修改变量
线程 2 ---> 修改变量            线程 1 ---> 释放锁

五、总结与最佳实践

  1. 始终保护共享变量:

    • 使用锁(LockRLock)保护共享变量。
    • 对复杂同步问题,考虑使用条件变量(Condition)或线程安全的数据结构。
  2. 尽量避免手动加锁:

    • 使用高层工具如 queue.Queue 来自动管理线程安全。
  3. 小心死锁:

    • 控制锁的顺序,避免多个锁之间的循环等待。
    • 尽量使用上下文管理器来管理锁。
  4. 线程池的使用:

    • 对于较大的并发任务,建议使用 concurrent.futures.ThreadPoolExecutor 来简化线程管理。

通过本文的讲解和示例代码,相信你已经掌握了在 Python 中多线程共享变量的安全使用方法。希望你能够灵活运用这些技巧,编写高效、稳定的多线程程序!