2024-09-03



from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every_second
from celery import Celery
 
# 初始化FastAPI应用
app = FastAPI()
 
# 创建Celery实例
def create_celery_app(app: FastAPI):
    celery = Celery('fastapi_celery', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
    celery.conf.update(
        result_expires=3600,
    )
    celery.conf.beat_schedule = {
        'add-every-second': {
            'task': 'fastapi_celery.tasks.add',
            'schedule': 1.0,
            'args': (1, 2)
        },
    }
    @celery.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 自定义定时任务
        pass
    return celery
 
# 创建Celery对象
celery_app = create_celery_app(app)
 
# 定义Celery任务
@celery_app.task
def add(x, y):
    return x + y
 
# 定义FastAPI路由
@app.get("/")
async def root():
    return {"message": "Hello World"}
 
# 启动FastAPI和Celery Worker
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

这个代码示例展示了如何在FastAPI应用中集成Redis和Celery来实现异步任务和定时任务。首先,我们创建了一个Celery实例,指定了Redis作为消息代理和结果存储。然后,我们定义了一个定时任务“add-every-second”,它会每秒调用一个名为fastapi_celery.tasks.add的异步任务。最后,我们启动了FastAPI服务和Celery Worker。这个例子简洁明了,展示了如何将FastAPI与异步任务框架结合使用。

2024-09-03

要使用Psycopg2连接openGauss 3.0,你需要确保你的环境中安装了Psycopg2,并且你需要使用openGauss 3.0的正确连接字符串。

以下是一个简单的Python代码示例,展示了如何使用Psycopg2连接openGauss 3.0数据库:




# 导入Psycopg2模块
import psycopg2
 
# 定义openGauss 3.0的连接字符串
# 格式通常是: "dbname=数据库名 user=用户名 host=主机地址 port=端口号 password=密码"
conn_string = "dbname=your_db name user=your_username host=your_host_address port=your_port password=your_password"
 
# 尝试连接到openGauss 3.0数据库
try:
    # 连接到数据库
    conn = psycopg2.connect(conn_string)
 
    # 创建一个游标对象
    cursor = conn.cursor()
 
    # 编写SQL查询
    cursor.execute('SELECT version();')
 
    # 获取查询结果
    db_version = cursor.fetchone()
 
    # 打印结果
    print("openGauss version:", db_version)
 
    # 关闭游标
    cursor.close()
 
    # 关闭连接
    conn.close()
 
# 捕获异常并打印错误信息
except psycopg2.Error as e:
    print("Unable to connect to the database:", e)
 

请确保将conn_string中的your_db_name, your_username, your_host_address, your_port, 和 your_password替换为你的openGauss 3.0数据库的实际连接信息。

如果你正在使用Python 2,请注意Psycopg2可能不支持Python 2。你可能需要使用Psycopg2-binary或者将Python升级到Python 3。

2024-09-03



from pymongo import MongoClient
 
# 连接到MongoDB
client = MongoClient('mongodb://localhost:27017/')
 
# 选择数据库
db = client['mydatabase']
 
# 选择集合(类似于SQL中的表)
collection = db['mycollection']
 
# 插入文档
collection.insert_one({'name': 'Alice', 'age': 25, 'address': '123 Maple Lane'})
 
# 查询文档
alice = collection.find_one({'name': 'Alice'})
print(alice)
 
# 更新文档
collection.update_one({'name': 'Alice'}, {'$set': {'age': 30}})
 
# 删除文档
collection.delete_one({'name': 'Alice'})
 
# 关闭连接
client.close()

这段代码展示了如何在Python中使用pymongo库进行MongoDB的连接、数据库和集合的选择、文档的插入、查询、更新和删除操作。同时,代码中包含了错误处理和资源清理的部分,确保了在操作完成后正确关闭数据库连接。

2024-09-03

这是一个Python程序,它使用SQLite数据库来存储和检索用户的浏览器历史记录。它可以作为一个简单的桌面应用程序来运行,用户可以添加、查看和删除历史记录。




import sqlite3
import sys
 
def create_connection(db_file):
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as e:
        print(e)
        return None
 
def create_table(conn):
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS history
               (id INTEGER PRIMARY KEY, url TEXT, title TEXT, visit_date DATE)''')
    except sqlite3.Error as e:
        print(e)
 
def add_record(conn, url, title):
    try:
        c = conn.cursor()
        visit_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        c.execute("INSERT INTO history (url, title, visit_date) VALUES (?, ?, ?)", (url, title, visit_date))
        conn.commit()
    except sqlite3.Error as e:
        print(e)
 
def view_records(conn):
    try:
        c = conn.cursor()
        for row in c.execute('SELECT * FROM history ORDER BY visit_date DESC'):
            print(row)
    except sqlite3.Error as e:
        print(e)
 
def delete_record(conn, url):
    try:
        c = conn.cursor()
        c.execute('DELETE FROM history WHERE url=?', (url,))
        conn.commit()
    except sqlite3.Error as e:
        print(e)
 
def main():
    database = r"C:\path\to\database\history.db"
    conn = create_connection(database)
    create_table(conn)
 
    # Add a record
    add_record(conn, 'https://www.google.com', 'Google')
 
    # View all records
    print("Recent history:")
    view_records(conn)
 
    # Delete a record
    delete_record(conn, 'https://www.google.com')
 
    # Close the connection
    conn.close()
 
if __name__ == '__main__':
    main()

这个简易的程序展示了如何使用SQLite来存储和管理数据。它提供了创建数据库连接、创建数据表、添加记录、查看记录和删除记录的基本功能。虽然这个程序可以作为桌面应用运行,但它没有图形用户界面(GUI),所以更像是一个命令行工具。如果要将其作为浏览器历史记录的简单管理工具,可以添加一个简单的GUI,比如使用Tkinter或PyQt等图形库。

2024-09-03



import sqlite3
 
# 连接到SQLite数据库
# 数据库文件是 test.db,如果文件不存在,会自动在当前目录创建:
conn = sqlite3.connect('test.db')
 
# 创建一个Cursor:
cursor = conn.cursor()
 
# 执行一条SQL语句,创建user表:
cursor.execute('CREATE TABLE IF NOT EXISTS user (id VARCHAR(20) PRIMARY KEY, name VARCHAR(20))')
 
# 关闭Cursor:
cursor.close()
 
# 执行查询语句,查询user表的所有数据:
cursor = conn.cursor()
cursor.execute('SELECT * FROM user')
values = cursor.fetchall()
print(values)
 
# 使用参数化查询来防止SQL注入:
cursor.execute('INSERT INTO user (id, name) VALUES (?, ?)', ('001', 'Alice'))
 
# 提交事务:
conn.commit()
 
# 关闭Cursor和Connection:
cursor.close()
conn.close()

这段代码展示了如何使用Python的sqlite3库来连接SQLite数据库、创建表格、执行查询以及防止SQL注入。代码简洁明了,并在每个关键步骤后都有解释。

2024-09-03

Django是一个开放源代码的网页框架,用Python语言编写。它最初是被开发来帮助开发者更容易地创建博客网站,但自从2005年首次发布以来,它已经成为了一个功能丰富、应用广泛的网页框架。

以下是一些Django Python后端开发的常见解决方案和示例代码:

  1. 创建一个新的Django项目:



django-admin startproject myproject
  1. 创建一个新的应用程序:



python manage.py startapp myapp
  1. 运行开发服务器:



python manage.py runserver
  1. 创建一个模型:



from django.db import models
 
class Person(models.Model):
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=30)
  1. 定义视图:



from django.http import HttpResponse
 
def home(request):
    return HttpResponse("Hello, World. I am your first view.")
  1. 定义URL:



from django.urls import path
from .views import home
 
urlpatterns = [
    path('', home, name='home'),
]
  1. 创建一个数据库迁移:



python manage.py makemigrations
python manage.py migrate
  1. 创建一个超级用户:



python manage.py createsuperuser
  1. 收集静态文件:



python manage.py collectstatic
  1. 清空数据库:



python manage.py flush

这些都是Django Python后端开发的基本操作和示例代码,可以帮助开发者快速建立起一个Django网站的骨架。

2024-09-03



import psycopg2
 
# 假设数据库连接信息如下
dbname = 'your_dbname'
user = 'your_username'
password = 'your_password'
host = 'localhost'
port = '5432'
 
# 获取存在的数据库名称
def get_existing_db_names():
    # 连接到PostgreSQL数据库
    conn = psycopg2.connect(
        dbname='postgres', 
        user=user, 
        password=password, 
        host=host, 
        port=port
    )
    cursor = conn.cursor()
    
    # 执行SQL查询
    cursor.execute("SELECT datname FROM pg_database;")
    rows = cursor.fetchall()
    
    # 关闭连接
    cursor.close()
    conn.close()
    
    # 返回数据库名称列表
    return [row[0] for row in rows]
 
# 强制断开PostgreSQL数据库连接
def force_disconnect_all_connections(dbname):
    # 连接到PostgreSQL数据库
    conn = psycopg2.connect(
        dbname='postgres', 
        user=user, 
        password=password, 
        host=host, 
        port=port
    )
    cursor = conn.cursor()
    
    # 执行SQL命令来断开所有连接
    cursor.execute(f"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{dbname}';")
    
    # 提交更改并关闭连接
    conn.commit()
    cursor.close()
    conn.close()
 
# 删除数据库
def drop_database(dbname):
    # 连接到PostgreSQL数据库
    conn = psycopg2.connect(
        dbname='postgres', 
        user=user, 
        password=password, 
        host=host, 
        port=port
    )
    cursor = conn.cursor()
    
    # 执行SQL删除命令
    cursor.execute(f"DROP DATABASE IF EXISTS {dbname};")
    
    # 提交更改并关闭连接
    conn.commit()
    cursor.close()
    conn.close()
 
# 示例使用
existing_dbs = get_existing_db_names()
print("存在的数据库:", existing_dbs)
force_disconnect_all_connections(dbname)
drop_database(dbname)
print(f"数据库 {dbname} 已删除。")

这段代码展示了如何使用Python和psycopg2库来连接PostgreSQL数据库,获取存在的数据库名称、强制断开特定数据库的所有连接,以及删除数据库。这些操作通常需要具有相应权限的用户来执行。

2024-09-03



from pymongo import MongoClient
import pymysql
import time
 
# 配置MySQL数据库连接信息
mysql_config = {
    "host": "localhost",
    "user": "root",
    "password": "your_mysql_password",
    "db": "your_mysql_db",
    "charset": "utf8mb4",
    "cursorclass": pymysql.cursors.DictCursor
}
 
# 配置MongoDB连接信息
mongo_config = {
    "host": "localhost",
    "port": 27017,
    "db": "your_mongo_db",
    "collection": "your_mongo_collection"
}
 
# 连接MySQL和MongoDB
mysql_conn = pymysql.connect(**mysql_config)
mongo_client = MongoClient(mongo_config["host"], mongo_config["port"])
db = mongo_client[mongo_config["db"]]
collection = db[mongo_config["collection"]]
 
# 定义同步数据的函数
def sync_data():
    with mysql_conn.cursor() as cursor:
        # 查询MySQL数据
        sql = "SELECT * FROM your_mysql_table"
        cursor.execute(sql)
        rows = cursor.fetchall()
        
        # 插入数据到MongoDB
        for row in rows:
            collection.insert_one(row)
 
# 主程序
if __name__ == "__main__":
    while True:
        try:
            sync_data()  # 执行同步数据的操作
            time.sleep(10)  # 间隔10秒执行下次同步
        except Exception as e:
            print(f"Error: {e}")  # 打印错误信息
            time.sleep(60)  # 出现错误则等待60秒后重试

这段代码展示了如何使用Python实现MySQL到MongoDB的实时数据同步。首先配置了MySQL和MongoDB的连接信息,然后定义了一个同步数据的函数sync_data,该函数连接MySQL数据库,查询数据,并插入到MongoDB中。最后,在主程序中循环执行同步操作,并在出现异常时进行错误处理。

2024-09-03

以下是一个简单的Django模型类示例,用于创建一个Book模型,该模型具有titlepublication_date两个字段:




from django.db import models
 
class Book(models.Model):
    title = models.CharField(max_length=100)
    publication_date = models.DateField(null=True, blank=True)
 
    def __str__(self):
        return self.title

在这个例子中,我们定义了一个Book类,它继承自models.Model。这个类有两个字段:title是一个字符字段,最大长度为100个字符;publication_date是一个日期字段,可以为空(null=True),在表单中也允许不填(blank=True)。__str__方法定义了当对象被转换为字符串时的表现形式,这里返回书的标题。

要使用这个模型,你需要将其添加到你的models.py文件中,然后运行以下命令来创建和应用数据库迁移:




python manage.py makemigrations
python manage.py migrate

这样就完成了一个简单的Django模型定义和迁移的例子。

2024-09-03

Python-oracledb是一个用于连接Oracle数据库的Python库。它是新一代的Oracle数据库驱动,提供了比其他类似库更好的性能和更多的功能。

以下是一个简单的例子,展示如何使用python-oracledb连接到Oracle数据库并执行一个查询:

首先,你需要安装python-oracledb库,可以使用pip进行安装:




pip install oracledb

然后,你可以使用以下Python代码连接到Oracle数据库并执行一个查询:




import oracledb
 
# 设置数据库连接信息
user = "your_username"
password = "your_password"
connect_string = "localhost:1521/orcl"  # orcl是你的数据库服务名
 
# 连接到数据库
connection = oracledb.connect(user, password, connect_string)
 
# 创建游标对象
cursor = connection.cursor()
 
# 执行SQL查询
cursor.execute("SELECT * FROM your_table")
 
# 获取查询结果
rows = cursor.fetchall()
for row in rows:
    print(row)
 
# 关闭游标和连接
cursor.close()
connection.close()

在这个例子中,你需要替换your_username, your_password, localhost:1521/orcl, 和your_table为你的Oracle数据库的实际用户名、密码、服务名和你想要查询的表名。这个代码片段展示了如何建立连接、执行查询、获取结果和关闭连接的基本步骤。