2024-09-02



import concurrent.futures
import urllib.request
 
# 定义一个函数,下载指定URL的网页内容
def download_website(url):
    with urllib.request.urlopen(url) as response:
        return response.read()
 
# 定义URL列表
urls = [
    'https://www.python.org',
    'https://www.python.org/about/',
    'https://www.onlamp.com/pub/a/python/2003/04/16/metaclasses.html',
    # ... 添加更多的URLs
]
 
# 使用concurrent.futures.ThreadPoolExecutor来并发下载网页
def download_websites(urls):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(download_website, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
                print(f'{url} page is {len(data)} bytes')
            except Exception as exc:
                print(f'{url} generated an exception: {exc}')
 
# 运行并发下载
download_websites(urls)

这段代码使用了concurrent.futures模块中的ThreadPoolExecutor来并发下载一系列网页。它定义了一个函数download_website来处理单个网页的下载,然后使用该函数来处理提供的URL列表。通过concurrent.futures.as_completed,我们可以按照任务完成的顺序来处理结果,而不是按照提交的顺序。这样可以提高效率,并且能够更好地处理异步任务。

2024-09-02



from pydantic import BaseModel
from typing import Optional
from databases import Database
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from fastapi_utils.cbv import cbv
from fastapi import FastAPI, Depends
 
app = FastAPI()
 
# 定义数据库配置
DATABASE_URL = "sqlite:///./test.db"
 
# 初始化数据库
database = Database(DATABASE_URL)
 
# 创建SQLAlchemy引擎
engine = create_engine(DATABASE_URL)
 
# 创建SQLAlchemy的基类
Base = declarative_base()
 
# 定义一个Pydantic模型,用于数据库表的映射
class Item(BaseModel):
    id: Optional[int] = None
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
 
    class Config:
        orm_mode = True
 
# 定义数据库表
class ItemTable(Base):
    __tablename__ = "items"
    id = Base.Column(Base.Integer, primary_key=True, autoincrement=True)
    name = Base.Column(Base.String(255))
    description = Base.Column(Base.String(255))
    price = Base.Column(Base.Float)
    tax = Base.Column(Base.Float)
 
    def __str__(self):
        return self.name
 
# 创建表
Base.metadata.create_all(bind=engine)
 
# 创建SQLAlchemy会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
# 获取会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
 
# FastAPI路由依赖项
async def get_db_session():
    async with database.session() as session:
        yield session
 
# 创建CRUD操作的基类
class CRUDBase:
    def create(self, db: Session, *, obj_in: Item):
        db_obj = ItemTable(**obj_in.dict())
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj
 
    def get(self, db: Session, *, id: int):
        return db.query(ItemTable).filter(ItemTable.id == id).first()
 
    def get_multi(self, db: Session, *, skip=0, limit=100):
        return db.query(ItemTable).offset(skip).limit(limit).all()
 
# 定义CRUD操作
class ItemCRUD(CRUDBase):
    def get_by_name(self, db: Session, *, name: str):
        return db.query(ItemTable).filter(ItemTable.name == name).first()
 
# 初始化CRUD操作
item_crud = ItemCRUD()
 
# FastAPI路由
@app.post("/items/", response_model=Item)
async def create_item(*, item_in: Item, session: Session = Depends(get_db_session)):
    return item_crud.create(session, obj_in=item_in)
 
@app.get("/items/{item_id}")
asy
2024-09-02



import sqlite3
import sys
 
# 连接到SQLite数据库
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
 
# 创建FTS5虚拟表,使用simple分词器,并建立全文索引
cursor.execute('''
DROP TABLE IF EXISTS fts;
CREATE VIRTUAL TABLE fts USING fts5(
    content TEXT,
    tokenize=simple
);
''')
 
# 向虚拟表中插入数据
cursor.execute('INSERT INTO fts(content) VALUES (?)', ('你好 世界',))
cursor.execute('INSERT INTO fts(content) VALUES (?)', ('你好 世界 编程',))
 
# 执行全文检索
cursor.execute('SELECT * FROM fts WHERE fts MATCH ?', ('"你好" OR "世界"',))
 
# 打印查询结果
for row in cursor.fetchall():
    print(row)
 
# 提交事务并关闭连接
conn.commit()
conn.close()

这段代码演示了如何在SQLite中使用FTS5和simple分词器进行中文拼音的全文检索。首先,它创建了一个FTS5虚拟表,并指定了使用simple分词器。然后,向虚拟表中插入了两条记录。最后,执行了一个全文检索查询,搜索包含“你好”或“世界”的记录。

2024-09-02

在Python 3中,可以使用标准库中的codecs模块来处理ASCII编码的二进制数据。以下是一个简单的例子,展示了如何将ASCII字符串编码为二进制表示,以及如何将二进制数据解码回ASCII字符串。




import codecs
 
# 将ASCII字符串编码为二进制数据
ascii_string = "hello"
binary_data = ascii_string.encode('ascii')
print(binary_data)  # 输出: b'hello'
 
# 将二进制数据解码回ASCII字符串
decoded_string = binary_data.decode('ascii')
print(decoded_string)  # 输出: hello

在这个例子中,encode('ascii')方法被用来将ASCII字符串转换成二进制数据,而decode('ascii')方法则用来将二进制数据转换回ASCII字符串。这里的输出b'hello'表明了Python 3中字符串的表示方式,b前缀表示这是一个二进制数据序列。

2024-09-02



from pymongo import MongoClient
 
# 连接到MongoDB
client = MongoClient('mongodb://localhost:27017/')
 
# 选择数据库
db = client['example_db']
 
# 选择集合(类似于SQL中的表)
collection = db['example_collection']
 
# 查询所有文档
for doc in collection.find():
    print(doc)
 
# 查询一个文档
doc = collection.find_one()
print(doc)
 
# 查询指定条件的文档
filter_condition = {'name': 'Alice'}
for doc in collection.find(filter_condition):
    print(doc)
 
# 查询指定字段的文档
projection = {'name': 1, '_id': 0}
for doc in collection.find(filter_condition, projection):
    print(doc)
 
# 查询排序
sort_condition = [('age', -1)]  # 按年龄降序排列
for doc in collection.find().sort(sort_condition):
    print(doc)
 
# 限制返回文档数量
for doc in collection.find().limit(3):
    print(doc)
 
# 跳过指定数量的文档
for doc in collection.find().skip(2):
    print(doc)

这段代码展示了如何使用pymongo库连接到MongoDB,选择数据库和集合,并执行查询操作,包括查找所有文档、查找一个文档、根据条件查找文档、指定查询字段、排序、限制返回数量和跳过文档数量。这些是进行MongoDB查询时常用的方法和技巧。

2024-09-02

Python的glob模块提供了一个函数glob(),它可以查找符合特定规则的文件路径名。该模块主要用于查找文件系统中的文件,可以使用简单的模式匹配。

以下是一些使用Python3的glob模块的常见方法:

  1. 查找所有.txt文件:



import glob
 
for filename in glob.glob('*.txt'):
    print(filename)
  1. 查找当前目录及子目录下的所有.txt文件:



import glob
 
for filename in glob.glob('**/*.txt', recursive=True):
    print(filename)
  1. 查找特定目录下的所有.txt文件:



import glob
 
for filename in glob.glob('/path/*.txt'):
    print(filename)
  1. 查找特定目录及子目录下的所有.txt文件:



import glob
 
for filename in glob.glob('/path/**/*.txt', recursive=True):
    print(filename)
  1. 查找多种文件类型:



import glob
 
for filename in glob.glob('*.txt', '*.jpg'):
    print(filename)
  1. 使用glob.iglob()进行迭代匹配:



import glob
 
file_iterator = glob.iglob(r'**/*.txt', recursive=True)
for filename in file_iterator:
    print(filename)

以上代码中,glob.glob()函数返回所有匹配的文件路径列表,而glob.iglob()返回一个可迭代的对象,可以用来逐个获取匹配的文件路径。

注意:**/*.txt中的**/用于匹配所有子目录。这种语法是shell的语法,在Python中,你需要使用两个星号**来表示任意层级的子目录。如果你在Windows系统中使用这种语法,请确保你的Python脚本是在raw字符串或者加转义的情况下使用。

2024-09-02



import peewee
 
# 假设我们已经定义了一个Model类,并且数据库已经连接
class MyModel(peewee.Model):
    # 这里是Model的字段定义
    name = peewee.CharField()
    value = peewee.IntegerField()
 
    class Meta:
        database = db_conn  # 假设db_conn是已经连接的数据库实例
 
# 创建表
MyModel.create_table()
 
# 插入数据
MyModel.create(name='example', value=100)
 
# 查询数据
query_result = MyModel.select().where(MyModel.name == 'example')
for entry in query_result:
    print(entry.value)  # 输出: 100
 
# 更新数据
MyModel.update(value=200).where(MyModel.name == 'example').execute()
 
# 删除数据
MyModel.delete().where(MyModel.name == 'example').execute()
 
# 删除表
MyModel.drop_table()

这个示例展示了如何使用Peewee ORM来操作SQLite数据库。首先定义了一个Model类,并通过create_table方法创建了对应的数据库表。然后展示了如何插入、查询、更新和删除数据。最后,通过drop_table方法删除了数据库表。这个过程是对SQLite数据库操作的基本范例。

2024-09-02

在选择Python Flask或Django框架进行Web开发时,主要考虑以下因素:

  1. 项目规模:小型项目可以考虑Flask,而大型项目更适合使用Django。
  2. 快速上手:如果你想快速开始项目,Flask更简单;而对于需要更多现成功能和支持的项目,Django可能是更好的选择。
  3. 学习曲线:Flask的学习曲线相对平滑,而Django有较为复杂的学习曲线。
  4. 社区支持:Flask相对来说社区支持较少,而Django有完整的社区支持和大量的第三方插件。
  5. 性能:在某些情况下,Django可能会比Flask慢,但Flask通过使用Blueprint和Jinja的cache机制可以优化性能。
  6. 部署:两者都有成熟的部署解决方案,但根据项目需求选择适合的部署方式。

以下是一个简单的Flask和Django项目比较的代码示例:

Flask(hello.py):




from flask import Flask
app = Flask(__name__)
 
@app.route('/')
def hello():
    return 'Hello, Flask!'

Django(hello/views.py):




from django.http import HttpResponse
 
def hello(request):
    return HttpResponse('Hello, Django!')

在Django中,你还需要定义URL模式(hello/urls.py):




from django.urls import path
from .views import hello
 
urlpatterns = [
    path('', hello),
]

并在项目的根URL配置中引入Django应用的URL模式(hello/urls.py):




from django.urls import include, path
 
urlpatterns = [
    path('hello/', include('hello.urls')),
]

最后,在Django项目的settings.py中添加应用:




INSTALLED_APPS = [
    # ...
    'hello',
]

两者都可以运行和开发Web应用,选择哪个取决于具体需求和项目的规模。

2024-09-02



import requests
from bs4 import BeautifulSoup
 
# 图书详情页URL
book_url = 'https://book.douban.com/subject/1084336/discussion/new?start=200'
 
# 发送HTTP请求
response = requests.get(book_url)
 
# 检查请求是否成功
if response.status_code == 200:
    # 使用BeautifulSoup解析页面
    soup = BeautifulSoup(response.text, 'html.parser')
    
    # 提取图书名称
    book_name = soup.find('h1', class_='title').text
    print(f'图书名称: {book_name}')
    
    # 提取评论数
    comments_count = soup.find('span', class_='comments-count').text
    print(f'评论数: {comments_count}')
    
    # 提取评分
    rating_score = soup.find('strong', class_='ll rating-score').text
    print(f'评分: {rating_score}')
    
    # 提取作者信息
    author_info = soup.find('div', class_='info').text
    print(f'作者信息: {author_info}')
    
    # 提取书评摘要
    comment_summary = soup.find_all('div', class_='comment-summary')
    for summary in comment_summary:
        print(summary.text)
else:
    print('请求失败')

这段代码使用了requests库来发送HTTP请求,使用BeautifulSoup库来解析页面,并提取了图书名称、评论数、评分、作者信息和书评摘要。这是一个简单的网页爬取示例,适合作为学习如何使用Python进行网页爬取的起点。

2024-09-02

Python3 应用程序通常由以下主要组成元素构成:

  1. 程序入口:Python3 应用程序通常从 if __name__ == "__main__": 块开始执行。
  2. 变量和数据类型:Python3 支持多种数据类型,如整数(int)、浮点数(float)、字符串(str)、列表(list)、元组(tuple)、集合(set)、字典(dict)等。
  3. 控制流语句:Python3 支持条件语句(if, else)和循环语句(for, while)。
  4. 函数:函数是组织和重用代码的基本方式,使用 def 关键词定义。
  5. 类和对象:Python3 是一种面向对象的语言,使用 class 关键词定义类,之后可以创建类的实例(对象)。
  6. 异常处理:使用 try...except 块处理异常。
  7. 模块:Python3 使用模块来组织和重用代码,通过 import 语句导入。

下面是一个简单的Python3应用程序的例子:




# 程序入口
if __name__ == "__main__":
    # 变量和数据类型
    a = 10
    b = 20.5
    c = "Hello, World!"
    d = [1, 2, 3]
    e = (4, 5, 6)
    f = {"apple", "banana"}
    g = {"key1": "value1", "key2": "value2"}
 
    # 控制流语句
    if a > b:
        print(a)
    else:
        print(b)
 
    # 循环语句
    for i in range(3):
        print(i)
 
    # 函数
    def greet(name):
        return "Hello, " + name + "!"
    print(greet("Alice"))
 
    # 类和对象
    class MyClass:
        def __init__(self, value):
            self.value = value
        def display_value(self):
            print(self.value)
    obj = MyClass(100)
    obj.display_value()
 
    # 异常处理
    try:
        result = 10 / 0
    except ZeroDivisionError:
        print("Cannot divide by zero!")
    except Exception as e:
        print("An error occurred:", e)
 
    # 导入模块
    import math
    print(math.sqrt(a))

这个例子展示了Python3应用程序的基础结构和基本组件。