2024-08-19

在CentOS和Ubuntu上安装Python 3.12.2的步骤如下:

CentOS

  1. 安装依赖项:

    
    
    
    sudo yum groupinstall -y "Development Tools"
    sudo yum install -y openssl-devel bzip2-devel libffi-devel
  2. 下载Python 3.12.2源码:

    
    
    
    wget https://www.python.org/ftp/python/3.12.2/Python-3.12.2.tgz
  3. 解压源码:

    
    
    
    tar xzf Python-3.12.2.tgz
  4. 编译安装:

    
    
    
    cd Python-3.12.2
    ./configure --enable-optimizations
    make altinstall

使用 altinstall 是为了避免覆盖系统默认的Python版本。

Ubuntu

  1. 安装依赖项:

    
    
    
    sudo apt update
    sudo apt install -y build-essential zlib1g-dev libssl-dev libncurses5-dev libncursesw5-dev libreadline-dev libffi-dev
  2. 下载Python 3.12.2源码:

    
    
    
    wget https://www.python.org/ftp/python/3.12.2/Python-3.12.2.tgz
  3. 解压源码:

    
    
    
    tar xzf Python-3.12.2.tgz
  4. 编译安装:

    
    
    
    cd Python-3.12.2
    ./configure --enable-optimizations
    make altinstall

同样,使用 altinstall 是为了避免覆盖默认的Python版本。

请注意,Python 3.12.2可能不会在所有系统上完美工作,因为它是一个较新的版本,可能还需要一些时间来稳定和测试。在生产环境中,建议使用更加稳定的版本,如Python 3.8、3.9或3.10。

2024-08-19



# 检查变量是否为空字符串、空列表、空字典、None
def is_empty(var):
    if var == "" or var is None:
        return True
    elif type(var) is list:
        return len(var) == 0
    elif type(var) is dict:
        return len(var) == 0
    else:
        return False
 
# 示例
print(is_empty(""))  # 输出: True
print(is_empty(None))  # 输出: True
print(is_empty([]))  # 输出: True
print(is_empty({}))  # 输出: True
print(is_empty("非空字符串"))  # 输出: False
print(is_empty([1, 2, 3]))  # 输出: False
print(is_empty({"key": "value"}))  # 输出: False

这段代码定义了一个函数is_empty,它能够检查一个变量是否为空字符串、None、空列表、或空字典。它通过直接比较和类型检查来实现这一功能。代码简洁,易于理解,并提供了清晰的注释。

2024-08-19

Python的queue模块提供了一种线程间共享数据的安全方式,它实现了一个线程安全的FIFO(先进先出)队列。

以下是一些使用queue模块的常见方法:

  1. 创建队列:



import queue
 
# 创建一个线程安全的队列
q = queue.Queue()
  1. 添加元素到队列:



# 添加元素到队列
q.put('element')
  1. 从队列中取出元素:



# 从队列中取出元素
element = q.get()
  1. 检查队列是否为空:



# 检查队列是否为空
is_empty = q.empty()
  1. 获取队列中元素的数量:



# 获取队列中元素的数量
queue_size = q.qsize()
  1. 使用队列实现线程池:



import queue
import threading
import time
 
# 定义工作函数
def worker(q):
    while True:
        # 获取任务
        task = q.get()
        do_work(task)
        q.task_done()  # 任务完成标志
 
# 创建队列
work_queue = queue.Queue()
 
# 创建线程
threads = []
for i in range(4):
    t = threading.Thread(target=worker, args=(work_queue,))
    t.start()
    threads.append(t)
 
# 添加任务到队列
for task in tasks:
    work_queue.put(task)
 
# 等待所有任务完成
work_queue.join()
 
# 终止线程
for t in threads:
    t.join()

以上代码展示了如何使用queue模块创建一个线程安全的队列,并在多线程环境中使用它来管理任务。

2024-08-19



import asyncio
import aiohttp
 
async def fetch(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()
 
async def main():
    urls = ['http://httpbin.org/delay/1', 'http://httpbin.org/delay/2']
    semaphore = asyncio.Semaphore(5)  # 最多同时进行5个网络请求
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

这段代码使用了aiohttp库来发送异步的HTTP GET请求,并使用了asyncio.Semaphore来限制同时进行的请求数量。这是一个简单的异步多任务爬虫示例,适合用于处理并发请求的场景。

2024-08-19

zipfile模块提供了对zip压缩文件的创建、读取、添加、删除以及解压等操作。

  1. 创建zip压缩文件



import zipfile
 
# 创建一个zip文件并写入内容
with zipfile.ZipFile('test.zip', 'w') as myzip:
    myzip.write('test.txt')
  1. 读取zip压缩文件



import zipfile
 
# 读取zip文件内容
with zipfile.ZipFile('test.zip', 'r') as myzip:
    myzip.printdir()  # 打印文件列表
    myzip.read('test.txt')  # 读取文件内容
  1. 向zip压缩文件中添加文件



import zipfile
 
# 向zip文件中添加文件
with zipfile.ZipFile('test.zip', 'a') as myzip:
    myzip.write('test2.txt')
  1. 从zip压缩文件中删除文件



import zipfile
 
# 删除zip文件中的文件
with zipfile.ZipFile('test.zip', 'r') as myzip:
    myzip.extract('test.txt', path='.')  # 先解压到当前目录
with zipfile.ZipFile('test.zip', 'a') as myzip:
    myzip.delete('test.txt')  # 再从zip文件中删除
  1. 解压zip压缩文件



import zipfile
 
# 解压zip文件
with zipfile.ZipFile('test.zip', 'r') as myzip:
    myzip.extractall(path='.')  # 解压到当前目录

以上代码展示了如何使用zipfile模块创建、读取、添加、删除以及解压zip文件。需要注意的是,在对zip文件进行写操作时,如果文件已存在,会默认覆盖。在读取或写入时,如果文件不存在或路径错误,会抛出FileNotFoundError异常。在添加、删除或解压时,如果操作的文件在zip中不存在,会抛出KeyError异常。

2024-08-19



import pandas as pd
 
# 创建一个简单的DataFrame
data = {'Name': ['John', 'Anna', 'Peter', 'Linda'],
        'Age': [28, 23, 34, 29]}
df = pd.DataFrame(data)
 
# 打印DataFrame
print(df)
 
# 将DataFrame导出到CSV文件
df.to_csv('output.csv', index=False)
 
# 从CSV文件读取数据到新的DataFrame
df_from_csv = pd.read_csv('output.csv')
 
# 打印新的DataFrame
print(df_from_csv)

这段代码展示了如何使用pandas库创建一个简单的DataFrame,并将其导出为CSV文件,然后再从CSV文件读取数据到新的DataFrame。这个过程是数据处理和分析的常见步骤,pandas库提供了丰富的功能来处理和分析数据。

2024-08-19

在Python中,进行分布式图像处理通常涉及到使用像Dask或者Ray这样的库。以下是一个简单的例子,使用Dask进行图像分布式处理的框架:




import dask.array as da
import numpy as np
from PIL import Image
 
# 假设你有一个大图像,我们将其分块
image_path = 'path_to_your_image.jpg'
img = np.array(Image.open(image_path))
 
# 假设块的大小为128x128
block_size = 128
 
# 将图像分块
blocks = [da.from_array(img[i:i+block_size, j:j+block_size]) for i in range(0, img.shape[0], block_size) for j in range(0, img.shape[1], block_size)]
 
# 现在你可以在每个块上应用任何Dask支持的函数,例如:
results = [block.mean().compute() for block in blocks]  # 计算每个块的平均值
 
# 处理完毕后,你可能需要将结果重新组织成一个图像
# 这里省略重组结果的代码

这个例子展示了如何将一个大图像分割成多个块,并且使用Dask的da.from_array函数来创建图像块的延迟数组。然后,你可以应用任何Dask支持的图像处理函数,例如.mean()来计算每个块的平均值。最后,使用.compute()来执行计算。

请注意,这只是一个简化的例子。在实际应用中,你可能需要考虑更多的并行处理细节,比如分块策略、任务调度、资源管理等。此外,图像处理完后的结果重组也需要额外的逻辑来保证图像的完整性和一致性。

2024-08-19

在Python爬虫Scrapy框架中,中间件是一种扩展机制,允许你自定义爬虫的请求和响应处理过程。

以下是一个简单的Scrapy中间件示例,用于限制爬虫的请求发送频率:




import random
from scrapy.downloadermiddlewares.robotstxt import RobotsTxtMiddleware
 
class RandomUserAgentMiddleware(object):
    """
    随机更换请求的User-Agent
    """
    def __init__(self, user_agent='Scrapy'):
        self.user_agent = user_agent
 
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            user_agent=crawler.settings.get('USER_AGENT')
        )
 
    def process_request(self, request, spider):
        user_agent = random.choice(spider.settings['USER_AGENT_LIST'])
        request.headers.setdefault('User-Agent', user_agent)
 
class ProxyMiddleware(object):
    """
    代理IP中间件
    """
    def process_request(self, request, spider):
        proxy = spider.settings['PROXY']
        request.meta['proxy'] = proxy
 
class CustomDownloaderMiddleware(object):
    """
    自定义下载器中间件
    """
    def process_response(self, request, response, spider):
        # 自定义处理下载器得到的响应
        return response
 
class CustomRobotsMiddleware(RobotsTxtMiddleware):
    """
    自定义的Robots.txt中间件
    """
    def process_request(self, request, spider):
        # 自定义处理Robots.txt的逻辑
        return super(CustomRobotsMiddleware, self).process_request(request, spider)

在这个例子中,我们定义了三个中间件:RandomUserAgentMiddleware用于随机更换请求的User-Agent,ProxyMiddleware用于设置代理,CustomDownloaderMiddleware用于自定义处理响应。同时,我们还创建了一个CustomRobotsMiddleware来自定义处理Robots.txt的逻辑。

要在Scrapy中使用这些中间件,你需要在爬虫的settings.py文件中进行相应的配置。例如:




DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RandomUserAgentMiddleware': 400,
    'myproject.middlewares.ProxyMiddleware': 410,
    'myproject.middlewares.CustomDownloaderMiddleware': 420,
    'myproject.middlewares.CustomRobotsMiddleware': 430,
}
 
USER_AGENT_LIST = [
    'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
    # ... 其他User-Agent字符串
]
 
PROXY = 'http://12.34.56.78:9010'

在这个配置中,每个中间件被赋予了一个唯一的优先级,数字越小表示优先级越高。USER_AGENT_LISTPROXY也需要在settings.py中进行相应的配置。

2024-08-19



import requests
 
# 定义一个简单的爬虫函数,用于获取指定URL的内容
def crawl_content(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        else:
            return "Error: Server responded with status code {}".format(response.status_code)
    except requests.exceptions.RequestException as e:
        return "Error: {}".format(e)
 
# 示例URL
example_url = "https://www.example.com"
 
# 调用函数并打印结果
print(crawl_content(example_url))

这段代码使用了Python的requests库来发送HTTP GET请求,并获取目标URL的内容。函数crawl_content捕获异常,确保在遇到网络问题时能够返回错误信息。这个简单的例子展示了如何使用Python进行基本的网络爬虫操作。

2024-08-19

以下是一个使用Python的requests和BeautifulSoup库来爬取网站信息并将其保存到文件的基本示例。




import requests
from bs4 import BeautifulSoup
 
# 目标网址
url = 'http://example.com/'
 
# 发送HTTP请求
response = requests.get(url)
 
# 检查请求是否成功
if response.status_code == 200:
    # 解析响应内容
    soup = BeautifulSoup(response.text, 'html.parser')
    
    # 提取你需要的信息,例如页面标题
    title = soup.title.text
    
    # 将信息写入文件
    with open('output.txt', 'w', encoding='utf-8') as file:
        file.write(title)
    print(f'网站标题已保存到 output.txt: {title}')
else:
    print('网页请求失败')

确保你已经安装了requests和beautifulsoup4库,可以使用以下命令安装:




pip install requests beautifulsoup4

这段代码会发送一个HTTP GET请求到指定的网址,然后使用BeautifulSoup解析HTML内容,提取页面标题,并将其保存到当前目录下的output.txt文件中。你可以根据需要修改这段代码,以提取不同的网页信息或保存到不同的文件。