PostgreSQLでPub/Subによる外部プログラム実行

PostgreSQLで、レコード追加時にそのデータを外部プログラムに渡しつつプログラムを実行したい。
二つの方法の検証をしたのでメモ。結論を言うと、二つ目の方法の、Pub/Subパターン(NOTIFY/LISTEN)を利用したプログラム実行が良いと思う。

COPYコマンドをトリガーで利用

このブログで紹介されていたので検証してみた。
●方法
概要としては、レコード追加時、プログラムを直接PostgreSQLに実行してもらう(正確には、PostgreSQLのユーザーに実行してもらう)。
手順は下記。
1.PostgreSQLで関数(function)を作成
2.functionをトリガーに設定
なおfunction作成の際に、プログラム実行にCOPYコマンドを利用する(下記説明)。

COPYコマンドのPROGRAMパラメータを利用するとコマンド実行が出来ます。 本来は返り値をテーブルにコピーするために利用するものなのでしょうが、 今回は任意のコマンドを叩くためだけに利用します。
出典:PostgreSQL SQLで外部プログラムを実行する方法

●具体例
function
下記では、program_001という関数を作成し、関数の中身としては、インサートされたレコードのnameとid(NEW.nameとNEW.id)を連結して、/var/lib/pgsql/program/hoge.rbに渡している。

CREATE OR REPLACE FUNCTION program_001() RETURNS trigger AS $BODY$
BEGIN
  EXECUTE 'COPY (select '''||NEW.name|| ',' ||NEW.id||''') TO PROGRAM ''ruby -Ku /var/lib/pgsql/program/hoge.rb''';
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql VOLATILE COST 100;

トリガー
下記では、test_tableというテーブルへのインサート時に、先ほど作成したfuncion:program_001()を起動するトリガーを設定。これにより、インサート時に、データを渡しつつプログラムを実行できる。 

CREATE TRIGGER program_run_trg
AFTER INSERT ON test_table
FOR EACH ROW EXECUTE PROCEDURE program_001(); 

●検証結果:動作はするが不可
DBのロールの問題がある。
COPY TOは基本的に管理者権限が必要になる(下記は公式解説)。

COPYコマンドで指定するファイルは、クライアントアプリケーションではなく、サーバが直接読み込み/書き込みを行います。 したがって、それらのファイルは、クライアントではなく、データベースサーバマシン上に存在するか、または、データベースサーバマシンからアクセス可能である必要があります。 さらに、クライアントではなく、PostgreSQLユーザ(サーバを実行しているユーザID)が、アクセス権限と読み書き権限を持っている必要があります。 同様に、PROGRAMで指定されたコマンドは、クライアントアプリケーションではなくサーバにより直接実行されるため、PostgreSQLユーザによって実行可能でなければなりません。 ファイル名またはコマンドを指定したCOPYの実行は、データベースのスーパーユーザのみに許可されています。このコマンドによって、サーバがアクセス権限を持つ全てのファイルの読み込み、書き込みが可能になってしまうためです。
(出典:公式ドキュメント

しかし今回、データをインサートするユーザは、管理者権限がないユーザを想定している。 よって、セキュリティの観点から却下。
なお、ある記事で「COPY」ではなく「\COPY」を使えば良い、などの話もあった。しかしそもそも、COPYをプログラム実行に使うのは、本来の用途から外れているのでは?と思ったため、ここで調査打ち切りにした。

Pub/Subパターン(NOTIFY/LISTEN)

●方法
PostgreSQLでは、Pub/Subパターンの一種であるNOTIFY/LISTEN機能がある。これを使うと、トランザクション時に、LISTEN(Subscribe)しているプログラムにペイロードデータを渡すことが出来る。それにより、データを受け取りつつプログラムを実行できる。
(参考: PostgreSQLのPub/Sub機能とJavaのクライアント実装
今回は実行プログラムはRubyなので、データベースドライバとしてSequelを使った。そしてこのブログの方法をそのまま踏襲した。
手順は下記。
1.PostgreSQLで関数(function)作成
2.functionをトリガーに設定
3.受け取るプログラムの起動

●具体例

ほぼブログ通りだが、プログラム例を掲載する。
function
下記では、notify_eventという関数を作成し、関数の中身としては、レコードをjson形式でペイロード化し、イベントと共に通知する。

 CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE
    record RECORD;
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;
    payload = json_build_object('table', TG_TABLE_NAME,
                                'action', TG_OP,
                                'data', row_to_json(record));
    PERFORM pg_notify('events', payload::text);
    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;

トリガー
test_tableというテーブルへのインサート時に、先ほど作成したfuncion:notify_event()を起動するトリガーを設定。

CREATE TRIGGER notify_order_event
AFTER INSERT ON test_table
  FOR EACH ROW EXECUTE PROCEDURE notify_event();

受け取るプログラムの例(起動しておく)

# coding: utf-8
require 'sequel'
DB = Sequel.connect(adapter: 'postgres', host: '~~~', database: 'homepage', user: '~~~~', password: '~~~')
puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload|
  # payloadからidを取得
  hash = JSON.parse(payload, symbolize_names: true)
  id =  hash[:data][:id]
  # 取得したidを元に情報をselect(なお、直接payloadから取得しないのは、文字数制限があるため(8000バイト))
  select_data = DB[:test_table].where(:id=>id).select(:name,:age, :profile).first
  name = select_data[:name]
  age = select_data[:age]
  profile = select_data[:profile]
# このあとごにょごにょ加工する。
end

※ペイロード受信では、いくつか注意点がある(下記)

・ペイロードに乗せられるデータはテキストのみで、バイナリは送受信できません。
・バイナリデータを乗せる場合はencode関数でテキスト形式に変換したり、呼出元アプリでJSON文字列等にシリアライズしてあげる必要があります。
・ペイロードのサイズ上限は8000バイト未満で、これを超えると次のエラーが返ります。
(出典: PostgreSQLのPub/Sub機能とJavaのクライアント実装 )

特に注意するのは、ペイロードサイズの8000バイト制限。テーブル設計によっては、レコード容量が大きくなってしまい8000バイトを越えてエラーになる。なので、推定容量が大きい場合、そのままペイロードから取得するのではなく、idを取得して、selectし直す、など別途取得しなければならない。上記プログラムでは、そのようにしている。

この記事が気に入ったらサポートをしてみませんか?