見出し画像

protolink開発秘話、ROSConJPの裏側

はじめに

OUXT Polaris社会人メンバーの片岡です。
皆様ROSCONJP2024お疲れ様でした!
今回も多数の興味深い発表が聞けて大変楽しい一日でした。

今回学生メンバーの松崎と二見がROSCONJP2024に参加し発表してくれた内容は10分では少し物足りないのでより深い内容や今後それをどのようにロボット開発に活かしていくかについて記事にまとめます。

protolink

今回の発表では、マイコンとROS 2の新しい接続方法を提案しました。
従来ROS 2とマイコンをつなぐ方法には様々な方法が提案されています。
これらの手法は主に2つに大別されます。

いわゆる組み込みROS 2を使うもの

この方法のメリットはPlug & PlayでROS 2とつながることです。
ROS 2はDDS規格に準拠した通信を行うため規格に準拠したパケットを出せればマイコンだろうと通信に参加することが可能です。
これによってmROS 2やmicro-ROSで作られたロボットのファームウェアはROS 2 topicを受信してモーターを回したりすることができます。

TCPやUDPなどで通信するもの

この方法は、最も適切な通信プロトコルを自分で選定できるメリットがあり、デバイスや使用条件などに合わせて最適なスタックを構築できます。
LiDARなどのセンサデバイスもUDPなどでPCにデータを送っていてそれをROS 2ドライバで受信、Topicとして点群情報を出力しているのである意味これと同種の構造をしています。
この手法には「通信自体はROS 2に依存しない」というメリットがあります。
そのためROS 2では一筋縄では行かないNAT超えなどもかんたんに実装できますし、ファームウェアを可能な限り薄いスタックでつくることでバグを減らすことにも繋がります。
この利点を最大限活用するため、OUXT Polarisでは当初採用していたmROS 2から独自開発した通信ライブラリ「protolink」に移行しました。

OUXT製通信ライブラリ、protolink

protolinkはROS 2とprotobufをつなぐライブラリです。
protocol buffersはGoogleによって開発されたOSSでROS 2のようにスキーマを定義して様々なプログラミング言語間でデータをやり取りするものです。

protocol buffersは様々なWebサービスで使用されている実績のあるOSSであり、nanopbというマイコン向け実装もあることから今回採用に至りました。

protolinkの通信ルールは極めてシンプルです。
ただprotobufでシリアライズしたデータを何らかのプロトコルで投げるだけです。
そしてROS 2からprotocol buffersのスキーマ定義を自動生成できればROS 2からtopicを出せば簡単にマイコンに命令ができるという発想です。
また、マイコン間でprotocol buffersに準拠したバイナリを投げ合うことができればマイコン間通信もより簡単にデコードできるようになり開発が加速することを期待しています。

protolinkで想定しているユースケース

protolinkで想定しているユースケースは主に下記の4つです。

想定しているユースケース

マイコン間通信にUDPを使用し、ブロードキャストパケットを用いれば制御PCでそのパケットをキャプチャ、解釈してマイコンからの情報を簡単にROS 2の世界に持ってこれるようになります。
ファームウェア開発で難しい「機能を後付する」行為もやりやすくなります。どういうことかといいますとROS 2ベースの操船システムをやめてラジコンプロポによる操船システムを導入したくなったらラジコンプロポの信号受信回路をマイコンにつなぎ、ROS 2ベースの操船システムのときに使っていたprotocol buffersと同じデータを出力するようにすれば良いのです。
こうすれば少しずつ機能を改良したり後付したりすることができます。

ロボット間通信においてもROS 2はROS_DOMAIN_IDを設定するだけでつながる非常に便利なツールですがあまりにも便利につながりすぎるツールはヒューマンエラーの原因になりますし、tfに大量のprefixがついてしまい難解になります。

OUXT PolarisではMini-VとWAM-Vの2種類の船体を開発していますが、それらはハードウェアが異なります。
しかし、ソフトウェアは可能な限り一本化したいです。
そこで共通のインターフェース設計を行い、WAM-VとMini-Vに同じprotocol buffersのメッセージを送れば同じ挙動をするようにファームを実装しておけばROS 2のソースコードはほぼ共通化可能です。

遠隔操作やテレメトリに関してもprotolinkは活躍します。
protolinkはprotocol buffersの定義ファイル(拡張子は.proto)を生成しているので送受信対象はマイコンやPCにとどまりません。
protocol buffersはWeb業界でも幅広く使われているため情報をインターネットに送ったり、遠隔地から船にコマンドを送ったりと様々な目的に使えます。

protolinkによるモータ制御の裏側

上の動画はROSConJP 2024で上映されたデモ映像になります。
このときどのようなハードウェア、ソフトウェアでモーターが回っていたかを説明します。

ハードウェア構成

使用しているモーター

OUXT Polarisで使用しているモーターLACOMETA社製24V 最大50AのDCモーターです。
DCモーターなので回転速度の調整はPWMで実現しています。
このモーターを後部に2つ取り付けることで前進後退と左右旋回の動きを実現します。
モーター1つで最大1200Wと非常に強い力が発生してフルパワーをだすと船体が転覆してしまう恐れがあるためファームウェア内部でPWMのDuty比に制限を設け30%以上の出力を出さないようにしています。

モーター駆動Box全体像

モータードライバーはモータ駆動Boxに収められています。
モータ駆動Boxは大会で使う船体の浮きの上に取り付けられるようになっています。
タカチ製防水Boxの中に駆動電源に使う12V鉛蓄電池が2つ収められており、非常にコンパクトな構造になっています。

船体にモーター駆動Boxを組み付けた様子

RobotX Challenge 2018のときの船体では船体中央に緑のラッシングベルトで吊り下げられた黒い箱に大量(記憶に間違いがなければ10個数以上)の鉛蓄電池が収められており、交換するだけで数時間必要でしたが、この改善により整備性が大幅に高まった上重心が下がったため制御性が大きく向上しました。

RobotX 2018参加時の船体

モーター駆動Boxには制御PCからのハートビートが途切れた場合に緊急停止リレーを操作するM5 Stackとモータードライバー基板にPWM波形を入力するSTM32 F767ZIの2つのマイクロコントローラが取り付けられています。
OUXT Polarisで開発しているロボットは大型ロボットであるため体内ネットワークはすべてイーサネットで統一しています。
制御PCからのハートビートが切れて非常停止装置が働くとモーター駆動系の電源が切断され船は安全に停止するよう設計されています。
回路周りの工夫に関しても伝えたいことがいっぱいあるのでまた後日記事にしたいと思います。

STM32とモータドライバ基板

ROSConJPで発表したprotolinkという通信ライブラリはこのSTM32 F767ZIに書き込まれるファームウェアと深く関連します。

ソフトウェア構成

実験時の計算機の接続関係はこのようになっています。

実験時の計算機の接続関係

ROS 2の入ったノートPCにsensor_msgs::msg::Joyを受信してprotolinkに対応したUDPパケットを出力する構造をしています。
joystickの入力取得の部分にはROS 2のjoyパッケージを使用していますが、autorepeat_rateを設定し一定時間ごとに必ずJoy Stickの情報が送信されることを保証することでジョイスティックの死活監視と緊急停止も実現しています。

ロボットの遠隔操作システムと安全

今回の実験で遠隔操作系は工期短縮のためROS 2で実装しましたが本当に遠隔操作系はROS 2で実装されるべきでしょうか?
絶対にそんなことはありません。
ROS 2ベースのシステムが立ち上がっていないとオーバーライドすらできないのはいざというときに非常停止や緊急回避が遅れることにつながるため安全に大きな問題があると言えます。
ですので、「船体を静止させての実験などでは素早く研究開発するためにROS 2と簡単に通信できる」が「船体が動いている実験では安全のためにROS 2依存がない方法でマイコンと通信ができる」という2つの条件を同時に満たす必要があります。
これを組み込みROS 2で満たすのは二種類の通信プロトコルを実装せねばならず非常に大変です。
この両立のためOUXT PolarisではROS 2メッセージからマイコン通信で使えるスキーマ定義ファイルを生成できないか、というアプローチでこの問題に対処しました。

protolinkの仕組み

メッセージ生成と変換

protolinkは上記リポジトリでApache-2.0ライセンスで公開されています。
このパッケージに含まれるのはほぼpythonスクリプトとcmakeスクリプトです。
ユーザーとしてprotolinkを使うのは使い方は簡単です。このcmake関数を実行すれば自動的にprotocol buffersのメッセージが生成されます。

add_protolink_message_from_ros_message("geometry_msgs" "PoseStamped")

非常に簡単ですね!第1引数にパッケージ名、第2引数を渡せば終わりです。
この関数はこちらのcmakeスクリプトをチェックすれば挙動がわかります。
内部挙動は非常にシンプルでROS 2のrosidl_generator_pyを使ってprotocol buffersのメッセージ定義を自動生成するpythonスクリプトをcmakeから実行しています。
変換過程でROS 2のメッセージからprotocol buffersのメッセージの相互変換のためのC++ソースコードを生成する必要がある箇所があり、pythonのテンプレートエンジンであるJinja2を用いたソースコード生成が行われます。
たったこれだけの記述で任意のROS 2 <=> protocol buffersの相互型変換ができてしまうのは便利ですね。

#ifndef {{ include_guard }}
#define {{ include_guard }}

#include <rclcpp/rclcpp.hpp>

#include <{{ ros2_header }}>
#include <{{ proto_header }}>

{% for conversion in conversions %}
{{ conversion.ros2 }} convert(const {{ conversion.proto }} & message);
{{ conversion.proto }} convert(const {{ conversion.ros2 }} & message);

template <>
struct rclcpp::TypeAdapter<{{ conversion.proto }}, {{ conversion.ros2 }}>
{
  using is_specialized = std::true_type;
  using custom_type = {{ conversion.proto }};
  using ros_message_type = {{ conversion.ros2 }};

  static void convert_to_ros_message(const custom_type & source, ros_message_type & destination)
  {
    destination = convert(source);
  }

  static void convert_to_custom(const ros_message_type & source, custom_type & destination)
  {
    destination = convert(source);
  }
};

{% endfor %}

#endif // {{ include_guard }}
#include <{{ conversion_header }}>

{% for conversion in conversions %}
{{ conversion.ros2 }} convert(const {{ conversion.proto }} & message)
{
    {{ conversion.ros2 }} ros2_message;
    {% for builtin_type in conversion.members.builtin_types %}
    ros2_message.{{builtin_type}} = message.{{builtin_type}}();
    {% endfor %}

    {% for user_type in conversion.members.user_types %}
    ros2_message.{{user_type}} = convert(message.{{user_type}}());
    {% endfor %}

    return ros2_message;
}

{{ conversion.proto }} convert(const {{ conversion.ros2 }} & message)
{
    {{ conversion.proto }} proto_message;
    {% for builtin_type in conversion.members.builtin_types %}
    proto_message.set_{{builtin_type}}(message.{{builtin_type}});
    {% endfor %}

    {% for user_type in conversion.members.user_types %}
    *proto_message.mutable_{{user_type}}() = convert(message.{{user_type}});
    {% endfor %}
    return proto_message;
}
{% endfor %}

今後はあまり使うこともないかもしれませんが、念の為protocol buffersのメッセージ定義をマニュアルで行えるモードも存在します。
実は今回の実験はそちらのモードで操作させていたりします。

マイコン向けソースコード生成

ここまでくればprotocol buffersの定義ファイルをnanopbに入力し、マイコン環境向けのソースコード生成を行います。
こうすることでマイコンファーム開発者は生成されたソースコードをコピペすればROS 2と通信を行うことができます。
あとはマイコンファーム内部で外部から受信したprotocol buffersのデータを処理してモーターをまわしたり緊急停止したりするプログラムを書けば完璧です。モーターのファームウェアも当然OSSです。
ぜひ実装の参考にしてください!

受信と結果の反映処理を行っているのは下記の部分です。
UDP通信についてはほぼこの手順のままです。

  for(;;)
  {
	uint8_t raw_message_buffer[communication_Command_size];
	memset(raw_message_buffer, 0, sizeof(raw_message_buffer));
	lwip_recvfrom(socket, (uint8_t*) raw_message_buffer, sizeof(raw_message_buffer), (int) NULL, (struct sockaddr*) &rxAddr, &len);


	int index = -1;
	for (int i = communication_Command_size - 1; i >= 0; i--) {
	  if (raw_message_buffer[i] != 0) {
	    index = i;
	    break;
	  }
	}
	communication_Command message = communication_Command_init_zero;
	// If this case, it means all fields in message is zero.
	if(index == -1) {
	  motorSetSpeed(&motor, 0.0, 0.3);
	}
	else {
	  uint8_t *message_buffer = (uint8_t *)malloc((index + 1) * sizeof(uint8_t));
	  memcpy(message_buffer, raw_message_buffer, (index + 1) * sizeof(uint8_t));
	  pb_istream_t istream = pb_istream_from_buffer(message_buffer, (index + 1) * sizeof(uint8_t));
	  if (pb_decode(&istream, communication_Command_fields, &message)) {
		if(message.emergency_stop) {
		  motorSetSpeed(&motor, 0.0, 0.0);
		}
	    else {
		  motorSetSpeed(&motor, message.thrust, 0.3);
	    }
      }
	  free(message_buffer);
	}
	osDelay(10);
  }

各所でわずかに不可解な箇所がありますが一つ一つ解説していきます。
まず、最初に

uint8_t raw_message_buffer[communication_Command_size];

でprotocol buffersの定義からわかる最大サイズのメモリの大きさをもとに通信用バッファを用意します。
しかしこれは「最大」のサイズであり常にその長さで受信されるわけではありません。
protocol buffersでは0を出力する場合は出力されないという挙動をするため必要な受信バッファのサイズは常に一定ではありません。
また、すべてのフィールドが0であった場合長さ0の配列が出力されます。
そしてnanopbはピッタリの長さの配列を入力しないと正しくデコードできないため、pb_decode関数でデコードを行う前に「今回のprotocol buffersで送られてきた配列で使用しているメモリはいくつなんだ」というのを求める必要があります。
そのための行が下記になります。

	int index = -1;
	for (int i = communication_Command_size - 1; i >= 0; i--) {
	  if (raw_message_buffer[i] != 0) {
	    index = i;
	    break;
	  }
	}
	communication_Command message = communication_Command_init_zero;
	// If this case, it means all fields in message is zero.
	if(index == -1) {
	  motorSetSpeed(&motor, 0.0, 0.3);
	}

筆者はこれを実装するときに時間がなかったので規格書を熟読するのではなくpythonのprotocol buffers実装で様々な値をシリアライズしてみてバイトの値がどうなるかを確認しました。
その結果、protocol buffersは値とメモリのオフセットが交互になったようなデータ構造をしており最後に0が来ることはありえなさそうということを突き止めたためこのようなナイーブな実装となっています。
ROSConJPも終わってある程度落ち着いたら規格書を読みたいところですね。
配列の長さ推定が終わったらいよいよデコードです。
pb_decode関数を利用してデコード部分を実装します。

pb_decode(&istream, communication_Command_fields, &message)

これでデコードが完了し、変数messageに目標スラストなどの情報が入りました。

ROS 2側のソースコード

void UsvControllerComponent::controlFunction()
{
  std::lock_guard<std::mutex> lock(mtx_);
  if (!joy_subscribed_) {
    return;
  }
  const auto send_command = [this](const double left_thrust, const double right_thrust) {
    const auto build_command = [](const double thrust) {
      communication::Command command;
      if (std::isnan(thrust)) {
        command.set_thrust(0.0);
        command.set_emergency_stop(true);
      } else {
        command.set_thrust(thrust);
        command.set_emergency_stop(false);
      }
      return command;
    };


    left_thruster_publisher_.send(build_command(left_thrust));
    right_thruster_publisher_.send(build_command(right_thrust));
  };


  switch (control_mode_) {
    case ControlMode::MANUAL:
      send_command(joy_interface_.tiltedStickLY(), joy_interface_.tiltedStickRY());
      break;
    case ControlMode::AUTONOMOUS:
      // @todo input thrust value from velocity controller.
      send_command(0, 0);
      break;
    case ControlMode::EMERGENCY_STOP:
      send_command(std::nan(""), std::nan(""));
      break;
  }
}

ROS 2のソースコードはとてもシンプルです。
ただprotobufの型に詰めてpublishして終わりですね!
これで実際にモーターを遠隔操作できました。

実験中ハマったこと

MACアドレス設定を忘れていた

実験中ハマった問題は「複数台のマイコンを使ってイーサネット通信する場合はイーサネットMACアドレスもちゃんと設定しないといけない」ということでした。
自宅でテストしたときは1台だったため気が付きませんでした。
これをきっちりやらないと混線して右と左を同時に操作できなくなります。
この2つの設定はイーサネットMACアドレスはConnectivity => ETHのタブ、IPアドレスの方はMiddleware & Software Packs => LWIPのタブにあるため全然違う場所にあります。
実験時、pingも正常に帰ってきていたのでIPアドレスさえ設定していれば大丈夫と思っていましたが全く大丈夫ではなかったです。
当たり前といえば当たり前なのですが、忘れているとわかりにくいエラーを発生させるという事象でした。

IPアドレスの設定
イーサネットMACアドレスの設定

boost::asioのポート設定ミス

boost::asioはシリアル通信などで何度もお世話になるライブラリだが、1024番より若い番号のポートを指定した場合ランタイムエラーになってしまいます。

OUXTでは当初1000番ポートで通信していたためエラーとなりました。
boost::asioで機器間通信を実装するときの制約として頭に入れておくとトラブルが少なそうです。

まとめ

今回はROS 2とマイコンを簡単につなぐprotolinkに関してROSConJP2024で時間の都合で言えなかったことをまとめました。
OUXT Polarisでは今後もこのような技術記事や実験レポート等を定期的に執筆し情報発信を行っていく予定です。

メンバーも常時募集しております。
もし話を聞いてみたいことなどございましたらメンバーや公式Xアカウントまでよろしくお願いします。

また、OUXT PolarisではMaritime RobotX ChallengeおよびRoboBoatへの参加に必要な開発費や旅費、輸送費を賄うためスポンサー企業様を募集しております。

現在RoboBoat 2025に向けてスポンサー条件の詳細を決定中であるため少し内容が変わってしまうかもしれませんが、ユニフォームや船体にロゴを入れたり学生メンバーとの交流会などを返礼として想定しております。

OUXT Polarisではロボット業界等で働く社会人メンバーが学生メンバーを育成する体制をとることで、自律航行システム開発を通してロボット工学に明るい人材を増やす活動に取り組んでおります。
もしご賛同いただけましたら現在個人スポンサーシップも条件を整備を行っていますのでぜひご検討いただけますと幸いです。
今後ともOUXT Polarisをよろしくお願いいたします。

この記事が気に入ったらサポートをしてみませんか?