[ロボ実験記録] pythonによるpipetty proの制御

概要

シリアル通信でリモート制御できるマイクロピペットの使い方を勉強します

https://www.icomes.co.jp/product/pipetty/


ハードウェア

  • pipetty pro, usbによるシリアル通信バージョンを購入しました

    • bluethooth版もあるようですが、電源の安定性を鑑み、今回はUSBの有線接続にしました(microUSBでつなぎます)

    • 特注のようです。値段は会社に問い合わせ

セットアップ

以下のサイトからUSBドライバをインストール

通信周り

  • usb/bluetoothを介したシリアル通信で制御します

  • 具体的なコマンドは、購入すると付属するpdfについてきます

    • シリアル通信のことを知っている、専門家向けの資料という印象です

  • コマンド自体はconfidentialとのことなので、本記事でも触れ(られ)ません

  • Serial port monitorというソフトウェアを用い、pipettyを購入すると付属するソフトの通信状況を確認しながらプログラムを作りました

プログラム

マニュアルをもとに、制御コマンドを作っていきます (以下、serial_messages.pyの抜粋となります。コマンドはマニュアルを見ながら作ります)

ascii_dict = {
    "NUL": 0x00,
    "SOH": 0x01,
    # ...
}


r_ascii_dict = {v: k for k, v in ascii_dict.items()}


ask_version_message = [ascii_dict["SOH"],  # ...
                       ]

home_message = [ascii_dict["SOH"],#...
                ]

"""
同様に、
get_timeout_setting_message
get_vacuum_command
を定義
"""

Pipetty.pyクラス

import serial
from .serial_messages import get_timeout_setting_message, home_message, get_vacuum_command, ascii_dict, r_ascii_dict
import time
import numpy as np


class Pipetty:
    def __init__(self, port="COM4") -> None:
        self.port = port
        self.initilized = False
        self.max_volume = 1000
        self.min_volume = 5
        self.current_volume = 0
        self.init()

    def init(self, force=False) -> None:
        if self.initilized:
            print("pipetty is already initilized")
            if not force:
                print("force init port")
            else:
                return
        self.ser = serial.Serial(self.port, 31250, timeout=1)
        self.initilized = True

    def check_connect(self) -> None:
        check_connect(self.ser)

    def set_timeout(self):
        # set timeout = inf. (avoid sleep)
        message = get_timeout_setting_message()

        r = send_message(self.ser, message,)
        return

    def home(self):
        # do homing twice, just in case
        # _ = send_message(self.ser, home_message)
        # time.sleep(1)
        res = send_message(self.ser, home_message)
        self.current_volume = 0
        return


    def vacuum(self, volume):
        if volume < self.min_volume or volume > self.max_volume+volume:
            raise ValueError(
                f"volume (={volume}) should be between 5 and 1000")

        message = get_vacuum_command(volume)
        res = send_message(self.ser, message,)

        self.current_volume += volume
        return res


def check_connect(ser):
    ser.write(serial.to_bytes([ascii_dict["ENQ"]]))

    incoming = ser.read(1)
    if incoming == serial.to_bytes([ascii_dict["ACK"]]):
        print("Success!: ACK received")
    elif incoming == serial.to_bytes([ascii_dict["NAK"]]):
        print("Error!: NAK received")


def calculate_checksum(buffer):
    sum = 0
    for byte in buffer:
        sum ^= byte
    sum |= 0x80
    return sum


def send_message(ser, message, add_sum=True,
                 sleep_time=1,
                 response_start=0):
    if add_sum:
        checksum = calculate_checksum(message)
        message.append(checksum)
        message.append(ascii_dict["EOT"])

    # メッセージの送信
    ser.write(serial.to_bytes(message))

    # 応答の待機と表示
    received_bytes = []
    time.sleep(sleep_time)
    count = 0
    while ser.in_waiting > 0:
        count += 1
        if count < response_start:
            continue

        incoming = ser.read(1)
        received_bytes.append(incoming)

    return received_bytes


def decode_bytes(received_bytes):
    for byte in received_bytes:
        value = int.from_bytes(byte, byteorder='big')
        if value in r_ascii_dict:
            print(f"Received: {r_ascii_dict[value]}")
        else:
            print(f"Received unknown value: {value}")


def int_to_hex_message(message):
    message_hex = [hex(byte) for byte in message]
    return message_hex
  • init

    • シリアルポートの初期化

  • check_connect

    • 通信ができているかどうかの確認

  • home

    • 全ての液体を吐き出して、ピペットをホームポジションに戻す

  • vacuum

    • 指定量の液体を吸い取る

  • set_timeout

    • デフォルトだと、しばらくコマンドを送らないとsleep状態になるので、timeout=∞ に設定

本当は、シリアル通信のresponseをもとに、ピペット操作がうまく行ったかどうかの確認を行うべきですが、今回は省略しました

実行

from pipetty.Pipetty import Pipetty

#初期化
pipetty = Pipetty()

#通信確認
pipetty.check_connect()

#タイムアウト設定
pipetty.set_timeout()

#10 uL、吸い取り
pipetty.vacuum(10)

#吐き出し
pipetty.home()

動作の様子は以下の感じです。


Dobot magicianなどのロボットアームと組み合わせれば、最低限の実験操作を行えます


おまけ

ピペットの脱着を検討中です。 追加のモーター類がなくても、どうにかなりそうです。


この記事が気に入ったらサポートをしてみませんか?