数据库无论对于生产管理还是很多的实际应用都非常重要。小编这次聊一下数据库事件触发的应用。示例使用了postgresql和Python。在本文中,事件触发和处理大概地分为两类:
数据库的事件触发和服务器内部处理(1~4)
数据库事件触发后,客户端的程序检测到该事件的触发对应的处理(5~6)
在数据库系统中,事件触发(通常指数据库触发器)以及读取事件触发的信息用于多种场景和需求。请看两组示例(1~4)和(5~6)。
1. 数据一致性和完整性维护
当对数据库表中的数据进行插入、更新或删除操作时,需要自动验证或调整相关数据,以确保它们符合业务规则或约束。例如,在一个订单管理系统中,如果库存数量减少到一定阈值以下,可以触发一个警告或补货请求。
Step 1-1: 创建数据库表
假设我们有一个 inventory 表,它保存产品库存的信息:
CREATE TABLE inventory (
product_id SERIAL PRIMARY KEY,
product_name TEXT NOT NULL,
quantity INT NOT NULL
);
Step 1-2: 创建触发函数
创建一个 PL/pgSQL 函数,用于检查库存数量并记录警告信息:
CREATE OR REPLACE FUNCTION check_inventory_threshold()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.quantity < 10 THEN -- 假设 10 是阈值
-- 在此处记录警告或使用某种方式发送通知
RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity;
-- 可在此插入补货请求或通知操作
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Step 1-3: 创建触发器
设置一个触发器,更新 inventory 表时调用触发函数:
CREATE TRIGGER inventory_check_trigger AFTER INSERT OR UPDATE ON inventory FOR EACH ROW EXECUTE PROCEDURE check_inventory_threshold();Step 1-4: 使用 Python 进行外部操作
一个Python脚本可以用于监控警告并执行更复杂的操作,比如发送电子邮件或自动创建补货单。以下是一个简单的Python示例,假设你将警告日志记录到一个专门的日志表中:
import psycopg2
from smtplib import SMTP
def send_notification(product_name, quantity):
# 发送邮件通知逻辑(确保你已设置SMTP服务器配置)
with SMTP('smtp.example.com') as smtp:
smtp.sendmail('from@example.com', 'to@example.com',
f'Subject: Inventory Alert
Product {product_name} is below threshold with quantity {quantity}.')
def check_and_notify():
try:
# Connect to PostgreSQL database
connection = psycopg2.connect(
host="localhost",
database="your_database",
user="your_user",
password="your_password"
)
cursor = connection.cursor()
# Query to check logs for low inventory
query = """
SELECT product_name, quantity FROM inventory WHERE quantity < 10;
"""
cursor.execute(query)
low_stock_items = cursor.fetchall()
for product_name, quantity in low_stock_items:
send_notification(product_name, quantity)
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
# Run the check and notify function
check_and_notify()
2. 自动化任务
自动执行某些日常任务,如记录变化、生成日志或进行审计。当某个表中的数据被修改时,触发器可以自动记录修改前后的数据以供审计,当对特定表进行插入、更新或删除操作时,触发器能够捕捉这些事件,并执行相关的处理逻辑。 下面是一个如何使用 PostgreSQL 触发器来记录数据变化的示例。假设我们有一个名为 employee_data 的表,我们希望记录每次数据更新时的操作者信息。
2-1. 创建一个用于日志记录的表
首先,需要新建一个用于存储变更日志的表。假设我们有一个名为employee_data 的表,我们希望记录每次数据更新时的操作者信息。
CREATE TABLE change_log (
id SERIAL PRIMARY KEY,
table_name TEXT,
operation VARCHAR(10),
changed_by TEXT,
change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
old_data JSONB,
new_data JSONB
);
2-2. 创建一个触发函数
接下来,创建一个触发函数。当 employee_data 表发生变化时,调用该函数来记录变更,检测并获取 old_data 和 new_data,然后通过 row_to_json 函数将其转换为 JSONB 格式存入日志表中。处理中请留意不同的操作对应的日志记录内容的差异。
CREATE OR REPLACE FUNCTION log_employee_data_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO change_log (table_name, operation, changed_by, old_data)
VALUES (
TG_TABLE_NAME,
TG_OP,
SESSION_USER,
row_to_json(OLD)
);
ELSE
INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data)
VALUES (
TG_TABLE_NAME,
TG_OP,
SESSION_USER,
row_to_json(OLD),
row_to_json(NEW)
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
TG_OP 是 PostgreSQL 触发器函数中的一个特殊变量。在触发器函数中,TG_OP 用于表示触发事件的操作类型。它会被设置为以下字符串值之一,以标识触发器是由哪种数据库操作激活的:
'INSERT': 触发器是由插入操作激活的。
'UPDATE': 触发器是由更新操作激活的。
'DELETE': 触发器是由删除操作激活的.
'TRUNCATE': 触发器是由截断操作激活的。
在触发器函数中使用 TG_OP,可以根据不同的操作类型执行不同的逻辑。 2-3. 创建触发器
最后,为 employee_data 表创建一个触发器,当发生 INSERT、UPDATE 或 DELETE 操作时调用触发函数:
CREATE TRIGGER employee_data_changes AFTER INSERT OR UPDATE OR DELETE ON employee_data FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();
2-4. 如果没有对应的表employee_data,就建一个来测试
CREATE TABLE employee_data (
employee_id SERIAL PRIMARY KEY, -- 员工唯一标识
first_name VARCHAR(50) NOT NULL, -- 员工的名字
last_name VARCHAR(50) NOT NULL, -- 员工的姓氏
email VARCHAR(100) UNIQUE NOT NULL, -- 员工的电子邮件地址
phone_number VARCHAR(15), -- 员工的联系电话
hire_date DATE NOT NULL, -- 入职日期
job_title VARCHAR(50), -- 职位名称
department VARCHAR(50), -- 所属部门
salary NUMERIC(10, 2), -- 薪水
manager_id INT, -- 上级主管ID,指向另一个员工
CONSTRAINT fk_manager
FOREIGN KEY(manager_id)
REFERENCES employee_data(employee_id)
ON DELETE SET NULL
);
2-5. 如果表中没有数据就添加一条来测试
INSERT INTO employee_data (
first_name,
last_name,
email,
phone_number,
hire_date,
job_title,
department,
salary,
manager_id
) VALUES (
'ZZZ', -- First name
'AAA', -- Last name
'ZZZ.AAA@example.com', -- Email address
'123-456-7890', -- Phone number
'2023-11-01', -- Hire date
'Engineer', -- Job title
'Engineering', -- Department
75000, -- Salary
NULL -- Manager ID (assuming no manager or manager not yet assigned)
);
3. 跨表更新或同步:
当一个表发生变化时时,触发器可以用于自动更新或同步其他表的数据。例如,在一个多表关联的系统中,有一个订单表order和一个库存表inventory,如果订单表中数据有变化,就触发更新库存表中的对应产品的数据。 3.1 建表示例
CREATE TABLE inventory (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(100),
stock_quantity INT NOT NULL
);
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
product_id INT REFERENCES inventory(product_id),
quantity INT NOT NULL
);
3.2 创建触发事件
当order表中已经发生insert,updat或者delete事件时,就触发下面的函数运行。实际数据的加减操作,请根据实际关系进行调整。这里的简单逻辑是:
有新订单添加时,就在库存表中减少产品库存数
订单数据有更新时,就把库存表中减去更新后订单表中对应产品的订单数据,然后加上更新前订单表中对应产品的数据
当订单取消(删除)时,就会在库存数据上增加之订单表中删除的旧数据
CREATE OR REPLACE FUNCTION update_inventory()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
UPDATE inventory
SET stock_quantity = stock_quantity - NEW.quantity
WHERE product_id = NEW.product_id;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE inventory
SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity
WHERE product_id = NEW.product_id;
ELSIF TG_OP = 'DELETE' THEN
UPDATE inventory
SET stock_quantity = stock_quantity + OLD.quantity
WHERE product_id = OLD.product_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
3.3 创建事件触发器
CREATE TRIGGER trigger_orders_update AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE update_inventory();
(防止出现视觉疲劳)
4. 安全性检查和防护
执行安全性检查,如防止未授权的数据更改或异常数据输入。如果有可疑活动或不当数据修改,触发器可以自动拒绝操作或生成警告。假设你有一个敏感数据的表,如 sensitive_data,需要确保只有授权用户才能更新数据。 4.1 建表sensitive_data示例
CREATE TABLE sensitive_data (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL,
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
4.2 创建触发函数进行安全检查
创建一个触发函数来检查是否是授权用户在做修改。
CREATE OR REPLACE FUNCTION check_user_authorization()
RETURNS TRIGGER AS $$
BEGIN
-- 简单检查:用户是否在允许的列表中(实际应该更加复杂)
IF SESSION_USER <> 'authorized_user' THEN
RAISE EXCEPTION 'Unauthorized user. Access denied.';
END IF;
-- 更新 last_modified 时间戳
NEW.last_modified := CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
4.3 为表创建触发器
CREATE TRIGGER secure_update_trigger BEFORE UPDATE ON sensitive_data FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();
该事件触发器的功能说明
功能:这个示例功能是,当有人试图更新 sensitive_data 表中的数据时,触发器函数 check_user_authorization() 会自动检查发起更新的数据库用户是否有权限。如果没有权限,抛出异常并阻止操作。
扩展:在实际的生产环境中,这种安全性检查会更复杂,可能包括日志记录、详细的用户权限检查、使用角色来管理权限等。
安全性:使用触发器确保只有合适和经过验证的用户可以进行关键数据修改,这是保护数据完整性的一部分。
审计:这种自动检查可集成到更大的审计框架中,以全面监控和存储所有数据修改尝试记录。
5. 事件通知(客户端程序配合事件触发的同步处理方式)
使用事件触发器和事件通知来实现对特定数据库事件的响应和处理。使用 LISTEN 和 NOTIFY 机制,数据库客户端可以监听特定的通道,并在触发器函数中发送通知。这在需要实时监控数据库事件时非常有用。下面是一个使用 PostgreSQL 实现事件通知的示例。
假设我们希望在 orders 表中插入新订单时发送通知,以便外部系统或服务进行相应处理。
5.1 建一个orders表方便示例
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
product_id INT NOT NULL,
quantity INT NOT NULL,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
触发器可以用于事件通知,例如在数据变化时发送电子邮件通知相关人员。这在实时监控和响应系统中非常有用。
5.2 建立触发函数
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
-- 使用 NOTIFY 发送通知,通道名为 'new_order'
PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
5.3 创建触发器 为 orders 表创建触发器,以在插入新记录时调用触发函数。
CREATE TRIGGER notify_order_insert AFTER INSERT ON orders FOR EACH ROW EXECUTE PROCEDURE notify_new_order();5.4 使用 Python 监听通知 我们可以使用 Python 脚本来监听并处理通知。以下是一个简单的示例,使用 psycopg2 库监听 new_order 通道。
import psycopg2
import select
def listen_for_new_orders():
try:
# Connect to your PostgreSQL database
connection = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="localhost"
)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
# Listen for notifications on the 'new_order' channel
cursor.execute("LISTEN new_order;")
print("Waiting for notifications on channel 'new_order'...")
while True:
# Use select() to wait for notification
if select.select([connection], [], [], 5) == ([], [], []):
print("No new notifications.")
else:
connection.poll()
while connection.notifies:
notify = connection.notifies.pop(0)
print(f"Got NOTIFY: {notify.channel} - {notify.payload}")
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
# Call the function to start listening for notifications
if __name__=='__main__':
listen_for_new_orders()
6. 事件通知(客户端程序异步多线程的方式进行检测和操作)
示例的数据库表和事件触发的设置或创建,和示例5中相同,不过这里我们要增加一些复杂度,毕竟,程序处理要尽可能避免堵塞的方式进行等待读取。这里设想另外一种使用场景:
一方面客户端要检测数据库的orders表中的数据变化;另一方面,客户端还在继续读取(或者其他操作)这个数据库中的数据。
import threading
import psycopg2
import select
import time
# Global flag to indicate whether the threads should continue running
running = True
def listen_for_new_orders():
try:
# Connect to your PostgreSQL database
connection = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="localhost"
)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
# Listen for notifications on the 'new_order' channel
cursor.execute("LISTEN new_order;")
print("Waiting for notifications on channel 'new_order'...")
while running:
# Use select() to wait for notification
if select.select([connection], [], [], 5) == ([], [], []):
continue
else:
connection.poll()
while connection.notifies:
notify = connection.notifies.pop(0)
print(f"Got NOTIFY: {notify.channel} - {notify.payload}")
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
def read_database_records():
while running:
try:
# Example of reading from PostgreSQL
connection = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="localhost"
)
cursor = connection.cursor()
# Example query to periodically read data (replace with actual query)
cursor.execute("SELECT * FROM orders;")
records = cursor.fetchall()
for record in records:
print(f"Order Record: {record}")
time.sleep(10) # Wait before reading again to simulate periodic check
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
def main():
try:
# Create threads for listening and reading
listener_thread = threading.Thread(target=listen_for_new_orders)
reader_thread = threading.Thread(target=read_database_records)
# Start the threads
listener_thread.start()
reader_thread.start()
# Wait for both threads to complete (or terminate on Ctrl+C)
listener_thread.join()
reader_thread.join()
except KeyboardInterrupt:
# Set the running flag to False to stop the threads
global running
running = False
print("Exiting...")
if __name__ == "__main__":
main()
请留意上面的示例python代码中,数据库的连接使用了ISOLATION_LEVEL_AUTOCOMMIT,这就意味着每次涉及到数据更改或者增加的操作,数据库将自动提交了。如果要手动方式提交,那就需要配置一个ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件触发示例中,用了:
... FOR EACH ROW EXECUTE PROCEDURE your_trigger_func(); ...这个代码的执行是针对每条记录的发生来触发了。请根据实际应用的操作需要进行调整。
全部0条评论
快来发表一下你的评论吧 !