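PGQ, part of skytools, uses PostgreSQL as a message queue. londiste3 builds table-level logical replication on top of it, and skytools also exposes an API so that users can write their own message consumers.

This article walks through a trigger-based implementation in the style of PGQ, because in some environments installing PGQ is not convenient. In those cases the same functionality can be built with user-defined functions.

The end result is very similar to PostgreSQL logical decoding, but the performance impact is larger because triggers are involved. For versions that do not support logical decoding, the replication method described here is a reasonable choice.

The trigger records everything needed for replay: transaction ID, schema, table, operation, old record, new record, and transaction commit time. Replay strictly follows transaction commit order.

DDL has to be handled some other way, for example with event triggers.

Design notes follow. Many points could still be improved; this article is only a demonstration.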

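  1. Use HSTORE to store the records of all tracked objects.
  2. Record the transaction ID, which is useful when changes are replayed transaction by transaction.
    Without transaction-level replay you only get eventual consistency.
    Replaying without regard to transactions also has a problem: with deferred primary key or foreign key checks, the SQL may fail on the target.
  3. Store a consumed flag in the data. There are better approaches (for example, tracking a read state per subscriber); with a single flag only one subscriber is possible.
  4. Use quote_nullable to escape text values.
  5. Use quote_ident to escape identifiers that are keywords.
  6. Use 7 tables to record messages, one for each day of the week. (For high volumes this can be refined, for example one table per hour.)
    (You could also imitate PGQ and use a separate ticker process to slice the messages into groups.)
  7. Use an event trigger to record DDL statements, together with a status in the queue table. When a DDL entry is reached, stop fetching further messages; once the DDL flag has been updated, fetching may continue.
  8. Enable the commit timestamp setting track_commit_timestamp, fetch in transaction commit order, and rebuild the SQL.
    (In theory this replay order guarantees consistency.)
    (track_commit_timestamp is only available from 9.5 on and is not supported in this article. BDR also depends on commit timestamps.)
    Alternatively, use clock_timestamp and replay strictly in SQL execution order. This article uses this approach.
  9. Transactions that span midnight (if a transaction crosses a day boundary, its second half may be executed before its first half; calling mq.build_sql under repeatable read solves this).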
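
Point 2 above implies that a consumer reading in batches should never cut a transaction in half. The following is a minimal Python sketch of one way to choose such a batch. It is not the logic of the mq.build_sql function in this article. The function name `next_batch` and the `(commit_time, row_id, xid)` tuple layout are assumptions made for illustration, and the sketch assumes that all rows of a transaction share one commit time.

```python
from itertools import groupby

def next_batch(rows, n):
    """Pick whole transactions from `rows`, aiming for at most n rows.

    `rows` holds (commit_time, row_id, xid) tuples for unconsumed messages
    (hypothetical layout). The oldest transaction is always returned in full,
    even when it alone has more than n rows.
    """
    if n <= 0:
        return []
    # Order by commit time, then xid, then row id, so that the rows of one
    # transaction are contiguous.
    ordered = sorted(rows, key=lambda r: (r[0], r[2], r[1]))
    batch = []
    for _xid, group in groupby(ordered, key=lambda r: r[2]):
        txn = list(group)
        # Stop before a transaction that would not fit completely.
        if batch and len(batch) + len(txn) > n:
            break
        batch.extend(txn)
    return batch

pending = [(1, 1, 100), (2, 2, 101), (2, 3, 101), (3, 4, 102)]
print(next_batch(pending, 2))
print(next_batch(pending, 3))
```

With a limit of 2 only the first transaction is returned, because the second one has two rows and would be split. With a limit of 3 the first two transactions are returned together.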

Create the test table

CREATE TABLE test (id int primary key, info text, crt_time timestamp(0));

Create the hstore extension

create extension hstore;

Create the queue schema

CREATE SCHEMA IF NOT EXISTS mq;

Create the parent table for the 7 message queue tables

CREATE TABLE mq.table_change_rec (
id serial8 primary key,
x_id int8 default txid_current(),  -- transaction ID
consumed boolean not null default false,  -- whether the message has been consumed
relid oid,  -- pg_class.oid
table_schema name,  -- schema name
table_name name,  -- table name
when_tg text,  -- after or before
level text,  -- statement or row
op text,  -- insert, update, delete, or truncate
old_rec hstore,
new_rec hstore,
crt_time timestamp without time zone  not null,  -- timestamp
dbname name,  -- database name
username name,  -- user name
client_addr inet,  -- client address
client_port int    -- client port
);

Create indexes on the transaction ID and on the timestamp

create index x_id_table_change_rec on mq.table_change_rec(x_id);
create index crt_time_id_table_change_rec on mq.table_change_rec(crt_time,id) where consumed=false;

Create the child tables

create table mq.table_change_rec0 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec1 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec2 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec3 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec4 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec5 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
create table mq.table_change_rec6 (like mq.table_change_rec including all) inherits(mq.table_change_rec);
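
The trigger function in this article routes each change into one of these seven child tables by day of week, and reuses the timestamp of the transaction's first recorded row so that a transaction crossing midnight stays in one table. The routing rule can be sketched in Python as follows. The helper names `pg_dow` and `target_table` are hypothetical; the only fact relied on is that PostgreSQL's EXTRACT(DOW ...) numbers Sunday as 0 and Saturday as 6.

```python
from datetime import datetime

def pg_dow(ts):
    """Day of week in PostgreSQL's EXTRACT(DOW ...) convention: Sunday = 0 .. Saturday = 6."""
    # Python's weekday() numbers Monday as 0 and Sunday as 6, so shift by one.
    return (ts.weekday() + 1) % 7

def target_table(now, first_ts_of_txn=None):
    """Name of the child table a change should be written to.

    If the transaction has already recorded a row, its first timestamp wins,
    which keeps every row of the transaction in the same child table.
    """
    ts = first_ts_of_txn if first_ts_of_txn is not None else now
    return "mq.table_change_rec%d" % pg_dow(ts)

# 2016-01-05 was a Tuesday, so this prints mq.table_change_rec2
print(target_table(datetime(2016, 1, 5, 10, 29, 9)))
```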

Create the trigger function that records changes into the queue

CREATE OR REPLACE FUNCTION mq.dml_trace()
RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
v_new_rec hstore;
v_old_rec hstore;
v_username name := session_user;
v_dbname name := current_database();
v_client_addr inet := inet_client_addr();
v_client_port int := inet_client_port();
v_crt_time timestamp without time zone := clock_timestamp();
v_xid int8 := txid_current();
v_crt_time_min timestamp without time zone := null;
v_dofweek int;
BEGIN

select crt_time into v_crt_time_min from mq.table_change_rec where x_id=v_xid limit 1;
if found then
  v_dofweek := EXTRACT(DOW FROM v_crt_time_min);
else
  v_dofweek := EXTRACT(DOW FROM v_crt_time);
end if;

case TG_OP
when 'DELETE' then 
  v_old_rec := hstore(OLD.*);
  case v_dofweek
  when 0 then
    insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 1 then
    insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 2 then
    insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 3 then
    insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 4 then
    insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 5 then
    insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 6 then
    insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, old_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  end case;

when 'INSERT' then 
  v_new_rec := hstore(NEW.*);
  case v_dofweek
  when 0 then
    insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 1 then
    insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 2 then
    insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 3 then
    insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 4 then
    insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 5 then
    insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 6 then
    insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  end case;

when 'UPDATE' then 
  v_old_rec := hstore(OLD.*);
  v_new_rec := hstore(NEW.*);
  case v_dofweek
  when 0 then
    insert into mq.table_change_rec0 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 1 then
    insert into mq.table_change_rec1 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 2 then
    insert into mq.table_change_rec2 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 3 then
    insert into mq.table_change_rec3 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 4 then
    insert into mq.table_change_rec4 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 5 then
    insert into mq.table_change_rec5 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  when 6 then
    insert into mq.table_change_rec6 (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      values (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, coalesce(v_crt_time_min,v_crt_time), v_dbname, v_username, v_client_addr, v_client_port);
  end case;

else
  return null;
end case;

  RETURN null;
END;
$BODY$ strict;

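Create the trigger that tracks DML. It is deferred, so it fires at the end of the transaction and thereby captures the transaction's end time.
For this reason, do not switch the trigger to immediate inside a transaction; otherwise the recorded time becomes the time of the first firing.
When one transaction fires the trigger for several rows, all of them record the same time, namely the end time of the transaction.
Replay also follows the order of transaction end time.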

CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON test DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace();

DDL tracking uses an event trigger. It is not covered in this article and will be completed in the next version.

Test

insert into test values (1,'test',now());
insert into test values (2,'你好\a\\''',now());
update test set info='new' where id=1;
delete from test where id=2;

View the trace records

postgres=# select tableoid::regclass,* from mq.table_change_rec;
-[ RECORD 1 ]+---------------------------------------------------------------------
tableoid     | table_change_rec2
x_id         | 283001935
consumed     | f
relid        | 24960
table_schema | public
table_name   | test
when_tg      | AFTER
level        | ROW
op           | INSERT
old_rec      | 
new_rec      | "id"=>"1", "info"=>"test", "crt_time"=>"2016-01-05 10:29:10"
crt_time     | 2016-01-05 10:29:09.755149
dbname       | postgres
username     | postgres
client_addr  | 
client_port  | 
-[ RECORD 2 ]+---------------------------------------------------------------------
tableoid     | table_change_rec2
x_id         | 283001936
consumed     | f
relid        | 24960
table_schema | public
table_name   | test
when_tg      | AFTER
level        | ROW
op           | INSERT
old_rec      | 
new_rec      | "id"=>"2", "info"=>"你好\\a\\\\'", "crt_time"=>"2016-01-05 10:29:10"
crt_time     | 2016-01-05 10:29:09.762116
dbname       | postgres
username     | postgres
client_addr  | 
client_port  | 
-[ RECORD 3 ]+---------------------------------------------------------------------
tableoid     | table_change_rec2
x_id         | 283001937
consumed     | f
relid        | 24960
table_schema | public
table_name   | test
when_tg      | AFTER
level        | ROW
op           | UPDATE
old_rec      | "id"=>"1", "info"=>"test", "crt_time"=>"2016-01-05 10:29:10"
new_rec      | "id"=>"1", "info"=>"new", "crt_time"=>"2016-01-05 10:29:10"
crt_time     | 2016-01-05 10:29:09.776981
dbname       | postgres
username     | postgres
client_addr  | 
client_port  | 
-[ RECORD 4 ]+---------------------------------------------------------------------
tableoid     | table_change_rec2
x_id         | 283001938
consumed     | f
relid        | 24960
table_schema | public
table_name   | test
when_tg      | AFTER
level        | ROW
op           | DELETE
old_rec      | "id"=>"2", "info"=>"你好\\a\\\\'", "crt_time"=>"2016-01-05 10:29:10"
new_rec      | 
crt_time     | 2016-01-05 10:29:10.06243
dbname       | postgres
username     | postgres
client_addr  | 
client_port  | 
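
A consumer written outside the database receives old_rec and new_rec as hstore text like the values shown above. The following Python sketch turns that text into a dictionary. It assumes the canonical hstore output form seen in this listing (keys and values always double-quoted, backslash used as the escape character, and an unquoted NULL for missing values). The function name `parse_hstore` is hypothetical, and a database driver's own hstore support would normally be preferable.

```python
def parse_hstore(text):
    """Parse canonical hstore text output, e.g. '"id"=>"1", "info"=>NULL', into a dict."""
    result = {}
    i, n = 0, len(text)

    def read_quoted(pos):
        # text[pos] is the opening double quote; return (string, index after closing quote).
        out = []
        pos += 1
        while text[pos] != '"':
            if text[pos] == '\\':
                pos += 1  # a backslash escapes the character that follows it
            out.append(text[pos])
            pos += 1
        return ''.join(out), pos + 1

    while i < n:
        if text[i] in ' ,':
            i += 1  # skip separators between pairs
            continue
        key, i = read_quoted(i)
        i = text.index('=>', i) + 2
        while text[i] == ' ':
            i += 1
        if text.startswith('NULL', i):
            result[key] = None  # unquoted NULL means SQL NULL
            i += 4
        else:
            result[key], i = read_quoted(i)
    return result

print(parse_hstore('"id"=>"1", "info"=>"test", "crt_time"=>"2016-01-05 10:29:10"'))
```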

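How a consumer subscribes to messages
The steps are broken down below; you can wrap them in a function.
Get the table name.
Build the SQL according to the operation that fired the trigger.

1. insert
Get the number of columns in new_rec:
array_length(hstore_to_matrix(new_rec),1) = 3
Column names:
quote_ident((hstore_to_matrix(new_rec))[1][1]) = 'id'
quote_ident((hstore_to_matrix(new_rec))[2][1]) = 'info'
quote_ident((hstore_to_matrix(new_rec))[3][1]) = 'crt_time'
Values:
quote_nullable((hstore_to_matrix(new_rec))[1][2]) = '1'
quote_nullable((hstore_to_matrix(new_rec))[2][2]) = 'test'
quote_nullable((hstore_to_matrix(new_rec))[3][2]) = '2016-01-04 19:42:36'
......

Build the SQL:
insert into table_name(column_names) values (values);

2. update
Get the column count, column names, and values of new_rec and old_rec.

3. delete
Get the column count, column names, and values of old_rec.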
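
The three cases above can also be sketched on the client side. The following Python is a minimal illustration, not the article's implementation. `quote_ident` and `quote_nullable` here are simplified stand-ins for the PostgreSQL functions of the same name (this `quote_ident` always adds double quotes, which is valid but more verbose than the server's version), `build_sql` is a hypothetical helper, and records are assumed to be dictionaries of column name to text value, with None for SQL NULL.

```python
def quote_ident(name):
    """Double-quote an identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'

def quote_nullable(value):
    """Quote a text value as a SQL literal; None becomes NULL."""
    if value is None:
        return "NULL"
    body = value.replace("'", "''")
    if "\\" in body:
        # Use the E'' form with doubled backslashes so the literal does not
        # depend on the standard_conforming_strings setting.
        return "E'" + body.replace("\\", "\\\\") + "'"
    return "'" + body + "'"

def where_clause(rec):
    """Match every column of the old record; NULL columns need IS NULL."""
    parts = []
    for col, val in rec.items():
        if val is None:
            parts.append(quote_ident(col) + " is null")
        else:
            parts.append(quote_ident(col) + "=" + quote_nullable(val))
    return " and ".join(parts)

def build_sql(schema, table, op, old_rec=None, new_rec=None):
    """Rebuild one replay statement from a recorded change."""
    target = quote_ident(schema) + "." + quote_ident(table)
    if op == "INSERT":
        cols = ",".join(quote_ident(c) for c in new_rec)
        vals = ",".join(quote_nullable(v) for v in new_rec.values())
        return "insert into %s(%s) values(%s);" % (target, cols, vals)
    if op == "UPDATE":
        sets = ",".join(quote_ident(c) + "=" + quote_nullable(v) for c, v in new_rec.items())
        return "update %s set %s where %s;" % (target, sets, where_clause(old_rec))
    if op == "DELETE":
        return "delete from %s where %s;" % (target, where_clause(old_rec))
    raise ValueError("unsupported op: %r" % op)

print(build_sql("public", "test", "INSERT", new_rec={"id": "1", "info": "test"}))
```

Joining the conditions with " and " avoids having to trim a trailing "and" afterwards.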

Written as a consumer function, this looks as follows:
(It fetches N rows per batch while keeping transactions intact.)

create or replace function mq.build_sql(n int) returns setof text as $$
declare
  m int := 0;
  v_table_change_rec mq.table_change_rec;
  v_tablename name;
  v_crt_time timestamp without time zone;
  curs1 refcursor;
  v_sql text := '';
  v_cols text := '';
  v_vals text := '';
  v_upd_set text := '';
  v_upd_del_where text :='';
  v_xid_mincmt int8 ;
  v_xid_maxcmt int8 ;
begin
  if n <=0 then
    -- raise notice 'n must be > 0.';
    return;
  end if;

  return next 'BEGIN;';

  -- pick the queue table that holds the oldest unconsumed message
  select tablename,crt_time into v_tablename,v_crt_time from 
  (
  select 'table_change_rec0' as tablename,min(crt_time) as crt_time from mq.table_change_rec0 where consumed=false
    union all
  select 'table_change_rec1' as tablename,min(crt_time) as crt_time from mq.table_change_rec1 where consumed=false
    union all
  select 'table_change_rec2' as tablename,min(crt_time) as crt_time from mq.table_change_rec2 where consumed=false
    union all
  select 'table_change_rec3' as tablename,min(crt_time) as crt_time from mq.table_change_rec3 where consumed=false
    union all
  select 'table_change_rec4' as tablename,min(crt_time) as crt_time from mq.table_change_rec4 where consumed=false
    union all
  select 'table_change_rec5' as tablename,min(crt_time) as crt_time from mq.table_change_rec5 where consumed=false
    union all
  select 'table_change_rec6' as tablename,min(crt_time) as crt_time from mq.table_change_rec6 where consumed=false
  ) t 
  order by crt_time limit 1;

case v_tablename

when 'table_change_rec0' then
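  -- get the XID of the earliest unconsumed row and of the n-th row, ordered by commit time
  select x_id into v_xid_mincmt from mq.table_change_rec0 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec0 where consumed=false order by crt_time limit 1 offset n-1;
  -- if the two XIDs are equal, fetch that whole transaction without applying the limit
  -- otherwise skip the transaction that the n-th row belongs to
  -- (v_xid_maxcmt is NULL when fewer than n rows are pending; IS DISTINCT FROM then keeps all rows, whereas <> would filter out every row)

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec0 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec0 where consumed=false and x_id is distinct from v_xid_maxcmt order by crt_time,id limit n;
end if;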

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
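-- quote_ident wraps the schema, table name, and column names
-- quote_nullable wraps values
-- for a table without a primary key that contains duplicate rows, changing a single row on the source via ctid leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)
case v_table_change_rec.op
when 'INSERT' then
-- assemble the column list and the value list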
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- assemble the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- assemble the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec0 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec1' then
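  -- get the XID of the earliest unconsumed row and of the n-th row, ordered by commit time
  select x_id into v_xid_mincmt from mq.table_change_rec1 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec1 where consumed=false order by crt_time limit 1 offset n-1;
  -- if the two XIDs are equal, fetch that whole transaction without applying the limit
  -- otherwise skip the transaction that the n-th row belongs to
  -- (v_xid_maxcmt is NULL when fewer than n rows are pending; IS DISTINCT FROM then keeps all rows, whereas <> would filter out every row)

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec1 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec1 where consumed=false and x_id is distinct from v_xid_maxcmt order by crt_time,id limit n;
end if;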

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
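-- quote_ident wraps the schema, table name, and column names
-- quote_nullable wraps values
-- for a table without a primary key that contains duplicate rows, changing a single row on the source via ctid leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)
case v_table_change_rec.op
when 'INSERT' then
-- assemble the column list and the value list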
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- assemble the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- assemble the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec1 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec2' then
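  -- get the XID of the earliest unconsumed row and of the n-th row, ordered by commit time
  select x_id into v_xid_mincmt from mq.table_change_rec2 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec2 where consumed=false order by crt_time limit 1 offset n-1;
  -- if the two XIDs are equal, fetch that whole transaction without applying the limit
  -- otherwise skip the transaction that the n-th row belongs to
  -- (v_xid_maxcmt is NULL when fewer than n rows are pending; IS DISTINCT FROM then keeps all rows, whereas <> would filter out every row)

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec2 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec2 where consumed=false and x_id is distinct from v_xid_maxcmt order by crt_time,id limit n;
end if;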

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
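-- quote_ident wraps the schema, table name, and column names
-- quote_nullable wraps values
-- for a table without a primary key that contains duplicate rows, changing a single row on the source via ctid leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)
case v_table_change_rec.op
when 'INSERT' then
-- assemble the column list and the value list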
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- assemble the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- assemble the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec2 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec3' then
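  -- get the XID of the earliest unconsumed row and of the n-th row, ordered by commit time
  select x_id into v_xid_mincmt from mq.table_change_rec3 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec3 where consumed=false order by crt_time limit 1 offset n-1;
  -- if the two XIDs are equal, fetch that whole transaction without applying the limit
  -- otherwise skip the transaction that the n-th row belongs to
  -- (v_xid_maxcmt is NULL when fewer than n rows are pending; IS DISTINCT FROM then keeps all rows, whereas <> would filter out every row)

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec3 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec3 where consumed=false and x_id is distinct from v_xid_maxcmt order by crt_time,id limit n;
end if;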

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
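-- quote_ident wraps the schema, table name, and column names
-- quote_nullable wraps values
-- for a table without a primary key that contains duplicate rows, changing a single row on the source via ctid leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)
case v_table_change_rec.op
when 'INSERT' then
-- assemble the column list and the value list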
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- assemble the SQL
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- assemble the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- assemble the SQL
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- assemble the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec3 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec4' then
  -- Get the x_id of the earliest unconsumed record and of the n-th unconsumed record, ordered by crt_time
  select x_id into v_xid_mincmt from mq.table_change_rec4 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec4 where consumed=false order by crt_time limit 1 offset n-1;
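  -- If the two x_ids are equal, fetch all records of that transaction without a limit
  -- Otherwise, exclude the records of the latest transaction (v_xid_maxcmt)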

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec4 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec4 where consumed=false and x_id<>v_xid_maxcmt order by crt_time,id limit n;
end if;

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
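-- quote_ident quotes the schema, table, and column names
-- quote_nullable quotes the values
-- For a table without a primary key that contains duplicate rows, changing a single row by ctid on the source leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)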
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- Assemble the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec4 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec5' then
  -- Get the x_id of the earliest unconsumed record and of the n-th unconsumed record, ordered by crt_time
  select x_id into v_xid_mincmt from mq.table_change_rec5 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec5 where consumed=false order by crt_time limit 1 offset n-1;
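  -- If the two x_ids are equal, fetch all records of that transaction without a limit
  -- Otherwise, exclude the records of the latest transaction (v_xid_maxcmt)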

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec5 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec5 where consumed=false and x_id<>v_xid_maxcmt order by crt_time,id limit n;
end if;

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
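-- quote_ident quotes the schema, table, and column names
-- quote_nullable quotes the values
-- For a table without a primary key that contains duplicate rows, changing a single row by ctid on the source leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)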
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- Assemble the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec5 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;


when 'table_change_rec6' then
  -- Get the x_id of the earliest unconsumed record and of the n-th unconsumed record, ordered by crt_time
  select x_id into v_xid_mincmt from mq.table_change_rec6 where consumed=false order by crt_time limit 1 offset 0;
  select x_id into v_xid_maxcmt from mq.table_change_rec6 where consumed=false order by crt_time limit 1 offset n-1;
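  -- If the two x_ids are equal, fetch all records of that transaction without a limit
  -- Otherwise, exclude the records of the latest transaction (v_xid_maxcmt)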

if v_xid_mincmt = v_xid_maxcmt then
  open curs1 for select * from mq.table_change_rec6 where consumed=false and x_id=v_xid_maxcmt order by id;
else
  open curs1 for select * from mq.table_change_rec6 where consumed=false and x_id<>v_xid_maxcmt order by crt_time,id limit n;
end if;

fetch curs1 into v_table_change_rec;
LOOP
if found then
-- raise notice '%', v_table_change_rec;
-- build sql
-- case tg insert,update,delete,ddl
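-- quote_ident quotes the schema, table, and column names
-- quote_nullable quotes the values
-- For a table without a primary key that contains duplicate rows, changing a single row by ctid on the source leaves the target inconsistent (avoid ctid, or require a primary key or a non-null unique key)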
case v_table_change_rec.op
when 'INSERT' then
-- Build the column list and the value list
v_cols := '' ;
v_vals := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || ',' ;
  v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
end loop;
v_cols := rtrim(v_cols, ',') ;
v_vals := rtrim(v_vals, ',') ;

-- Assemble the SQL statement
v_sql := 'insert into '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||'('||v_cols||')'||' values('||v_vals||');' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;

when 'UPDATE' then
-- Build the SET list and the WHERE clause
v_upd_set := '' ;
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.new_rec),1) loop
  v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.new_rec))[i][2]) || ',' ;
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_set := rtrim(v_upd_set, ',') ;
v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'update '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;

when 'DELETE' then
-- Build the WHERE clause
v_upd_del_where := '' ;
for i in 1..array_length(hstore_to_matrix(v_table_change_rec.old_rec),1) loop
  if quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) = 'NULL' then
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || ' is null ' || ' and';
  else
    v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec.old_rec))[i][2]) || ' and';
  end if;
end loop;

v_upd_del_where := rtrim(v_upd_del_where, 'd') ;
v_upd_del_where := rtrim(v_upd_del_where, 'n') ;
v_upd_del_where := rtrim(v_upd_del_where, 'a') ;

-- Assemble the SQL statement
v_sql := 'delete from '||quote_ident(v_table_change_rec.table_schema)||'.'||quote_ident(v_table_change_rec.table_name)||' where '|| v_upd_del_where ||';' ;
-- raise notice '%', v_sql;
update mq.table_change_rec6 set consumed=true where current of curs1;
return next v_sql;

else
  -- raise notice 'I do not know how to handle this op: %', v_table_change_rec.op;
end case;

else
close curs1;
return next 'END;';
return;
end if;
fetch curs1 into v_table_change_rec;
END LOOP;

else
  -- raise notice 'no % queue table deal code in this function.', v_tablename;
  return;

end case;

end;
$$ language plpgsql strict ;
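
The function above builds each WHERE clause by indexing into hstore_to_matrix and then trimming the trailing "and" with three rtrim calls. As a rough sketch only (this is not the code the function uses), the same predicate list could be produced by expanding the hstore with each() and joining the pieces with string_agg. The hstore literal below is made-up sample data, and format()'s %I and %L stand in for quote_ident and quote_nullable.

```sql
-- Sketch: build "col = 'val' and col2 is null ..." from an hstore value.
-- Assumes the hstore extension is installed; the literal is hypothetical sample data.
select string_agg(
         case
           when value is null then format('%I is null', key)  -- NULL needs IS NULL, not "="
           else format('%I = %L', key, value)                  -- %I quotes identifiers, %L quotes literals
         end,
         ' and ' order by key                                   -- separator goes between items, so nothing to trim
       ) as where_clause
from each('id=>1, info=>NULL, crt_time=>"2016-01-05 15:26:24"'::hstore);
```

Because string_agg only places the separator between items, no trailing "and" has to be stripped afterwards.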

Test the consumer function with some data:

postgres=# create database src;
CREATE DATABASE
postgres=# create database dest;
CREATE DATABASE

On the source, create the hstore extension, the queue tables, and the trigger function
(omitted)

Create the test table and its trigger

src=# create table test(id int primary key,info text,crt_time timestamp);
CREATE TABLE
src=# CREATE TRIGGER tg AFTER DELETE or INSERT or UPDATE ON test FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace();
CREATE TRIGGER

Create the test table on the target

dest=# create table test(id int primary key,info text,crt_time timestamp);
CREATE TABLE

Create the stress-test script

vi test.sql
\setrandom id 1 5000000
insert into test values (:id, md5(random()::text), now()) on conflict ON CONSTRAINT test_pkey do update set info=excluded.info,crt_time=excluded.crt_time;

Run the stress test

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 48 -j 48 -T 120 src

At the same time, open another session that synchronizes the data to the dest database in a loop, 10,000 records per iteration

while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql(10000)) to stdout;commit;' | psql dest -f - ; done

After synchronization finishes, check whether the hash values on both sides match.

dest=# \c src
src=# select count(*),now() from test;
  count  |              now              
---------+-------------------------------
 3910319 | 2016-01-05 15:26:24.046004+08
(1 row)
src=# select sum(hashtext(test.*::text)) from test;
      sum       
----------------
 -1327225009705
(1 row)

src=# \c dest
dest=# select count(*),now() from test;
  count  |              now              
---------+-------------------------------
 3910319 | 2016-01-05 15:27:27.210017+08
(1 row)
dest=# select sum(hashtext(test.*::text)) from test;
      sum       
----------------
 -1327225009705
(1 row)

An example of synchronizing a system that is already in production (one that has already accumulated data):

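1. Source
First set up the queue tracking described above (the same procedure as earlier in this article).
Then export the data inside a transaction at the repeatable read isolation level.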
# begin work isolation level REPEATABLE READ;
BEGIN
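-- Record the current transaction snapshot (xmin:xmax:xip_list): the earliest txid still active, the first txid not yet assigned, and the list of txids in progress.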
# select * from txid_current_snapshot();
 txid_current_snapshot 
-----------------------
 339148965:339148965:
-- Copy the data to a file
# copy table to '';
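# In the mq record tables, mark the records that do not need to be replicated as consumed=true. These are the records whose transaction is already visible in the snapshot:
  x_id < xmin
  x_id >= xmin and x_id < xmax and x_id not in (xip_list)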

2. Target
Import the data
# copy table from '';
After that, the consumer function above can be used to replicate the data.
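
The marking step on the source is described only in words. One possible way to write it, given here as a sketch under assumptions rather than the author's exact procedure, is to let txid_visible_in_snapshot decide which queue records were already covered by the export. The snapshot string is the one recorded in the example above. mq.table_change_rec3 is used as an example queue table, and the same statement would have to be repeated for each of the other queue tables.

```sql
-- Sketch: mark queue records that the exported data already contains.
-- Assumption: '339148965:339148965:' is the snapshot recorded in the export transaction.
-- Assumption: run once per queue table; mq.table_change_rec3 is only an example.
update mq.table_change_rec3
   set consumed = true
 where consumed = false
   -- true when x_id had already committed from the snapshot's point of view
   and txid_visible_in_snapshot(x_id, '339148965:339148965:'::txid_snapshot);
```

Records whose x_id is not visible in the snapshot stay unconsumed, so the consumer function later replays exactly the changes missing from the export.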