python 基于 sqlite 实现消息队列
import sqlite3
import uuid
class SQLiteQueue:
def __init__(self, db_path, queue_name):
self.db_path = db_path
self.queue_name = queue_name
self.conn = sqlite3.connect(db_path)
self.init_queue()
def init_queue(self):
"""初始化消息队列表"""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS queue (
id TEXT PRIMARY KEY,
data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
def put(self, data):
"""添加一个新的消息到队列"""
data_id = str(uuid.uuid4())
self.conn.execute("INSERT INTO queue (id, data) VALUES (?, ?)", (data_id, data))
self.conn.commit()
return data_id
def get(self):
"""从队列中取出一个消息"""
cursor = self.conn.execute("SELECT id, data FROM queue LIMIT 1")
row = cursor.fetchone()
if row:
self.conn.execute("DELETE FROM queue WHERE id = ?", (row[0],))
self.conn.commit()
return row[1]
def size(self):
"""获取队列中消息的数量"""
cursor = self.conn.execute("SELECT COUNT(*) FROM queue")
return cursor.fetchone()[0]
# 使用示例
queue = SQLiteQueue('queue.db', 'example_queue')
# 添加消息
msg_id = queue.put('Hello, World!')
print(f'Message added with ID: {msg_id}')
# 获取消息
message = queue.get()
print(f'Message received: {message}')
# 查看队列大小
queue_size = queue.size()
print(f'Queue size: {queue_size}')
这段代码定义了一个名为SQLiteQueue
的类,它提供了一个基于SQLite数据库实现的简单消息队列。它包括添加消息(put
)、获取消息(get
)和查看队列大小(size
)的方法。使用时,首先创建一个队列对象,然后可以添加消息并获取它们。这个示例提供了一个简单的消息队列实现,并展示了如何使用它。
评论已关闭