PostgreSQL 事务与并发控制
2026/3/20大约 15 分钟
PostgreSQL 事务与并发控制
一、事务基础
1.1 什么是事务
事务(Transaction)是数据库操作的基本单位,它是一组不可分割的操作序列,要么全部执行成功,要么全部不执行。
1.2 事务基本操作
-- 开始事务
BEGIN;
-- 或
START TRANSACTION;
-- 执行操作
INSERT INTO accounts (user_id, balance) VALUES (1, 1000);
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE user_id = 2;
-- 提交事务
COMMIT;
-- 回滚事务
ROLLBACK;
-- 设置事务特性
BEGIN ISOLATION LEVEL SERIALIZABLE;
BEGIN READ ONLY;
BEGIN ISOLATION LEVEL READ COMMITTED READ WRITE;
-- 保存点(Savepoint)
BEGIN;
INSERT INTO users (name) VALUES ('Alice');
SAVEPOINT sp1;
INSERT INTO users (name) VALUES ('Bob');
-- 发现错误,回滚到保存点
ROLLBACK TO SAVEPOINT sp1;
-- Bob 的插入被回滚,Alice 保留
INSERT INTO users (name) VALUES ('Charlie');
COMMIT;
-- 释放保存点
RELEASE SAVEPOINT sp1;
1.3 自动提交模式
-- PostgreSQL 默认自动提交
-- 每条语句都是一个独立事务
INSERT INTO users (name) VALUES ('Alice'); -- 自动提交
-- 关闭自动提交(psql)
\set AUTOCOMMIT off
-- 查看当前事务状态
SELECT txid_current(); -- 当前事务 ID
SELECT pg_backend_pid(); -- 当前进程 ID
-- 在应用程序中控制事务
-- Java: connection.setAutoCommit(false);
-- Python: connection.autocommit = False
二、事务隔离级别
2.1 并发问题
2.2 PostgreSQL 隔离级别
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 序列化异常 |
|---|---|---|---|---|
| Read Uncommitted | 不可能* | 可能 | 可能 | 可能 |
| Read Committed | 不可能 | 可能 | 可能 | 可能 |
| Repeatable Read | 不可能 | 不可能 | 不可能* | 可能 |
| Serializable | 不可能 | 不可能 | 不可能 | 不可能 |
*PostgreSQL 的 Read Uncommitted 实际等同于 Read Committed
*PostgreSQL 的 Repeatable Read 已经防止了幻读
-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- 会话级别设置
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- 查看当前隔离级别
SHOW transaction_isolation;
-- 全局默认设置(postgresql.conf)
-- default_transaction_isolation = 'read committed'
2.3 Read Committed(默认)
-- Read Committed 是 PostgreSQL 的默认隔离级别
-- 每条 SQL 语句看到的是该语句开始时已提交的数据
-- 会话 1
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- balance = 1000
-- 此时会话 2 更新并提交
SELECT * FROM accounts WHERE id = 1; -- balance = 900(看到新值)
COMMIT;
-- 会话 2
BEGIN;
UPDATE accounts SET balance = 900 WHERE id = 1;
COMMIT;
-- 特点:
-- 1. 同一事务中,不同时间点的查询可能看到不同数据
-- 2. 适合大多数 OLTP 场景
-- 3. 不会阻塞读操作
2.4 Repeatable Read
-- Repeatable Read 保证同一事务中看到一致的快照
-- 快照是在事务开始(第一条查询)时确定的
-- 会话 1
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM accounts WHERE id = 1; -- balance = 1000
-- 此时会话 2 更新并提交
SELECT * FROM accounts WHERE id = 1; -- 仍然是 1000(快照一致)
-- 尝试更新被其他事务修改的行
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 错误:could not serialize access due to concurrent update
ROLLBACK;
-- 会话 2
BEGIN;
UPDATE accounts SET balance = 900 WHERE id = 1;
COMMIT;
-- 特点:
-- 1. 事务看到的数据是事务开始时的快照
-- 2. 如果要更新的行被其他事务修改,会报错
-- 3. 需要在应用层处理重试逻辑
2.5 Serializable
-- Serializable 是最严格的隔离级别
-- 保证并发事务的结果等同于某种串行执行顺序
-- 经典的写偏斜问题
-- 表:doctors (id, name, on_call)
-- 规则:至少要有一个医生值班
-- 会话 1
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT COUNT(*) FROM doctors WHERE on_call = true; -- 返回 2
UPDATE doctors SET on_call = false WHERE id = 1;
COMMIT; -- 可能失败
-- 会话 2
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT COUNT(*) FROM doctors WHERE on_call = true; -- 返回 2
UPDATE doctors SET on_call = false WHERE id = 2;
COMMIT; -- 可能失败
-- 其中一个事务会失败:
-- ERROR: could not serialize access due to read/write dependencies
-- 特点:
-- 1. 使用 SSI(Serializable Snapshot Isolation)
-- 2. 检测到潜在的序列化冲突时会回滚事务
-- 3. 应用需要处理重试逻辑
-- 4. 性能开销较大,但比传统锁定方式好
2.6 隔离级别选择建议
-- 1. Read Committed(默认,推荐大多数场景)
-- 适用:普通 OLTP 应用、简单的增删改查
-- 优点:并发性好,不易阻塞
-- 注意:同一事务中多次读取可能得到不同结果
-- 2. Repeatable Read
-- 适用:需要一致性读取的报表、批量处理
-- 优点:事务内数据一致
-- 注意:更新冲突时会报错,需要重试
-- 3. Serializable
-- 适用:金融交易、库存管理等对一致性要求极高的场景
-- 优点:完全隔离,无并发异常
-- 注意:性能开销大,需要重试机制
-- 重试逻辑示例(伪代码)
DO $$
DECLARE
retry_count INTEGER := 0;
max_retries INTEGER := 3;
BEGIN
LOOP
BEGIN
-- 业务逻辑
PERFORM some_operation();
RETURN;
EXCEPTION
WHEN serialization_failure THEN
retry_count := retry_count + 1;
IF retry_count > max_retries THEN
RAISE;
END IF;
-- 等待随机时间后重试
PERFORM pg_sleep(random() * 0.1);
END;
END LOOP;
END $$;
三、锁机制
3.1 锁类型概览
3.2 表级锁冲突矩阵
| 请求\持有 | AS | RS | RE | SUE | S | SRE | E | AE |
|---|---|---|---|---|---|---|---|---|
| ACCESS SHARE (AS) | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ |
| ROW SHARE (RS) | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ |
| ROW EXCLUSIVE (RE) | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ |
| SHARE UPDATE EXCL (SUE) | ✓ | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
| SHARE (S) | ✓ | ✓ | ✗ | ✗ | ✓ | ✗ | ✗ | ✗ |
| SHARE ROW EXCL (SRE) | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ |
| EXCLUSIVE (E) | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ |
| ACCESS EXCLUSIVE (AE) | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ |
3.3 显式锁定
-- 表级锁
LOCK TABLE users IN ACCESS SHARE MODE;
LOCK TABLE users IN ROW SHARE MODE;
LOCK TABLE users IN ROW EXCLUSIVE MODE;
LOCK TABLE users IN SHARE UPDATE EXCLUSIVE MODE;
LOCK TABLE users IN SHARE MODE;
LOCK TABLE users IN SHARE ROW EXCLUSIVE MODE;
LOCK TABLE users IN EXCLUSIVE MODE;
LOCK TABLE users IN ACCESS EXCLUSIVE MODE;
-- 不等待锁
LOCK TABLE users IN EXCLUSIVE MODE NOWAIT;
-- 行级锁
SELECT * FROM users WHERE id = 1 FOR UPDATE;
SELECT * FROM users WHERE id = 1 FOR SHARE;
SELECT * FROM users WHERE id = 1 FOR NO KEY UPDATE;
SELECT * FROM users WHERE id = 1 FOR KEY SHARE;
-- 不等待行锁
SELECT * FROM users WHERE id = 1 FOR UPDATE NOWAIT;
SELECT * FROM users WHERE id = 1 FOR UPDATE SKIP LOCKED;
-- 只锁定特定表(多表连接时)
SELECT * FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.id = 1
FOR UPDATE OF users;
3.4 行级锁使用场景
-- 1. FOR UPDATE - 独占锁,用于更新
-- 场景:转账操作
BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
-- 其他事务无法修改或锁定该行
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- 2. FOR SHARE - 共享锁,阻止更新但允许读取
-- 场景:检查数据完整性
BEGIN;
SELECT * FROM users WHERE id = 1 FOR SHARE;
-- 确保在事务期间用户不被删除或修改
INSERT INTO orders (user_id, amount) VALUES (1, 100);
COMMIT;
-- 3. FOR NO KEY UPDATE - 不涉及键的更新
-- 场景:更新非键字段,减少锁竞争
BEGIN;
SELECT * FROM users WHERE id = 1 FOR NO KEY UPDATE;
UPDATE users SET last_login = NOW() WHERE id = 1;
COMMIT;
-- 4. SKIP LOCKED - 跳过已锁定的行
-- 场景:任务队列
BEGIN;
SELECT * FROM tasks
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- 如果该行被锁定,跳过选择下一行
UPDATE tasks SET status = 'processing' WHERE id = ?;
COMMIT;
3.5 咨询锁(Advisory Locks)
-- 咨询锁是应用程序自定义的锁
-- 数据库只负责管理,不关联具体资源
-- 会话级咨询锁
SELECT pg_advisory_lock(123); -- 获取排他锁
SELECT pg_advisory_lock_shared(123); -- 获取共享锁
SELECT pg_advisory_unlock(123); -- 释放锁
SELECT pg_advisory_unlock_all(); -- 释放所有
-- 尝试获取(非阻塞)
SELECT pg_try_advisory_lock(123); -- 成功返回 true
-- 事务级咨询锁(事务结束自动释放)
SELECT pg_advisory_xact_lock(123);
SELECT pg_try_advisory_xact_lock(123);
-- 两个参数版本(int4, int4)
SELECT pg_advisory_lock(1, 100); -- 锁定资源类型 1,ID 100
-- 使用场景示例
-- 1. 防止并发执行同一任务
DO $$
BEGIN
IF pg_try_advisory_lock(hashtext('my_task')) THEN
PERFORM my_long_running_task();
PERFORM pg_advisory_unlock(hashtext('my_task'));
ELSE
RAISE NOTICE 'Task already running';
END IF;
END $$;
-- 2. 分布式锁(单机数据库内)
-- 锁定用户 ID 为 1001 的资源
SELECT pg_advisory_lock(1001);
-- 执行操作
SELECT pg_advisory_unlock(1001);
-- 查看当前咨询锁
SELECT * FROM pg_locks WHERE locktype = 'advisory';
四、死锁处理
4.1 死锁产生条件
4.2 死锁检测与配置
-- 死锁检测超时(默认 1 秒)
SHOW deadlock_timeout;
SET deadlock_timeout = '1s';
-- 锁等待超时(默认无限等待)
SHOW lock_timeout;
SET lock_timeout = '10s'; -- 等待 10 秒后放弃
-- 语句超时
SHOW statement_timeout;
SET statement_timeout = '30s';
-- 查看当前锁等待
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_query,
blocking_activity.query AS blocking_query
FROM pg_locks blocked_locks
JOIN pg_stat_activity blocked_activity
ON blocked_activity.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_stat_activity blocking_activity
ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
-- 查看所有锁
SELECT
l.locktype,
l.relation::regclass,
l.mode,
l.granted,
l.pid,
a.usename,
a.query,
a.state
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.relation IS NOT NULL
ORDER BY l.relation, l.pid;
4.3 避免死锁的策略
-- 1. 固定加锁顺序
-- 总是按相同顺序访问资源(如按 ID 升序)
-- 不好的做法
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
COMMIT;
-- 好的做法
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 先锁 id=1
UPDATE accounts SET balance = balance + 100 WHERE id = 2; -- 再锁 id=2
COMMIT;
-- 2. 使用 SELECT ... FOR UPDATE 预先获取锁
BEGIN;
SELECT * FROM accounts WHERE id IN (1, 2) ORDER BY id FOR UPDATE;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
-- 3. 缩短事务时间
-- 尽快完成事务,减少锁持有时间
-- 4. 使用较低的隔离级别
-- 如果业务允许,使用 Read Committed
-- 5. 使用 NOWAIT 或 SKIP LOCKED
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;
-- 如果锁不可用,立即返回错误
EXCEPTION WHEN lock_not_available THEN
-- 处理锁冲突
END;
-- 6. 使用咨询锁进行粗粒度控制
SELECT pg_advisory_lock(hashtext('transfer_' || least(1, 2) || '_' || greatest(1, 2)));
-- 执行转账
SELECT pg_advisory_unlock(hashtext('transfer_' || least(1, 2) || '_' || greatest(1, 2)));
4.4 死锁日志分析
-- 配置死锁日志
-- postgresql.conf
-- log_lock_waits = on
-- deadlock_timeout = 1s
-- 日志示例:
-- LOG: process 12345 detected deadlock while waiting for ShareLock
-- on transaction 1234 after 1000.123 ms
-- DETAIL: Process holding the lock: 12346.
-- Wait queue: 12345.
-- STATEMENT: UPDATE accounts SET balance = 900 WHERE id = 2
-- 从日志分析死锁
-- 1. 找到涉及的进程 ID
-- 2. 查看等待的锁类型
-- 3. 分析 SQL 语句
-- 4. 确定资源访问顺序
-- 5. 修改应用逻辑避免死锁
五、MVCC 与可见性
5.1 MVCC 工作原理
5.2 可见性判断
-- 查看行的事务信息
SELECT xmin, xmax, ctid, * FROM users;
-- 可见性规则(简化版):
-- 1. xmin 已提交 AND xmin < 当前快照
-- 2. xmax = 0 OR xmax 未提交 OR xmax > 当前快照
-- 查看当前快照信息
SELECT txid_current_snapshot();
-- 返回格式:xmin:xmax:xip_list
-- 例如:100:105:102,104
-- xmin=100: 小于此值的事务都已完成
-- xmax=105: 大于等于此值的事务还未开始
-- xip_list: 100-105 之间仍在运行的事务
-- 事务 ID 函数
SELECT txid_current(); -- 当前事务 ID
SELECT txid_snapshot_xmin(txid_current_snapshot()); -- 快照最小 xid
SELECT txid_snapshot_xmax(txid_current_snapshot()); -- 快照最大 xid
SELECT txid_snapshot_xip(txid_current_snapshot()); -- 进行中的 xid
5.3 事务 ID 回卷问题
-- PostgreSQL 使用 32 位事务 ID(约 40 亿)
-- 事务 ID 会回卷,需要 VACUUM 冻结旧元组
-- 查看事务 ID 使用情况
SELECT datname,
age(datfrozenxid) AS age,
2^31 - age(datfrozenxid) AS remaining
FROM pg_database
ORDER BY age DESC;
-- 查看表的年龄
SELECT relname,
age(relfrozenxid) AS age,
pg_size_pretty(pg_relation_size(oid)) AS size
FROM pg_class
WHERE relkind = 'r'
ORDER BY age DESC
LIMIT 20;
-- 相关配置
SHOW vacuum_freeze_min_age; -- 默认 50000000
SHOW vacuum_freeze_table_age; -- 默认 150000000
SHOW autovacuum_freeze_max_age; -- 默认 200000000
-- 强制冻结
VACUUM FREEZE users;
-- 事务 ID 回卷警告
-- 当 age 接近 2^31 时,数据库会拒绝新事务
-- ERROR: database is not accepting commands to avoid wraparound data loss
六、并发控制最佳实践
6.1 连接池配置
-- 查看当前连接数
SELECT count(*) FROM pg_stat_activity;
-- 查看最大连接数
SHOW max_connections;
-- 连接池建议配置
-- max_connections = 100-200(通常够用)
-- 使用 PgBouncer 等连接池中间件
-- PgBouncer 配置示例
-- [databases]
-- mydb = host=127.0.0.1 port=5432 dbname=mydb
--
-- [pgbouncer]
-- pool_mode = transaction
-- max_client_conn = 1000
-- default_pool_size = 20
6.2 长事务问题
-- 查看长时间运行的事务
SELECT
pid,
now() - xact_start AS duration,
state,
query
FROM pg_stat_activity
WHERE state != 'idle'
AND xact_start IS NOT NULL
ORDER BY duration DESC;
-- 查看阻止 VACUUM 的事务
SELECT
pid,
now() - xact_start AS duration,
query
FROM pg_stat_activity
WHERE backend_xid IS NOT NULL
OR backend_xmin IS NOT NULL
ORDER BY xact_start;
-- 终止长事务
SELECT pg_terminate_backend(pid);
-- 设置空闲事务超时
SET idle_in_transaction_session_timeout = '5min';
-- 全局配置
-- idle_in_transaction_session_timeout = 300000 # 5分钟
6.3 监控与告警
-- 创建锁等待监控视图
CREATE VIEW v_lock_waits AS
SELECT
waiting.pid AS waiting_pid,
waiting.query AS waiting_query,
waiting_locks.mode AS waiting_mode,
blocking.pid AS blocking_pid,
blocking.query AS blocking_query,
blocking_locks.mode AS blocking_mode,
waiting.wait_event_type,
waiting.wait_event,
now() - waiting.query_start AS wait_duration
FROM pg_stat_activity waiting
JOIN pg_locks waiting_locks ON waiting.pid = waiting_locks.pid
JOIN pg_locks blocking_locks ON
waiting_locks.locktype = blocking_locks.locktype
AND waiting_locks.database IS NOT DISTINCT FROM blocking_locks.database
AND waiting_locks.relation IS NOT DISTINCT FROM blocking_locks.relation
AND waiting_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT waiting_locks.granted;
-- 定期检查长锁等待
SELECT * FROM v_lock_waits WHERE wait_duration > interval '10 seconds';
-- 检查表膨胀(死元组过多)
SELECT
schemaname || '.' || relname AS table_name,
n_live_tup,
n_dead_tup,
round(n_dead_tup * 100.0 / NULLIF(n_live_tup + n_dead_tup, 0), 2) AS dead_ratio,
last_vacuum,
last_autovacuum
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;
七、总结
隔离级别选择
| 场景 | 推荐隔离级别 |
|---|---|
| 普通 OLTP | Read Committed |
| 报表查询 | Repeatable Read |
| 金融交易 | Serializable |
| 批量导入 | Read Committed |
锁使用原则
- 最小化锁范围:只锁定必要的行
- 固定加锁顺序:避免死锁
- 缩短事务时间:减少锁持有时间
- 使用合适的锁类型:FOR SHARE vs FOR UPDATE
- 考虑 SKIP LOCKED:适用于任务队列场景
性能优化要点
- 避免长事务:设置超时,定期监控
- 定期 VACUUM:清理死元组,防止 ID 回卷
- 使用连接池:减少连接开销
- 监控锁等待:及时发现阻塞问题
- 合理设置超时:lock_timeout, statement_timeout