數據庫無論對于生產管理還是很多的實際應用都非常重要。小編這次聊一下數據庫事件觸發的應用。示例使用了postgresql和Python。在本文中,事件觸發和處理大概地分為兩類:
數據庫的事件觸發和服務器內部處理(1~4)
數據庫事件觸發后,客戶端的程序檢測到該事件的觸發對應的處理(5~6)
在數據庫系統中,事件觸發(通常指數據庫觸發器)以及讀取事件觸發的信息用于多種場景和需求。請看兩組示例(1~4)和(5~6)。
1. 數據一致性和完整性維護
當對數據庫表中的數據進行插入、更新或刪除操作時,需要自動驗證或調整相關數據,以確保它們符合業務規則或約束。例如,在一個訂單管理系統中,如果庫存數量減少到一定閾值以下,可以觸發一個警告或補貨請求。
Step 1-1: 創建數據庫表
假設我們有一個inventory表,它保存產品庫存的信息:
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, quantity INT NOT NULL );
Step 1-2: 創建觸發函數
創建一個 PL/pgSQL 函數,用于檢查庫存數量并記錄警告信息:
CREATE OR REPLACE FUNCTION check_inventory_threshold() RETURNS TRIGGER AS $$ BEGIN IF NEW.quantity < 10 THEN -- 假設 10 是閾值 -- 在此處記錄警告或使用某種方式發送通知 RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity; -- 可在此插入補貨請求或通知操作 END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
Step 1-3: 創建觸發器
設置一個觸發器,更新inventory表時調用觸發函數:
CREATE TRIGGER inventory_check_trigger AFTER INSERT OR UPDATE ON inventory FOREACHROWEXECUTEPROCEDUREcheck_inventory_threshold();Step 1-4: 使用 Python 進行外部操作
一個Python腳本可以用于監控警告并執行更復雜的操作,比如發送電子郵件或自動創建補貨單。以下是一個簡單的Python示例,假設你將警告日志記錄到一個專門的日志表中:
import psycopg2 from smtplib import SMTP def send_notification(product_name, quantity): # 發送郵件通知邏輯(確保你已設置SMTP服務器配置) with SMTP('smtp.example.com') as smtp: smtp.sendmail('from@example.com', 'to@example.com', f'Subject: Inventory Alert
Product {product_name} is below threshold with quantity {quantity}.') def check_and_notify(): try: # Connect to PostgreSQL database connection = psycopg2.connect( host="localhost", database="your_database", user="your_user", password="your_password" ) cursor = connection.cursor() # Query to check logs for low inventory query = """ SELECT product_name, quantity FROM inventory WHERE quantity < 10; """ cursor.execute(query) low_stock_items = cursor.fetchall() for product_name, quantity in low_stock_items: send_notification(product_name, quantity) except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Run the check and notify function check_and_notify()
2.自動化任務
自動執行某些日常任務,如記錄變化、生成日志或進行審計。當某個表中的數據被修改時,觸發器可以自動記錄修改前后的數據以供審計,當對特定表進行插入、更新或刪除操作時,觸發器能夠捕捉這些事件,并執行相關的處理邏輯。 下面是一個如何使用 PostgreSQL 觸發器來記錄數據變化的示例。假設我們有一個名為employee_data的表,我們希望記錄每次數據更新時的操作者信息。
2-1. 創建一個用于日志記錄的表
首先,需要新建一個用于存儲變更日志的表。假設我們有一個名為employee_data的表,我們希望記錄每次數據更新時的操作者信息。
CREATE TABLE change_log ( id SERIAL PRIMARY KEY, table_name TEXT, operation VARCHAR(10), changed_by TEXT, change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, old_data JSONB, new_data JSONB );
2-2. 創建一個觸發函數
接下來,創建一個觸發函數。當employee_data表發生變化時,調用該函數來記錄變更,檢測并獲取old_data和new_data,然后通過row_to_json函數將其轉換為 JSONB 格式存入日志表中。處理中請留意不同的操作對應的日志記錄內容的差異。
CREATE OR REPLACE FUNCTION log_employee_data_changes() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'DELETE' THEN INSERT INTO change_log (table_name, operation, changed_by, old_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD) ); ELSE INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD), row_to_json(NEW) ); END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;TG_OP是 PostgreSQL 觸發器函數中的一個特殊變量。在觸發器函數中,TG_OP用于表示觸發事件的操作類型。它會被設置為以下字符串值之一,以標識觸發器是由哪種數據庫操作激活的:
'INSERT': 觸發器是由插入操作激活的。
'UPDATE': 觸發器是由更新操作激活的。
'DELETE': 觸發器是由刪除操作激活的.
'TRUNCATE': 觸發器是由截斷操作激活的。
在觸發器函數中使用TG_OP,可以根據不同的操作類型執行不同的邏輯。 2-3. 創建觸發器
最后,為employee_data表創建一個觸發器,當發生INSERT、UPDATE或DELETE操作時調用觸發函數:
CREATE TRIGGER employee_data_changes AFTER INSERT OR UPDATE OR DELETE ON employee_data FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();
2-4.如果沒有對應的表employee_data,就建一個來測試
CREATE TABLE employee_data ( employee_id SERIAL PRIMARY KEY, -- 員工唯一標識 first_name VARCHAR(50) NOT NULL, -- 員工的名字 last_name VARCHAR(50) NOT NULL, -- 員工的姓氏 email VARCHAR(100) UNIQUE NOT NULL, -- 員工的電子郵件地址 phone_number VARCHAR(15), -- 員工的聯系電話 hire_date DATE NOT NULL, -- 入職日期 job_title VARCHAR(50), -- 職位名稱 department VARCHAR(50), -- 所屬部門 salary NUMERIC(10, 2), -- 薪水 manager_id INT, -- 上級主管ID,指向另一個員工 CONSTRAINT fk_manager FOREIGN KEY(manager_id) REFERENCES employee_data(employee_id) ON DELETE SET NULL );
2-5. 如果表中沒有數據就添加一條來測試
INSERT INTO employee_data ( first_name, last_name, email, phone_number, hire_date, job_title, department, salary, manager_id ) VALUES ( 'ZZZ', -- First name 'AAA', -- Last name 'ZZZ.AAA@example.com', -- Email address '123-456-7890', -- Phone number '2023-11-01', -- Hire date 'Engineer', -- Job title 'Engineering', -- Department 75000, -- Salary NULL -- Manager ID (assuming no manager or manager not yet assigned) );
3. 跨表更新或同步:
當一個表發生變化時時,觸發器可以用于自動更新或同步其他表的數據。例如,在一個多表關聯的系統中,有一個訂單表order和一個庫存表inventory,如果訂單表中數據有變化,就觸發更新庫存表中的對應產品的數據。 3.1建表示例
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name VARCHAR(100), stock_quantity INT NOT NULL ); CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT REFERENCES inventory(product_id), quantity INT NOT NULL );
3.2 創建觸發事件
當order表中已經發生insert,updat或者delete事件時,就觸發下面的函數運行。實際數據的加減操作,請根據實際關系進行調整。這里的簡單邏輯是:
有新訂單添加時,就在庫存表中減少產品庫存數
訂單數據有更新時,就把庫存表中減去更新后訂單表中對應產品的訂單數據,然后加上更新前訂單表中對應產品的數據
當訂單取消(刪除)時,就會在庫存數據上增加之訂單表中刪除的舊數據
CREATE OR REPLACE FUNCTION update_inventory() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'INSERT' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'UPDATE' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'DELETE' THEN UPDATE inventory SET stock_quantity = stock_quantity + OLD.quantity WHERE product_id = OLD.product_id; END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
3.3 創建事件觸發器
CREATE TRIGGER trigger_orders_update AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE update_inventory();
(防止出現視覺疲勞)
4. 安全性檢查和防護
執行安全性檢查,如防止未授權的數據更改或異常數據輸入。如果有可疑活動或不當數據修改,觸發器可以自動拒絕操作或生成警告。假設你有一個敏感數據的表,如sensitive_data,需要確保只有授權用戶才能更新數據。 4.1建表sensitive_data示例
CREATE TABLE sensitive_data ( id SERIAL PRIMARY KEY, data TEXT NOT NULL, last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
4.2 創建觸發函數進行安全檢查
創建一個觸發函數來檢查是否是授權用戶在做修改。
CREATE OR REPLACE FUNCTION check_user_authorization() RETURNS TRIGGER AS $$ BEGIN -- 簡單檢查:用戶是否在允許的列表中(實際應該更加復雜) IF SESSION_USER <> 'authorized_user' THEN RAISE EXCEPTION 'Unauthorized user. Access denied.'; END IF; -- 更新 last_modified 時間戳 NEW.last_modified := CURRENT_TIMESTAMP; RETURN NEW; END; $$ LANGUAGE plpgsql;
4.3為表創建觸發器
CREATE TRIGGER secure_update_trigger BEFORE UPDATE ON sensitive_data FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();
該事件觸發器的功能說明
功能:這個示例功能是,當有人試圖更新sensitive_data表中的數據時,觸發器函數check_user_authorization()會自動檢查發起更新的數據庫用戶是否有權限。如果沒有權限,拋出異常并阻止操作。
擴展:在實際的生產環境中,這種安全性檢查會更復雜,可能包括日志記錄、詳細的用戶權限檢查、使用角色來管理權限等。
安全性:使用觸發器確保只有合適和經過驗證的用戶可以進行關鍵數據修改,這是保護數據完整性的一部分。
審計:這種自動檢查可集成到更大的審計框架中,以全面監控和存儲所有數據修改嘗試記錄。
5.事件通知(客戶端程序配合事件觸發的同步處理方式)
使用事件觸發器和事件通知來實現對特定數據庫事件的響應和處理。使用LISTEN和NOTIFY機制,數據庫客戶端可以監聽特定的通道,并在觸發器函數中發送通知。這在需要實時監控數據庫事件時非常有用。下面是一個使用 PostgreSQL 實現事件通知的示例。
假設我們希望在orders表中插入新訂單時發送通知,以便外部系統或服務進行相應處理。
5.1建一個orders表方便示例
CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT NOT NULL, quantity INT NOT NULL, order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); 觸發器可以用于事件通知,例如在數據變化時發送電子郵件通知相關人員。這在實時監控和響應系統中非常有用。5.2 建立觸發函數
CREATE OR REPLACE FUNCTION notify_new_order() RETURNS TRIGGER AS $$ BEGIN -- 使用 NOTIFY 發送通知,通道名為 'new_order' PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id); RETURN NEW; END; $$ LANGUAGE plpgsql;5.3創建觸發器 為orders表創建觸發器,以在插入新記錄時調用觸發函數。
CREATE TRIGGER notify_order_insert AFTER INSERT ON orders FOR EACH ROW EXECUTE PROCEDURE notify_new_order();5.4使用 Python 監聽通知 我們可以使用 Python 腳本來監聽并處理通知。以下是一個簡單的示例,使用psycopg2庫監聽new_order通道。
import psycopg2 import select def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while True: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): print("No new notifications.") else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Call the function to start listening for notifications if__name__=='__main__': listen_for_new_orders()
6. 事件通知(客戶端程序異步多線程的方式進行檢測和操作)
示例的數據庫表和事件觸發的設置或創建,和示例5中相同,不過這里我們要增加一些復雜度,畢竟,程序處理要盡可能避免堵塞的方式進行等待讀取。這里設想另外一種使用場景:
一方面客戶端要檢測數據庫的orders表中的數據變化;另一方面,客戶端還在繼續讀取(或者其他操作)這個數據庫中的數據。
import threading import psycopg2 import select import time # Global flag to indicate whether the threads should continue running running = True def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while running: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): continue else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def read_database_records(): while running: try: # Example of reading from PostgreSQL connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) cursor = connection.cursor() # Example query to periodically read data (replace with actual query) cursor.execute("SELECT * FROM orders;") records = cursor.fetchall() for record in records: print(f"Order Record: {record}") time.sleep(10) # Wait before reading again to simulate periodic check except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def main(): try: # Create threads for listening and reading listener_thread = threading.Thread(target=listen_for_new_orders) reader_thread = threading.Thread(target=read_database_records) # Start the threads listener_thread.start() reader_thread.start() # Wait for both threads to complete (or terminate on Ctrl+C) listener_thread.join() reader_thread.join() except KeyboardInterrupt: # Set the running flag to False to stop the threads global running running = False print("Exiting...") if __name__ == "__main__": main()請留意上面的示例python代碼中,數據庫的連接使用了ISOLATION_LEVEL_AUTOCOMMIT,這就意味著每次涉及到數據更改或者增加的操作,數據庫將自動提交了。如果要手動方式提交,那就需要配置一個ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件觸發示例中,用了:
... FOR EACH ROW EXECUTE PROCEDURE your_trigger_func(); ...這個代碼的執行是針對每條記錄的發生來觸發了。請根據實際應用的操作需要進行調整。
-
數據庫
+關注
關注
7文章
3799瀏覽量
64388 -
觸發器
+關注
關注
14文章
2000瀏覽量
61153 -
函數
+關注
關注
3文章
4331瀏覽量
62608 -
python
+關注
關注
56文章
4797瀏覽量
84682
原文標題:數據庫事件觸發的設置和應用,及客戶端程序對事件的同步、異步讀取操作
文章出處:【微信號:安費諾傳感器學堂,微信公眾號:安費諾傳感器學堂】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論