Twitchの配信開始をDiscordに通知するアプリ


1.経緯

前回はYouTubeの配信予約をDiscordに通知するアプリを作成した。

それならTwitchもいけるんじゃね?と調べていたらtwitchAPIなるものがあったので試しに作ってみた。
YouTubeとの違いは、

  • リクエストの制限が無いっぽい?ちゃんと調べていない(とりあえずは良識的な間隔で問い合わせる。60秒ごととか)

  • 配信枠というものが無いため(一応スケジュールはあるが…)、配信開始/終了の状態を見る程度でよい。そのためYouTube版よりはだいぶシンプルなコードになった。


2.環境

OS: Ubuntu 22.04.4
言語: Python 3.10.12
API: twitchAPI 4.2.1(4.2系の記事がなかなか見つからなかった)
       Discord.py 2.3.2


3.事前準備

  • Twitch Client ID、Twitch Client Secret
    Twitch Developersに登録。
    参考:

  • Discord Bot Token
    参考:

  • 通知するDiscordのチャンネルID
    Discord Bot Tokenを作成する際にどこかのサーバーに参加させているはず。そのサーバー内のテキストチャンネルのIDをコピーする。


4.アプリ

Discordアプリのサンプルを基にする。
my_background_taskメソッドの中でDiscordのチャンネル情報の取得と、twitchAPIを呼び出す。一連の処理が終わったらcloseを呼び出す。

import datetime
import discord
from discord.ext import tasks
from twitchAPI.twitch import Twitch
from twitchAPI.helper import first

...

@tasks.loop(seconds=fetch_interval_sec)  # task runs every 60 seconds
async def my_background_task(self):
    self.channel = self.get_channel(discord_channel_id)
    twitch = await Twitch(twitch_client_id, twitch_client_secret)
    ...
    await twitch.close()

Twitchの対象ユーザーの配信情報を取得。

user_info = await first(twitch.get_streams(user_login=twitch_user))

get_streamsの結果(user_info)が空であれば配信情報が取れない(=配信していない)。
逆にuser_infoにデータが入っていれば配信中となる。

 # check streaming
 if user_info is not None:
     # streaming
     send_to_discord_message = f"{twitch_user} が配信を開始しました。"
     await self.channel.send(send_to_discord_message)

後はuser_info内に配信情報が入っているので各々欲しい情報を取り出す。(ここは公式APIリファレンス見た方が早い)

このままだと@tasks.loop(seconds=XX)のXX秒の度にDiscordへメッセージが飛ぶので、配信開始した時だけ通知をするようにする。
方法は色々あるが、1つは送信前にDiscordのメッセージを検索する処理を入れておき、最初の配信開始検知時にDiscordのメッセージに配信開始時間をくっつけて送信しておく。メッセージ検索で該当する日時のメッセージが無ければ送信、あれば送信しないという判別方法。
他には送信フラグをユーザー+配信開始時間ごとに管理する方法などなど…

    # check streaming
    if user_info is not None:
        # streaming
        starttime_jst = user_info.started_at + datetime.timedelta(hours=9)
        starttime_jst_str = starttime_jst.strftime('%Y/%m/%d %H:%M:%S')
        message = await self.check_discord_channel_history(starttime_jst_str)
        if message is None:
            # send to discord
            send_to_discord_message = f"[{starttime_jst_str}] {twitch_user} が配信を開始しました。"
            await self.channel.send(send_to_discord_message)


async def check_discord_channel_history(self, search_text) -> discord.Message:
    messages = [message async for message in self.channel.history(limit=50)]
    for message in messages:
        if search_text in message.content:
            return message
    return None

以下参考。


dockerで動かしたり、GCEで動かしたりは前回の記事そのまま参考に出来るので割愛。

以上。

この記事が気に入ったらサポートをしてみませんか?