9.3 9.4 9.5 9.6 10 11 12 13
阿里云PostgreSQL 问题报告 纠错本页面

46.6. 逻辑解码输出插件

一个示例输出插件可以在PostgreSQL源码树的 contrib/test_decoding 子目录中找到。

46.6.1. 初始化函数

输出插件是通过使用输出插件的名字作为库基本名字动态加载共享库加载的。 使用普通库搜索路径定位该库。为了提供需要的输出插件回调函数,和表明该库就是它需要的输出插件, 提供一个名为_PG_output_plugin_init的函数。 这个函数传递一个需要用回调函数指针填充的结构,以单独动作。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);

begin_cbchange_cbcommit_cb 回调函数是必需的,startup_cbshutdown_cb是可选的。

46.6.2. 功能

要解码、格式化和输出修改,输出插件可以使用大多数的后端普通框架,包括调用输出函数。 只要只是访问initdbpg_catalog模式中创建的关系, 或是使用下列代码标记为用户提供的目录表,那么只读访问关系就是允许的。

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

任何导致事务ID分配的动作都是禁止的。其中包括写入表、执行DDL修改和调用txid_current()

46.6.3. 输出模式

输出插件回调可以以最近任意的格式传递数据给消耗者。对于一些使用情况,像通过SQL查看修改, 以一种可以包含任意数据的数据类型(比如bytea)返回数据是麻烦的。 如果输出插件只以服务器的编码输出文本数据,可以在通过在 启动回调中设置 OutputPluginOptions.output_modeOUTPUT_PLUGIN_TEXTUAL_OUTPUT 而不是OUTPUT_PLUGIN_BINARY_OUTPUT来声明。在这种情况下, 所有数据必须是服务器的编码,所以一个text数据就可以包含它。 这是以启用断言的编译检查的。

46.6.4. 输出插件回调

输出插件获取关于它需要提供的各种回调中发生的修改的通知。

并发事务以提交的顺序解码,只有属于指定事务的修改在begincommit之间解码。显式或隐式回滚的事务用不解码。 成功的保存点折叠到包含它们的事务中,以它们在该事务中执行的顺序。

注意: 只有已经安全冲刷到磁盘的事务将被解码。这会导致在synchronous_commit 设置为off时,直接跟着pg_logical_slot_get_changes()COMMIT不会立即解码。

46.6.4.1. 启动回调

可选的startup_cb回调在创建复制槽和请求流修改时调用, 独立于准备推出的修改的数量。

typedef void (*LogicalDecodeStartupCB) (
    struct LogicalDecodingContext *ctx,
    OutputPluginOptions *options,
    bool is_init
);

is_init参数在创建复制槽时为真,其他时候为假。 options指向一个选项结构,输出插件可以设置为:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
} OutputPluginOptions;

output_type必须设置为OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。又见第 46.6.3 节

启动回调应该验证ctx->output_plugin_options中的选项。 如果输出插件需要有一个状态,它可以使用ctx->output_plugin_private存储它。

46.6.4.2. 关闭回调

当前一个活动的复制槽不再使用,并且可以用来释放输出插件私有的资源时, 调用可选的shutdown_cb回调。没有必要删除该槽,只是停止了流。

typedef void (*LogicalDecodeShutdownCB) (
    struct LogicalDecodingContext *ctx
);

46.6.4.3. 事务开始回调

在已提交事务开始被解码时,调用需要的begin_cb回调。 中止的事务和它们的内容永远得不到解码。

typedef void (*LogicalDecodeBeginCB) (
    struct LogicalDecodingContext *,
    ReorderBufferTXN *txn
);

txn参数包含关于该事务的元信息,比如它提交的时间戳和它的XID。

46.6.4.4. 事务结束回调

当事务提交被解码时,调用需要的commit_cb回调。 如果已经有任何已经修改了的行,那么用于所有已修改行的change_cb 回调将在此之前调用。

typedef void (*LogicalDecodeCommitCB) (
    struct LogicalDecodingContext *,
    ReorderBufferTXN *txn
);

46.6.4.5. 修改回调

为事务内部每个单独行的修改调用需要的change_cb回调, 它可能是一个INSERTUPDATEDELETE。即使原始命令一次修改了几行,该回调也将为每一行单独调用。

typedef void (*LogicalDecodeChangeCB) (
    struct LogicalDecodingContext *ctx,
    ReorderBufferTXN *txn,
    Relation relation,
    ReorderBufferChange *change
);

ctxtxn参数与begin_cbcommit_cb回调的内容相同,但是加上了关系描述符 relation指向该行的所属关系和一个结构change 描述传递进来的行修改。

注意: 只用用户定义表中的修改是非日志的(参阅UNLOGGED), 并且不是临时(参阅TEMPORARYTEMP)可以使用逻辑解码提取的。

46.6.5. 产生输出的函数

要实际产生输出,输出插件在begin_cbcommit_cbchange_cb回调内部时,可以在ctx->out 中写数据到StringInfo输出缓冲区中。在写入输出缓冲区之前, 必须先调用OutputPluginPrepareWrite(ctx, last_write), 然后完成到缓冲区的写入,必须调用OutputPluginWrite(ctx, last_write) 执行该写入。last_write指示一个特殊的写入是否是该回调的最后写入。

下面的示例显示了如何输出数据到输出插件的消耗者:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);

<
/BODY >