from pydantic import BaseModel
from typing import Optional
from databases import Database
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from fastapi_utils.cbv import cbv
from fastapi import FastAPI, Depends
app = FastAPI()
# 定义数据库配置
DATABASE_URL = "sqlite:///./test.db"
# 初始化数据库
database = Database(DATABASE_URL)
# 创建SQLAlchemy引擎
engine = create_engine(DATABASE_URL)
# 创建SQLAlchemy的基类
Base = declarative_base()
# 定义一个Pydantic模型,用于数据库表的映射
class Item(BaseModel):
id: Optional[int] = None
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
class Config:
orm_mode = True
# 定义数据库表
class ItemTable(Base):
__tablename__ = "items"
id = Base.Column(Base.Integer, primary_key=True, autoincrement=True)
name = Base.Column(Base.String(255))
description = Base.Column(Base.String(255))
price = Base.Column(Base.Float)
tax = Base.Column(Base.Float)
def __str__(self):
return self.name
# 创建表
Base.metadata.create_all(bind=engine)
# 创建SQLAlchemy会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 获取会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# FastAPI路由依赖项
async def get_db_session():
async with database.session() as session:
yield session
# 创建CRUD操作的基类
class CRUDBase:
def create(self, db: Session, *, obj_in: Item):
db_obj = ItemTable(**obj_in.dict())
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get(self, db: Session, *, id: int):
return db.query(ItemTable).filter(ItemTable.id == id).first()
def get_multi(self, db: Session, *, skip=0, limit=100):
return db.query(ItemTable).offset(skip).limit(limit).all()
# 定义CRUD操作
class ItemCRUD(CRUDBase):
def get_by_name(self, db: Session, *, name: str):
return db.query(ItemTable).filter(ItemTable.name == name).first()
# 初始化CRUD操作
item_crud = ItemCRUD()
# FastAPI路由
@app.post("/items/", response_model=Item)
async def create_item(*, item_in: Item, session: Session = Depends(get_db_session)):
return item_crud.create(session, obj_in=item_in)
@app.get("/items/{item_id}")
asy
import sqlite3
import sys
# 连接到SQLite数据库
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建FTS5虚拟表,使用simple分词器,并建立全文索引
cursor.execute('''
DROP TABLE IF EXISTS fts;
CREATE VIRTUAL TABLE fts USING fts5(
content TEXT,
tokenize=simple
);
''')
# 向虚拟表中插入数据
cursor.execute('INSERT INTO fts(content) VALUES (?)', ('你好 世界',))
cursor.execute('INSERT INTO fts(content) VALUES (?)', ('你好 世界 编程',))
# 执行全文检索
cursor.execute('SELECT * FROM fts WHERE fts MATCH ?', ('"你好" OR "世界"',))
# 打印查询结果
for row in cursor.fetchall():
print(row)
# 提交事务并关闭连接
conn.commit()
conn.close()
这段代码演示了如何在SQLite中使用FTS5和simple分词器进行中文拼音的全文检索。首先,它创建了一个FTS5虚拟表,并指定了使用simple分词器。然后,向虚拟表中插入了两条记录。最后,执行了一个全文检索查询,搜索包含“你好”或“世界”的记录。
在Python 3中,可以使用标准库中的codecs
模块来处理ASCII编码的二进制数据。以下是一个简单的例子,展示了如何将ASCII字符串编码为二进制表示,以及如何将二进制数据解码回ASCII字符串。
import codecs
# 将ASCII字符串编码为二进制数据
ascii_string = "hello"
binary_data = ascii_string.encode('ascii')
print(binary_data) # 输出: b'hello'
# 将二进制数据解码回ASCII字符串
decoded_string = binary_data.decode('ascii')
print(decoded_string) # 输出: hello
在这个例子中,encode('ascii')
方法被用来将ASCII字符串转换成二进制数据,而decode('ascii')
方法则用来将二进制数据转换回ASCII字符串。这里的输出b'hello'
表明了Python 3中字符串的表示方式,b
前缀表示这是一个二进制数据序列。
from pymongo import MongoClient
# 连接到MongoDB
client = MongoClient('mongodb://localhost:27017/')
# 选择数据库
db = client['example_db']
# 选择集合(类似于SQL中的表)
collection = db['example_collection']
# 查询所有文档
for doc in collection.find():
print(doc)
# 查询一个文档
doc = collection.find_one()
print(doc)
# 查询指定条件的文档
filter_condition = {'name': 'Alice'}
for doc in collection.find(filter_condition):
print(doc)
# 查询指定字段的文档
projection = {'name': 1, '_id': 0}
for doc in collection.find(filter_condition, projection):
print(doc)
# 查询排序
sort_condition = [('age', -1)] # 按年龄降序排列
for doc in collection.find().sort(sort_condition):
print(doc)
# 限制返回文档数量
for doc in collection.find().limit(3):
print(doc)
# 跳过指定数量的文档
for doc in collection.find().skip(2):
print(doc)
这段代码展示了如何使用pymongo
库连接到MongoDB,选择数据库和集合,并执行查询操作,包括查找所有文档、查找一个文档、根据条件查找文档、指定查询字段、排序、限制返回数量和跳过文档数量。这些是进行MongoDB查询时常用的方法和技巧。
Python的glob
模块提供了一个函数glob()
,它可以查找符合特定规则的文件路径名。该模块主要用于查找文件系统中的文件,可以使用简单的模式匹配。
以下是一些使用Python3的glob
模块的常见方法:
- 查找所有
.txt
文件:
import glob
for filename in glob.glob('*.txt'):
print(filename)
- 查找当前目录及子目录下的所有
.txt
文件:
import glob
for filename in glob.glob('**/*.txt', recursive=True):
print(filename)
- 查找特定目录下的所有
.txt
文件:
import glob
for filename in glob.glob('/path/*.txt'):
print(filename)
- 查找特定目录及子目录下的所有
.txt
文件:
import glob
for filename in glob.glob('/path/**/*.txt', recursive=True):
print(filename)
- 查找多种文件类型:
import glob
for filename in glob.glob('*.txt', '*.jpg'):
print(filename)
- 使用
glob.iglob()
进行迭代匹配:
import glob
file_iterator = glob.iglob(r'**/*.txt', recursive=True)
for filename in file_iterator:
print(filename)
以上代码中,glob.glob()
函数返回所有匹配的文件路径列表,而glob.iglob()
返回一个可迭代的对象,可以用来逐个获取匹配的文件路径。
注意:**/*.txt
中的**/
用于匹配所有子目录。这种语法是shell的语法,在Python中,你需要使用两个星号**
来表示任意层级的子目录。如果你在Windows系统中使用这种语法,请确保你的Python脚本是在raw字符串或者加转义的情况下使用。
import peewee
# 假设我们已经定义了一个Model类,并且数据库已经连接
class MyModel(peewee.Model):
# 这里是Model的字段定义
name = peewee.CharField()
value = peewee.IntegerField()
class Meta:
database = db_conn # 假设db_conn是已经连接的数据库实例
# 创建表
MyModel.create_table()
# 插入数据
MyModel.create(name='example', value=100)
# 查询数据
query_result = MyModel.select().where(MyModel.name == 'example')
for entry in query_result:
print(entry.value) # 输出: 100
# 更新数据
MyModel.update(value=200).where(MyModel.name == 'example').execute()
# 删除数据
MyModel.delete().where(MyModel.name == 'example').execute()
# 删除表
MyModel.drop_table()
这个示例展示了如何使用Peewee ORM来操作SQLite数据库。首先定义了一个Model类,并通过create_table
方法创建了对应的数据库表。然后展示了如何插入、查询、更新和删除数据。最后,通过drop_table
方法删除了数据库表。这个过程是对SQLite数据库操作的基本范例。
在选择Python Flask或Django框架进行Web开发时,主要考虑以下因素:
- 项目规模:小型项目可以考虑Flask,而大型项目更适合使用Django。
- 快速上手:如果你想快速开始项目,Flask更简单;而对于需要更多现成功能和支持的项目,Django可能是更好的选择。
- 学习曲线:Flask的学习曲线相对平滑,而Django有较为复杂的学习曲线。
- 社区支持:Flask相对来说社区支持较少,而Django有完整的社区支持和大量的第三方插件。
- 性能:在某些情况下,Django可能会比Flask慢,但Flask通过使用Blueprint和Jinja的cache机制可以优化性能。
- 部署:两者都有成熟的部署解决方案,但根据项目需求选择适合的部署方式。
以下是一个简单的Flask和Django项目比较的代码示例:
Flask(hello.py):
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return 'Hello, Flask!'
Django(hello/views.py):
from django.http import HttpResponse
def hello(request):
return HttpResponse('Hello, Django!')
在Django中,你还需要定义URL模式(hello/urls.py):
from django.urls import path
from .views import hello
urlpatterns = [
path('', hello),
]
并在项目的根URL配置中引入Django应用的URL模式(hello/urls.py):
from django.urls import include, path
urlpatterns = [
path('hello/', include('hello.urls')),
]
最后,在Django项目的settings.py中添加应用:
INSTALLED_APPS = [
# ...
'hello',
]
两者都可以运行和开发Web应用,选择哪个取决于具体需求和项目的规模。
import requests
from bs4 import BeautifulSoup
# 图书详情页URL
book_url = 'https://book.douban.com/subject/1084336/discussion/new?start=200'
# 发送HTTP请求
response = requests.get(book_url)
# 检查请求是否成功
if response.status_code == 200:
# 使用BeautifulSoup解析页面
soup = BeautifulSoup(response.text, 'html.parser')
# 提取图书名称
book_name = soup.find('h1', class_='title').text
print(f'图书名称: {book_name}')
# 提取评论数
comments_count = soup.find('span', class_='comments-count').text
print(f'评论数: {comments_count}')
# 提取评分
rating_score = soup.find('strong', class_='ll rating-score').text
print(f'评分: {rating_score}')
# 提取作者信息
author_info = soup.find('div', class_='info').text
print(f'作者信息: {author_info}')
# 提取书评摘要
comment_summary = soup.find_all('div', class_='comment-summary')
for summary in comment_summary:
print(summary.text)
else:
print('请求失败')
这段代码使用了requests库来发送HTTP请求,使用BeautifulSoup库来解析页面,并提取了图书名称、评论数、评分、作者信息和书评摘要。这是一个简单的网页爬取示例,适合作为学习如何使用Python进行网页爬取的起点。
Python3 应用程序通常由以下主要组成元素构成:
- 程序入口:Python3 应用程序通常从
if __name__ == "__main__":
块开始执行。 - 变量和数据类型:Python3 支持多种数据类型,如整数(int)、浮点数(float)、字符串(str)、列表(list)、元组(tuple)、集合(set)、字典(dict)等。
- 控制流语句:Python3 支持条件语句(if, else)和循环语句(for, while)。
- 函数:函数是组织和重用代码的基本方式,使用
def
关键词定义。 - 类和对象:Python3 是一种面向对象的语言,使用
class
关键词定义类,之后可以创建类的实例(对象)。 - 异常处理:使用
try...except
块处理异常。 - 模块:Python3 使用模块来组织和重用代码,通过
import
语句导入。
下面是一个简单的Python3应用程序的例子:
# 程序入口
if __name__ == "__main__":
# 变量和数据类型
a = 10
b = 20.5
c = "Hello, World!"
d = [1, 2, 3]
e = (4, 5, 6)
f = {"apple", "banana"}
g = {"key1": "value1", "key2": "value2"}
# 控制流语句
if a > b:
print(a)
else:
print(b)
# 循环语句
for i in range(3):
print(i)
# 函数
def greet(name):
return "Hello, " + name + "!"
print(greet("Alice"))
# 类和对象
class MyClass:
def __init__(self, value):
self.value = value
def display_value(self):
print(self.value)
obj = MyClass(100)
obj.display_value()
# 异常处理
try:
result = 10 / 0
except ZeroDivisionError:
print("Cannot divide by zero!")
except Exception as e:
print("An error occurred:", e)
# 导入模块
import math
print(math.sqrt(a))
这个例子展示了Python3应用程序的基础结构和基本组件。
from django.http import HttpResponse
from django.views import View
class GreetingView(View):
"""
Django视图类示例:返回简单的问候。
"""
def get(self, request):
"""
处理GET请求,并返回问候消息。
"""
return HttpResponse("Hello, World!")
# 使用URL路由配置
# 在urls.py文件中添加如下路由配置:
# path('greeting/', GreetingView.as_view(), name='greeting')
这段代码定义了一个名为GreetingView
的Django视图类,它处理GET请求并返回一个简单的问候消息。在实际的Web应用中,你可以通过Django的URL配置将GreetingView
与一个URL路径关联起来,使得用户可以通过浏览器访问这个视图,并获取问候消息。