見出し画像

【Jaguar's Blog 2】What is an Extension?

この記事は2023年12月17日にNEM/Symbolのコア開発者であるJaguar氏による記事「What is an Extension?」をChatGPTを用いて翻訳したものです。


Symbolを一定の期間使用したことがあるか、設定ファイルを探索したことがある場合、おそらく「拡張機能」という用語に出会ったことでしょう。拡張機能とは何か、そしてそれらを使用すると何ができるのでしょうか? 🤔 これは、拡張機能を理解し、最終的には独自の拡張機能を構築できるようにするための一連の記事の第一部です。

Catapultは、拡張性を考慮してゼロから構築されました。Catapultは基本的にはプラグインと拡張機能を読み込み、それらのサービスを供給し、実行させるアプリケーションシェルです。これ以上にも少しだけ行うことがあります。例えば、データディレクトリの準備や起動時の保存データの読み込みを管理することなどですが、それほど多くありません。ほぼすべての機能は、そのプラグインと拡張機能に存在しています。

プラグインは、ブロックチェーンのコンセンサスをカスタマイズします。トランザクションタイプ、ステートの変更、およびカスタム検証ルールを追加することができます。ネットワーク内のすべてのノードは、同じ構成で同じプラグインを読み込まなければなりません。

拡張機能はクライアントの機能をカスタマイズします。コンセンサスのルールを変更する以外のあらゆることができます。ネットワーク内のノードは、異なる拡張機能や同じ拡張機能を異なる構成で読み込む可能性があります。実際、異なる種類のノード - Peer、API、Dual - の唯一の違いは、それらが読み込む拡張機能のセットです。


Which Extensions?

Catapultはどの拡張機能を読み込むかをどのように知っているのでしょうか? 🤔 それはすべて構成によるものです。拡張機能の構成ファイルは、拡張機能の名前をキーとし、真偽値(true/false)を値とするINIファイルです。値がtrueの場合、拡張機能は読み込まれます。そうでなければ、読み込まれません。

たとえば、`config-extensions-server.properties` からのこのスニペットでは、`extension.filespooling` が無効化されており、`extension.harvesting` が有効化されていることがわかります。

[extensions]

# api extensions
extension.filespooling = false
extension.partialtransaction = false

# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = false
extension.mongo = false
extension.zeromq = false

# p2p extensions
extension.harvesting = true
extension.syncsource = true
...

ノードの設定によって、最大で以下の3つの異なる拡張機能構成ファイルが存在します。

`config-extensions-server.properties`: catapult.serverプロセスによって読み込まれる拡張機能。これはブロックチェーンのコンセンサスを強制するために使用されます。
`config-extensions-broker.properties`: catapult.brokerプロセスによって読み込まれる拡張機能。これは緊急性の低い作業を実行するために使用されます。
`config-extensions-recovery.properties`: catapult.recoveryプロセスによって読み込まれる拡張機能。これは壊れたノードを修復するために使用されます。

⚠️ Catapultの柔軟性を考慮すると、拡張機能なしで起動することも可能です。ただし、可能であるからといって、それが良いアイデアであるとは限りません。これはあまり有用ではありません。なぜなら、ノードは何もできなくなるからです。 😬 同様に、ほとんどの有用なケースで有効にすべき他の拡張機能もあります。たとえば、`extension.sync`を読み込まないと、ノードは他のノードからブロックを受信し、処理することができません。もちろん、これらの機能も独自の実装で置き換えることができますが、ほとんどの場合でこれが必要です。

Cross Extension Communication

この時点で、これらのさまざまな部分は何らかの方法で通信する必要があるのではないかと思っているかもしれませんね。もちろん、その答えは「はい」です!

拡張機能が通信するための主な方法は2つあります:

  1. イベントに対する購読と反応

  2. サーバーフックの登録

Subscriptions

Catapultは、拡張機能間でイベントを伝播させるために独自のパブ/サブモデルを使用しています。すべてのイベントはSubscriptionManagerによって管理されます。現在、以下に関連するイベントがサポートされています: ブロック、部分的なトランザクション、未確認のトランザクション、最終化、ノード、状態遷移、およびトランザクションステータス遷移。興味津々な方のために、比較的少数のイベントがあるため、イベントを強く型付けする利点があるため、カスタムのパブサブ実装が選択されました。

使用例として、Mongo拡張から購読が行われているスニペットを以下に示します。

// register subscriptions
bootstrapper.subscriptionManager().addBlockChangeSubscriber(
        io::CreateBlockStorageChangeSubscriber(std::move(pMongoBlockStorage)));
bootstrapper.subscriptionManager().addPtChangeSubscriber(CreateMongoPtStorage(*pMongoContext, *pTransactionRegistry));
bootstrapper.subscriptionManager().addUtChangeSubscriber(
        CreateMongoTransactionStorage(*pMongoContext, *pTransactionRegistry, Ut_Collection_Name));
bootstrapper.subscriptionManager().addFinalizationSubscriber(CreateMongoFinalizationStorage(*pMongoContext));
bootstrapper.subscriptionManager().addStateChangeSubscriber(std::make_unique<ApiStateChangeSubscriber>(
        std::move(pChainScoreProvider),
        std::move(pExternalCacheStorage)));
bootstrapper.subscriptionManager().addTransactionStatusSubscriber(CreateMongoTransactionStatusStorage(*pMongoContext));

このモデルは非常にシンプルですが、同時に非常に強力でもあります。購読だけを使用して構築できる機能がかなりあります。イベントからデータを取得し、変換または転送する必要があるものは、理想的な候補です。MongoとZeroMQの両方の拡張機能は、この購読モデルをベースに構築されています。Mongo拡張は、イベントからデータを抽出してそれらをNoSQLストアに挿入し、より簡単にクエリできるようにします。ZeroMQ拡張は、イベントからデータを抽出してそれらを外部の購読者に公開します。例えば、catapult RESTプロセスのような外部購読者です。

One Way Street

注意深い読者の方々は、これらの拡張機能はイベントを読み取るだけでなく、公開はしないと気づいたかもしれませんね。それは完全に正しいです!これにより、非常に賢いことができるようになります - プロセスの分離! 🎊

もしAPIノードまたはDualノードを実行したことがあるなら、catapult.brokerプロセスが実行されているのを気づいたかもしれません。もし鷹のような目を持っているなら、MongoとZeroMQの拡張機能が読み込まれているのも気づいたかもしれませんね。それは偶然ですか?いいえ!

catapult.serverプロセスで実行できる特別なfilespooling拡張機能があります。この拡張機能はイベントに購読し、イベントデータを直列化してディスクに書き込みます。メッセージキューに慣れている場合、これをキューへの書き込みと考えることができます。

一方で、catapult.brokerプロセスはこれらのファイルを読み取り、イベントデータを再構築して読み込まれた拡張機能に転送します。これはメッセージキューから読み取るのと同等です。catapult.serverプロセスと同様に、catapult.brokerはオーケストレーション以外の何もしません。本当の重労働は、やはり拡張機能が行います。

すべてのイベントデータが別のプロセスであるcatapult.brokerに存在するため、メインのノードプロセスははるかに軽快で安全です。攻撃者がMongoDBで脆弱性を見つけた場合、攻撃者はcatapult.brokerプロセスをダウンさせることしかできませんが、catapult.serverプロセスは影響を受けません。catapult.serverプロセスはすべてのノード操作を実行しているため、catapult.brokerがダウンしてもネットワークには影響しません。さらに、MongoDBの処理が非常に集中的で時間がかかる場合でも、catapult.serverプロセスに影響を与えません。なぜなら、それが別のプロセスで非同期に行われているからです。実際、活動が活発な期間には、ディスク上に処理するためのイベントデータがたまり、catapult.brokerプロセスがcatapult.serverプロセスよりも遅れることがあります。活動が低い期間には、catapult.brokerが追いつくことができます。これは、重要な操作(catapult.serverで)を効率化し、非重要な操作(catapult.brokerで)を分離するために設計されています。クールでしょう!

Hooks

もちろん、すべての拡張機能がこれほど簡単に分離できるわけではありません。時にはデータを処理のために1つの拡張機能から別の拡張機能にプッシュする必要があります。これは事前に定義されたserver hookのセットを介して行われます。

これがどのように機能するかを理解する最も簡単な方法は、ブロックのライフサイクルに関する例を見ることです。

`ServerHooks.h` に戻って、3つの関連するスニペットを見てみましょう。

...

/// Factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
using CompletionAwareBlockRangeConsumerFactoryFunc = RangeConsumerFactoryFunc<chain::CompletionAwareBlockRangeConsumerFunc>;

...

/// Sets the \a factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
void setCompletionAwareBlockRangeConsumerFactory(const CompletionAwareBlockRangeConsumerFactoryFunc& factory) {
    SetOnce(m_completionAwareBlockRangeConsumerFactory, factory);
}

...

/// Gets the factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
const auto& completionAwareBlockRangeConsumerFactory() const {
    return Require(m_completionAwareBlockRangeConsumerFactory);
}

...

まず、関数のプロトタイプを宣言します。次に、それを設定する方法を定義します。最後に、それにアクセスする方法を定義します。SetterおよびAccessorでそれぞれSetOnceRequireが使用されていることに注意してください。これは、このフックはすべての拡張機能全体で正確に1回だけ設定されなければならないことを意味します。他のフックは追加セマンティクスを持っており、push_backAggregateConsumersが使用されています。おそらくお気づきかもしれませんが、これらは集約前にすべての追加が行われている必要があるという注意があります。

😵 それは具体的には何ですか? `CompletionAwareBlockRangeConsumerFactoryFunc` は、`InputSource` を受け入れ、ブロックの範囲(1つ以上のブロック)とその範囲の処理が完了したときに呼び出すべき関数を受け入れる関数のプロトタイプです。さっき読んだことが正確に理解できなくても心配しないでください。これは実際の関数型プログラミングの例です。それ以外の部分はまだ理解できるはずです!

Usage Example

素晴らしい、では、フックが何であるかを見てみましたので、それをどのように使用するかを見てみましょう!

`DispatcherService` は、Completion Aware Block Range Consumer Factory を設定します:

state.hooks().setCompletionAwareBlockRangeConsumerFactory([&dispatcher = *pDispatcher, &nodes = state.nodes()](auto source) {
    return [&dispatcher, &nodes, source](auto&& range, const auto& processingComplete) {
        return disruptor::InputSource::Local == source || !nodes.view().isBanned(range.SourceIdentity)
                ? dispatcher.processElement(ConsumerInput(std::move(range), source), processingComplete)
                : 0;
    };
});

もしブロック(範囲)がローカルノードから(つまり、ローカルノードがそれらをハーベストした場合)または非禁止のリモートノードから取得された場合、それらは `ConsumerInput` 経由で `dispatcher.processElement` に渡されます。これにより、これらのブロックはブロック処理の複数の段階を経て旅立ちます。すべての段階が合格した場合、ブロックはブロックチェーンに追加されます。どれかが失敗した場合、ブロックは拒否されます。これがセットフックであるため、これはプロセス全体で唯一設定されたものです。

Completion Aware Block Range Consumer Factory は2つの異なる場所でアクセスされます。

最初に、ハーベスティングサービスでは、ハーベストされたブロックをプッシュするために使用されます(ローカルの入力ソースを使用)。

ScheduledHarvesterTaskOptions CreateHarvesterTaskOptions(extensions::ServiceState& state) {
    ScheduledHarvesterTaskOptions options;
    options.HarvestingAllowed = state.hooks().chainSyncedPredicate();
    options.LastBlockElementSupplier = [&storage = state.storage()]() {
        auto storageView = storage.view();
        return storageView.loadBlockElement(storageView.chainHeight());
    };
    options.TimeSupplier = state.timeSupplier();
    options.RangeConsumer = state.hooks().completionAwareBlockRangeConsumerFactory()(disruptor::InputSource::Local);
    return options;
}

😵 興味があれば、実際にはここでハーベストされたブロックがプッシュされています。

m_rangeConsumer(model::BlockRange::FromEntity(std::move(pBlock)), [pIsAnyBlockPending = m_pIsAnyHarvestedBlockPending](
    auto,
    auto) {
  *pIsAnyBlockPending = false;
});

コールバックとフラグ `m_pIsAnyHarvestedBlockPending` の使用に注目してください。これにより、以前にハーベストされたブロックがまだ処理中である間はハーベストをスキップします。

次に、同期サービスでは、引き出されたブロックをプッシュするために使用されます(リモートプル入力ソースを使用)。

😵 興味があれば、引き出されたブロックは実際にここでプッシュされています。

// need to use shared_from_this because dispatcher can finish processing a block after
// scheduler is stopped (and owning DefaultChainSynchronizer is destroyed)
auto newId = m_blockRangeConsumer(std::move(range), [pThis = shared_from_this()](auto id, auto result) {
    pThis->remove(id, result.CompletionStatus);
});

処理が完了した際にカスタムロジックを実行するためのコールバックの使用に注目してください。

最後に、`ServerHooks` で、おそらく `blockRangeConsumerFactory` という名前のフックに気付いたかもしれません。このフックは `completionAwareBlockRangeConsumerFactory` と非常に似ていますが、処理の完了時の通知は提供しません。それにもかかわらず、このフックは他のノードからプッシュされたブロック(リモートプッシュ入力ソースを使用)をブロックディスパッチャに転送するために使用されます。ユーティリティ関数 `CreateBlockPushEntityCallback` は、ここで同期ソースサービスによって使用される適切な `BlockRangeConsumerFunc` を作成します。


この記事が気に入ったらサポートをしてみませんか?