| 特性 | 流复制 | 逻辑复制 |
|---|---|---|
| 复制单位 | 整个实例 | 表/数据库 |
| 数据格式 | WAL 物理日志 | 逻辑变更 |
| 从库可写 | 否 | 是 |
| 跨版本 | 需相同主版本 | 可以 |
| 跨平台 | 需相同平台 | 可以 |
| DDL 复制 | 自动 | 不复制 |
| 性能影响 | 低 | 较高 |
| 复杂度 | 低 | 中等 |
# --- Primary node setup ---
# Create a dedicated role with the REPLICATION privilege for standbys to connect as.
sudo -u postgres psql << EOF
CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'replica_password';
EOF
# Append streaming-replication parameters to the primary's postgresql.conf
# (heredoc content is written verbatim into the config file).
cat >> /var/lib/postgresql/data/postgresql.conf << EOF
# 复制配置
wal_level = replica # 必须为 replica 或 logical
max_wal_senders = 10 # 最大 WAL 发送进程数
wal_keep_size = 1GB # 保留的 WAL 大小
max_replication_slots = 10 # 最大复制槽数
hot_standby = on # 从库可读
# 同步复制(可选)
# synchronous_commit = on
# synchronous_standby_names = 'standby1'
EOF
# Allow the replicator role to make replication connections from the given subnet.
cat >> /var/lib/postgresql/data/pg_hba.conf << EOF
# 复制连接
host replication replicator 192.168.1.0/24 scram-sha-256
EOF
# Restart so wal_level / max_wal_senders changes take effect (they need a restart,
# not just a reload).
systemctl restart postgresql
# --- Standby node setup ---
# Stop the local server and wipe its data directory before re-seeding it from the
# primary (destructive: everything under the data dir is removed).
systemctl stop postgresql
rm -rf /var/lib/postgresql/data/*
# Clone the primary. -Xs streams WAL during the backup; -R writes primary_conninfo
# into postgresql.auto.conf and creates standby.signal automatically.
pg_basebackup -h 192.168.1.100 -p 5432 -U replicator -D /var/lib/postgresql/data \
-Fp -Xs -P -R
# Inspect the connection settings pg_basebackup generated.
cat /var/lib/postgresql/data/postgresql.auto.conf
systemctl start postgresql
-- Run on the primary: one row per connected standby with its WAL positions
-- (sent / written / flushed / replayed on the standby side).
SELECT
client_addr,
usename,
application_name,
state,
sync_state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
-- bytes of WAL sent by the primary but not yet replayed on the standby
pg_size_pretty(pg_wal_lsn_diff(sent_lsn, replay_lsn)) AS replication_lag
FROM pg_stat_replication;
-- Replication slots: restart_lsn marks how far back WAL must be retained.
-- An inactive slot keeps accumulating WAL on the primary indefinitely.
SELECT
slot_name,
slot_type,
active,
restart_lsn,
confirmed_flush_lsn
FROM pg_replication_slots;
-- Run on the standby: recovery status and replay progress.
SELECT
pg_is_in_recovery() AS is_standby,
pg_last_wal_receive_lsn() AS receive_lsn,
pg_last_wal_replay_lsn() AS replay_lsn,
pg_last_xact_replay_timestamp() AS last_replay_time,
-- seconds since the last replayed transaction (can be misleading when the
-- primary is idle — see the guarded version below)
EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp())) AS lag_seconds;
-- Lag in seconds, guarded against the idle-primary case: when everything
-- received has already been replayed, report 0 instead of the time since
-- the last commit.
SELECT CASE
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
THEN 0
ELSE EXTRACT(EPOCH FROM NOW() - pg_last_xact_replay_timestamp())
END AS replication_lag_seconds;
-- Synchronous-replication state of each standby (sync / potential / async).
SELECT
application_name,
sync_state,
sync_priority
FROM pg_stat_replication;
-- Per-transaction durability: block the commit until the standby has
-- applied it (SET LOCAL scopes the setting to this transaction only).
BEGIN;
SET LOCAL synchronous_commit = 'remote_apply';
COMMIT;
-- Physical replication slot management (a standby references the slot via
-- primary_slot_name).
SELECT pg_create_physical_replication_slot('standby1_slot');
SELECT * FROM pg_replication_slots;
-- Drop the slot when the standby is retired, or WAL will pile up.
SELECT pg_drop_replication_slot('standby1_slot');
-- Logical replication: publications (run on the publisher).
-- Alternative forms — use exactly one of the following:
CREATE PUBLICATION my_publication FOR ALL TABLES;
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- Restrict to selected operations (DELETE/TRUNCATE are not published here).
CREATE PUBLICATION my_publication FOR TABLE users
WITH (publish = 'insert, update');
# Logical replication does not copy DDL: create the table definitions on the
# subscriber first (pg_dump -s = schema only).
pg_dump -h publisher_host -U postgres -s -t users -t orders mydb > schema.sql
psql -f schema.sql
-- Create the subscription on the subscriber (simple form)...
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replicator password=xxx dbname=mydb'
PUBLICATION my_publication;
-- ...or with the defaults spelled out explicitly:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replicator password=xxx dbname=mydb'
PUBLICATION my_publication
WITH (
copy_data = true,
create_slot = true,
enabled = true,
synchronous_commit = 'off'
);
-- Inspect and maintain publications.
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
ALTER PUBLICATION my_publication ADD TABLE new_table;
ALTER PUBLICATION my_publication DROP TABLE old_table;
ALTER PUBLICATION my_publication SET (publish = 'insert, update, delete');
DROP PUBLICATION my_publication;
-- Subscription status (run on the subscriber).
SELECT * FROM pg_subscription;
SELECT
subname,
pid,
relid::regclass,
received_lsn,
last_msg_send_time,
last_msg_receipt_time,
latest_end_lsn,
latest_end_time
FROM pg_stat_subscription;
-- Pause / resume the apply worker.
ALTER SUBSCRIPTION my_subscription DISABLE;
ALTER SUBSCRIPTION my_subscription ENABLE;
-- Pick up tables newly added to the publication.
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION;
DROP SUBSCRIPTION my_subscription;
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION WITH (copy_data = true);
-- UPDATE/DELETE on a published table need a replica identity. FULL logs the
-- entire old row (expensive); prefer a unique index when one exists.
ALTER TABLE my_table REPLICA IDENTITY FULL;
ALTER TABLE my_table REPLICA IDENTITY USING INDEX my_unique_index;
-- Apply-worker error statistics.
SELECT * FROM pg_stat_subscription_stats;
; PgBouncer configuration for read/write-split connection pooling.
[databases]
; Write traffic: single master endpoint.
mydb_rw = host=192.168.1.100 port=5432 dbname=mydb
; Read traffic: comma-separated replica hosts.
mydb_ro = host=192.168.1.101,192.168.1.102 port=5432 dbname=mydb
[pgbouncer]
listen_addr = *
listen_port = 6432
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt
; transaction pooling: a server connection is held only for the duration of a
; transaction — session-level features (prepared statements, SET) are limited.
pool_mode = transaction
default_pool_size = 20
min_pool_size = 5
max_client_conn = 1000
server_connect_timeout = 3
server_login_retry = 3
; cancel queries running longer than 300 s
query_timeout = 300
; 0 disables idle-client disconnects
client_idle_timeout = 0
logfile = /var/log/pgbouncer/pgbouncer.log
log_connections = 1
log_disconnections = 1
; /etc/pgbouncer/userlist.txt — one "user" "scram-verifier" pair per line:
"app_user" "SCRAM-SHA-256$..."
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
import random
# Engine and session factories for the write master and the read replicas.
master_engine = create_engine('postgresql://user:pass@master:5432/mydb')
slave_engines = [
    create_engine('postgresql://user:pass@slave1:5432/mydb'),
    create_engine('postgresql://user:pass@slave2:5432/mydb'),
]
MasterSession = sessionmaker(bind=master_engine)

# One sessionmaker per replica, built once at import time.
_slave_sessionmakers = [sessionmaker(bind=engine) for engine in slave_engines]


def SlaveSession():
    """Return a new Session bound to a randomly chosen replica.

    Bug fix: the previous code ran ``random.choice`` once at import time
    (``sessionmaker(bind=random.choice(slave_engines))``), which pinned
    every session for the process lifetime to a single replica — no load
    balancing ever happened. Choosing per call spreads reads across all
    replicas. Kept as a zero-argument callable named ``SlaveSession`` so
    existing ``SlaveSession()`` call sites are unchanged.
    """
    return random.choice(_slave_sessionmakers)()
class RoutingSession:
    """Session facade that sends reads to a replica and writes to the master.

    Once ``using_master()`` has been called on an instance, every later
    read on that instance is served by the master as well (the flag is
    never reset).
    """

    def __init__(self):
        # One session per role; the replica is whichever SlaveSession() picks.
        self.master = MasterSession()
        self.slave = SlaveSession()
        self._use_master = False

    def using_master(self):
        # Force subsequent reads onto the master (e.g. read-after-write).
        # Returns self so calls can be chained fluently.
        self._use_master = True
        return self

    @property
    def session(self):
        # Session currently selected for read operations.
        if self._use_master:
            return self.master
        return self.slave

    def query(self, *args, **kwargs):
        # Reads go through whichever session `session` resolves to.
        return self.session.query(*args, **kwargs)

    def add(self, instance):
        # Writes always target the master.
        return self.master.add(instance)

    def commit(self):
        # Only the master session is ever committed here.
        return self.master.commit()
# Usage example (User is an ORM model defined elsewhere).
db = RoutingSession()
# Plain reads are served by a replica.
users = db.query(User).filter(User.status == 1).all()
# Writes always go to the master.
db.add(User(name='Alice'))
db.commit()
# Read-after-write: route this read to the master so replica lag cannot
# hide the row that was just committed.
user = db.using_master().query(User).filter(User.id == 1).first()
/**
 * Spring configuration that wires a master/slave routing DataSource.
 * Reads resolve through {@code RoutingDataSource}; the master is the default.
 */
@Configuration
public class DataSourceConfig {

    /** Write (master) pool, bound from spring.datasource.master.*. */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }

    /** Read (slave) pool, bound from spring.datasource.slave.*. */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Routing datasource keyed by "master"/"slave"; falls back to the
     * master whenever no lookup key has been set for the current thread.
     * (Inside a @Configuration class, re-invoking a @Bean method returns
     * the same singleton, so masterDataSource() is not created twice.)
     */
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> targets = new HashMap<>();
        targets.put("master", masterDataSource());
        targets.put("slave", slaveDataSource());

        RoutingDataSource router = new RoutingDataSource();
        router.setTargetDataSources(targets);
        router.setDefaultTargetDataSource(masterDataSource());
        return router;
    }
}
/**
 * Routing DataSource that selects its target ("master" or "slave") from the
 * thread-local DataSourceContextHolder on every connection request.
 */
public class RoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        // A null key falls back to the configured default target (the master).
        return DataSourceContextHolder.getDataSourceType();
    }
}
/**
 * Marks a method (or every method of a class) whose queries should be routed
 * to a read replica; consumed at runtime by DataSourceAspect.
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}
// Aspect that switches the routing key to "slave" around @ReadOnly methods.
@Aspect
@Component
public class DataSourceAspect {
// Route to a replica before the annotated method executes.
@Before("@annotation(ReadOnly)")
public void setReadDataSource() {
DataSourceContextHolder.setDataSourceType("slave");
}
// @After runs on both normal and exceptional exit, so the key is always cleared.
// NOTE(review): clear() does not restore a previously set value, so when a
// @ReadOnly method calls another @ReadOnly method, the outer call loses its
// "slave" routing after the inner one returns — confirm this is acceptable,
// or switch to a single @Around advice that saves and restores the prior key.
@After("@annotation(ReadOnly)")
public void clearDataSource() {
DataSourceContextHolder.clear();
}
}
# Patroni configuration for one cluster member (node1).
scope: postgres-cluster
namespace: /db/
name: node1
# REST API: used by patronictl and by load-balancer health checks (port 8008).
restapi:
listen: 0.0.0.0:8008
connect_address: 192.168.1.101:8008
# Distributed configuration store used for leader election.
etcd:
hosts:
- 192.168.1.10:2379
- 192.168.1.11:2379
- 192.168.1.12:2379
# Cluster-wide settings written to the DCS on first bootstrap.
bootstrap:
dcs:
ttl: 30
loop_wait: 10
retry_timeout: 10
# Max replication lag (bytes) a replica may have and still be promoted.
maximum_lag_on_failover: 1048576
postgresql:
use_pg_rewind: true
use_slots: true
parameters:
wal_level: replica
hot_standby: on
max_connections: 200
max_wal_senders: 10
max_replication_slots: 10
wal_keep_size: 1GB
initdb:
- encoding: UTF8
- data-checksums
pg_hba:
- host replication replicator 192.168.1.0/24 scram-sha-256
- host all all 192.168.1.0/24 scram-sha-256
# Roles created during bootstrap.
users:
admin:
password: admin_password
options:
- createrole
- createdb
replicator:
password: replica_password
options:
- replication
# The local PostgreSQL instance managed by this Patroni node.
postgresql:
listen: 0.0.0.0:5432
connect_address: 192.168.1.101:5432
data_dir: /var/lib/postgresql/data
bin_dir: /usr/lib/postgresql/15/bin
pgpass: /tmp/pgpass
authentication:
replication:
username: replicator
password: replica_password
superuser:
username: postgres
password: postgres_password
parameters:
unix_socket_directories: "/var/run/postgresql"
# Per-node behavior flags (all at their defaults here).
tags:
nofailover: false
noloadbalance: false
clonefrom: false
nosync: false
# Show cluster members, their roles, state and lag.
patronictl -c /etc/patroni/patroni.yml list
# Planned, graceful leader change (requires a healthy candidate).
patronictl -c /etc/patroni/patroni.yml switchover
# Forced leader change, for when the current leader is down.
patronictl -c /etc/patroni/patroni.yml failover
# Re-initialize a broken replica by re-cloning it from the leader.
patronictl -c /etc/patroni/patroni.yml reinit node2
# Pause / resume Patroni's automatic failover management.
patronictl -c /etc/patroni/patroni.yml pause
patronictl -c /etc/patroni/patroni.yml resume
# Edit the dynamic (DCS-stored) cluster configuration.
patronictl -c /etc/patroni/patroni.yml edit-config
# Restart / reload PostgreSQL on a specific node.
patronictl -c /etc/patroni/patroni.yml restart node1
patronictl -c /etc/patroni/patroni.yml reload node1
# HAProxy in TCP mode, using Patroni's REST API (port 8008) for health checks.
global
maxconn 1000
log /dev/log local0
defaults
mode tcp
log global
retries 2
timeout connect 3s
timeout server 30s
timeout client 30s
# Port 5432: always routed to the current leader.
frontend postgres_frontend
bind *:5432
default_backend postgres_master
backend postgres_master
# Patroni answers 200 on /master only on the leader node.
option httpchk GET /master
http-check expect status 200
default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
server node1 192.168.1.101:5432 check port 8008
server node2 192.168.1.102:5432 check port 8008
server node3 192.168.1.103:5432 check port 8008
# Port 5433: read traffic, load-balanced across healthy replicas.
frontend postgres_replica_frontend
bind *:5433
default_backend postgres_replicas
backend postgres_replicas
# /replica answers 200 only on standby nodes.
option httpchk GET /replica
http-check expect status 200
balance roundrobin
default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
server node1 192.168.1.101:5432 check port 8008
server node2 192.168.1.102:5432 check port 8008
server node3 192.168.1.103:5432 check port 8008
# Stats dashboard on port 7000.
listen stats
bind *:7000
stats enable
stats uri /
# --- Failover / promotion ---
# Preferred: controlled switchover via Patroni.
patronictl -c /etc/patroni/patroni.yml switchover
# Manual promotion of a standby (choose one of the following):
pg_ctl promote -D /var/lib/postgresql/data
-- or from SQL on the standby:
SELECT pg_promote();
# --- Point-in-time recovery (PITR) ---
# WAL archiving must already be enabled on the source (postgresql.conf):
archive_mode = on
archive_command = 'cp %p /archive/%f'
# Take a base backup (tar format, compressed).
pg_basebackup -h localhost -D /backup/base -Ft -z -P
# Restore the base backup into an empty data directory.
# NOTE(review): a -Ft backup also produces pg_wal.tar.gz that must be
# extracted into pg_wal/ — confirm that step is not missing here.
tar -xzf /backup/base/base.tar.gz -C /var/lib/postgresql/data
# Configure targeted recovery: replay archived WAL up to the given time,
# then promote.
cat > /var/lib/postgresql/data/postgresql.auto.conf << EOF
restore_command = 'cp /archive/%f %p'
recovery_target_time = '2024-01-15 10:30:00'
recovery_target_action = 'promote'
EOF
# recovery.signal puts the server into targeted-recovery mode on startup.
touch /var/lib/postgresql/data/recovery.signal
pg_ctl start -D /var/lib/postgresql/data
# --- Rebuilding a failed old primary as a standby ---
# Option 1: full re-clone from the new primary.
pg_basebackup -h master_host -D /var/lib/postgresql/data -U replicator -Fp -Xs -P -R
# Option 2: faster when the timelines diverged only slightly.
pg_rewind --target-pgdata=/var/lib/postgresql/data \
--source-server='host=master_host user=replicator'
# Option 3: let Patroni re-initialize the node.
patronictl -c /etc/patroni/patroni.yml reinit standby_node
| 方案 | 适用场景 | 复杂度 | RTO | RPO |
|---|---|---|---|---|
| 流复制 + 手动切换 | 小规模、成本敏感 | 低 | 分钟级 | 接近 0 |
| Patroni + etcd | 中大规模生产环境 | 中 | 秒级 | 接近 0 |
| PgBouncer | 读写分离、连接池 | 低 | - | - |
| 逻辑复制 | 跨版本迁移、部分同步 | 中 | - | 可能丢失 |
- 同步复制:关键业务使用同步复制,确保 0 数据丢失
- 复制槽:始终使用复制槽,防止 WAL 被删除
- 监控:监控复制延迟、WAL 积压
- 定期演练:定期进行故障转移演练
- 备份:即使有复制,也要定期备份
- 网络:主从库使用专用网络,减少延迟