見出し画像

240306 株ドラゴン値上がりランキングの自動スクレイピングツール


1.概要

このブログ記事では、特定の期間にわたって株価が上昇した銘柄のランキングを自動で収集し、分析するためのPythonスクリプトについて紹介します。このスクリプトは、株式市場における値上がりランキングをスクレイピングし、データをCSV形式で保存する機能を備えています。
株式市場の動向分析や投資判断のための有効なツールです。

2.目的

  1. 効率的なデータ収集: このスクリプトは、期間内の値上がりランキングを自動で取得し、一括でまとめることを目的としています。株式市場のデータ収集の手間を大幅に削減します。

  2. 市場全体の予測: 値上がりランキングの推移から、株式市場全体の動向やトレンドを分析し、将来の市場予測に役立てることを目指しています。市場の健全性や投資環境の評価に有効な情報を提供します。

  3. 連続ランクイン株の追跡: 連続して値上がりランキングに名を連ねる銘柄を効率的に把握することができます。この情報は、安定したパフォーマンスを持つ銘柄を見つけ出すのに役立ちます。

3.コード全文

以下はコードの全文です。
各コードの解説は全文以降に記載します

import tkinter as tk
from tkinter import messagebox, ttk
from threading import Thread
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from datetime import datetime, date  # ここでdateをインポート

def on_click():
    sy = start_year_var.get()
    sm = start_month_var.get()
    sd = start_day_var.get()
    ey = end_year_var.get()
    em = end_month_var.get()
    ed = end_day_var.get()

    empty_fields = []
    if not sy:
        empty_fields.append("開始年")
    if not sm:
        empty_fields.append("開始月")
    if not sd:
        empty_fields.append("開始日")
    if not ey:
        empty_fields.append("終了年")
    if not em:
        empty_fields.append("終了月")
    if not ed:
        empty_fields.append("終了日")

    if empty_fields:
        messagebox.showerror("エラー", "\n".join(empty_fields) + "\nが空白です。")
        return
    try:
        sy = int(sy)
        sm = int(sm)
        sd = int(sd)
        ey = int(ey)
        em = int(em)
        ed = int(ed)

        progress_var.set(0)  # プログレスバーをリセット
        thread = Thread(target=lambda: fetch_data(sy, sm, sd, ey, em, ed, progress_var))
        thread.start()
    except ValueError:
        messagebox.showerror("エラー", "年、月、日には数値を入力してください。")
        
        
def fetch_data(start_year, start_month, start_day, end_year, end_month, end_day, progress_var):
    total_days = (datetime(end_year, end_month, end_day) - datetime(start_year, start_month, start_day)).days + 1
    processed_days = 0

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    wait = WebDriverWait(driver, 10)

    Increase_Master_Table01 = pd.DataFrame()

    for y in range(start_year, end_year + 1):
        month_start = start_month if y == start_year else 1
        month_end = end_month if y == end_year else 12

        for m in range(month_start, month_end + 1):
            day_start = start_day if m == start_month and y == start_year else 1
            day_end = end_day if m == end_month and y == end_year else 31

            for d in range(day_start, day_end + 1):
                current_date = datetime(y, m, d)
                if current_date > datetime(end_year, end_month, end_day):
                    break

                try:
                    d_str = str(d).rjust(2, "0")
                    url = f"https://www.kabudragon.com/ranking/{y}/{str(m).rjust(2, '0')}/{d_str}/age.html"
                    driver.get(url)
                    wait.until(EC.presence_of_element_located((By.TAG_NAME, "html")))

                    if driver.page_source:
                        data = pd.read_html(driver.page_source)

                        if len(data) > 1 and all(col in data[1].columns for col in [0, 1, 2, 3, 4]):
                            processed_days += 1
                            progress = int((processed_days / total_days) * 100)
                            progress_var.set(progress)
                            continue

                        Increase_Table01 = data[1].iloc[4:].copy()
                        Increase_Table01 = Increase_Table01.iloc[:50]
                        Increase_Table01['日付'] = current_date
                        Increase_Master_Table01 = pd.concat([Increase_Master_Table01, Increase_Table01], ignore_index=True)

                except Exception as e:
                    print(f"Error processing {url}: {e}")

                processed_days += 1
                progress = int((processed_days / total_days) * 100)
                progress_var.set(progress)

    driver.quit()
    
    
    # 現在の日付と時刻を取得
    now = datetime.now().strftime("%Y%m%d_%H%M")
    # ファイル名を生成
    output_file = f"{now}_株ドラゴン値上がりランキング_{start_year}{str(start_month).zfill(2)}{str(start_day).zfill(2)}-{end_year}{str(end_month).zfill(2)}{str(end_day).zfill(2)}.csv"
    Increase_Master_Table01.to_csv(output_file, index=False, encoding='utf-8_sig')
    messagebox.showinfo("完了", "データの取得が完了しました。")

# 以下のGUI部分は変更なし
root = tk.Tk()
root.title("Webスクレイピングアプリ")

main_frame = ttk.Frame(root)
main_frame.pack(padx=10, pady=10, fill='x', expand=True)

# Start Date Section
start_date_label = ttk.Label(main_frame, text="開始期間")
start_date_label.pack()

start_year_frame = ttk.Frame(main_frame)
start_year_frame.pack(fill='x', expand=True)
ttk.Label(start_year_frame, text="年").pack(side=tk.LEFT)
start_year_var = tk.StringVar()
ttk.Entry(start_year_frame, textvariable=start_year_var).pack(fill='x', expand=True, side=tk.LEFT)

start_month_frame = ttk.Frame(main_frame)
start_month_frame.pack(fill='x', expand=True)
ttk.Label(start_month_frame, text="月").pack(side=tk.LEFT)
start_month_var = tk.StringVar()
ttk.Entry(start_month_frame, textvariable=start_month_var).pack(fill='x', expand=True, side=tk.LEFT)

start_day_frame = ttk.Frame(main_frame)
start_day_frame.pack(fill='x', expand=True)
ttk.Label(start_day_frame, text="日").pack(side=tk.LEFT)
start_day_var = tk.StringVar()
ttk.Entry(start_day_frame, textvariable=start_day_var).pack(fill='x', expand=True, side=tk.LEFT)

# End Date Section
end_date_label = ttk.Label(main_frame, text="終了期間")
end_date_label.pack()

end_year_frame = ttk.Frame(main_frame)
end_year_frame.pack(fill='x', expand=True)
ttk.Label(end_year_frame, text="年").pack(side=tk.LEFT)
end_year_var = tk.StringVar()
ttk.Entry(end_year_frame, textvariable=end_year_var).pack(fill='x', expand=True, side=tk.LEFT)

end_month_frame = ttk.Frame(main_frame)
end_month_frame.pack(fill='x', expand=True)
ttk.Label(end_month_frame, text="月").pack(side=tk.LEFT)
end_month_var = tk.StringVar()
ttk.Entry(end_month_frame, textvariable=end_month_var).pack(fill='x', expand=True, side=tk.LEFT)

end_day_frame = ttk.Frame(main_frame)
end_day_frame.pack(fill='x', expand=True)
ttk.Label(end_day_frame, text="日").pack(side=tk.LEFT)
end_day_var = tk.StringVar()
ttk.Entry(end_day_frame, textvariable=end_day_var).pack(fill='x', expand=True, side=tk.LEFT)


# プログレスバーの追加
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(main_frame, maximum=100, length=200, variable=progress_var)
progress_bar.pack(pady=10)

# Execute Button
execute_button = ttk.Button(main_frame, text="実行", command=on_click)
execute_button.pack(pady=5)

root.mainloop()

4.コードの解説

●インポート
必要な各種ライブラリをインポートします。

import tkinter as tk
from tkinter import messagebox, ttk
from threading import Thread
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from datetime import datetime, date  

●日付データの検証と処理開始
関数は、GUIから収集した開始日と終了日の情報を検証し、問題がなければスクレイピングの処理を開始します。入力された年、月、日のデータが適切であるかを確認し、不適切な場合はエラーメッセージを表示します。すべての入力が正しい場合は、別のスレッドでfetch_data関数を実行し、データ収集のプロセスを開始します。

def on_click():
    sy = start_year_var.get()
    sm = start_month_var.get()
    sd = start_day_var.get()
    ey = end_year_var.get()
    em = end_month_var.get()
    ed = end_day_var.get()

    empty_fields = []
    if not sy:
        empty_fields.append("開始年")
    if not sm:
        empty_fields.append("開始月")
    if not sd:
        empty_fields.append("開始日")
    if not ey:
        empty_fields.append("終了年")
    if not em:
        empty_fields.append("終了月")
    if not ed:
        empty_fields.append("終了日")

    if empty_fields:
        messagebox.showerror("エラー", "\n".join(empty_fields) + "\nが空白です。")
        return
    try:
        sy = int(sy)
        sm = int(sm)
        sd = int(sd)
        ey = int(ey)
        em = int(em)
        ed = int(ed)

        progress_var.set(0)  # プログレスバーをリセット
        thread = Thread(target=lambda: fetch_data(sy, sm, sd, ey, em, ed, progress_var))
        thread.start()
    except ValueError:
        messagebox.showerror("エラー", "年、月、日には数値を入力してください。")
        

●スクレイピングでデータ収集
fetch_data関数は、指定された期間の株価上昇ランキングデータをウェブスクレイピングによって収集します。この関数はまず、指定された開始日と終了日の間の合計日数を計算します。その後、Selenium WebDriverを使用してウェブブラウザを操作し、特定のURLにアクセスして必要なデータを取得します。この過程で、取得したデータはPandas DataFrameに格納されます。

def fetch_data(start_year, start_month, start_day, end_year, end_month, end_day, progress_var):
    total_days = (datetime(end_year, end_month, end_day) - datetime(start_year, start_month, start_day)).days + 1
    processed_days = 0

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    wait = WebDriverWait(driver, 10)

    Increase_Master_Table01 = pd.DataFrame()

●forループで日付を走査
このコードブロックは、ユーザーが指定した開始日から終了日までの各日付を順番に走査するためのものです。まず、開始年から終了年までの各年(`y`)についてループを行います。その中で、指定された開始月と終了月を考慮しながら、各月(`m`)に対するループを実行します。さらに、各月において指定された開始日と終了日を基に、日(`d`)に対するループを行います。このプロセスを通じて、指定された期間内のすべての日付をシステマティックに走査し、それぞれの日に対してスクレイピング処理を適用する準備をします。

    for y in range(start_year, end_year + 1):
        month_start = start_month if y == start_year else 1
        month_end = end_month if y == end_year else 12

        for m in range(month_start, month_end + 1):
            day_start = start_day if m == start_month and y == start_year else 1
            day_end = end_day if m == end_month and y == end_year else 31

            for d in range(day_start, day_end + 1):
                current_date = datetime(y, m, d)
                if current_date > datetime(end_year, end_month, end_day):
                    break

●Webスクレイピングで日毎のデータ取得
このコードブロックは、スクレイピングを実行するためにURLを構築し、指定された日付のウェブページからデータを取得する処理を行っています。まず、年(y)、月(m)、日(d)の値を使用して特定のURLを形成します。このURLは、株ドラゴンの特定の日付に関する株価上昇ランキングページを指します。次に、Selenium WebDriverを使用してこのURLにアクセスし、ページが読み込まれるのを待ちます。ページが正常に読み込まれた場合、Pandasのread_htmlメソッドを使用してページ内の表データを取得します。処理が成功し、期待される列が存在する場合、進捗状況が更新されます。

               try:
                    d_str = str(d).rjust(2, "0")
                    url = f"https://www.kabudragon.com/ranking/{y}/{str(m).rjust(2, '0')}/{d_str}/age.html"
                    driver.get(url)
                    wait.until(EC.presence_of_element_located((By.TAG_NAME, "html")))

                    if driver.page_source:
                        data = pd.read_html(driver.page_source)

                        if len(data) > 1 and all(col in data[1].columns for col in [0, 1, 2, 3, 4]):
                            processed_days += 1
                            progress = int((processed_days / total_days) * 100)
                            progress_var.set(progress)
                            continue

●取得データの抽出と日付付加
このコードブロックでは、スクレイピングによって取得したデータの中から必要な情報を抽出し、さらにそれを統合しています。具体的には、スクレイピングしたページから得られた表データ(data)の中の特定の部分を選択しています。まず、data[1].iloc[4:]により、二番目の表から5行目以降のデータを取得します。これにより、ページ上部の不要な情報を除外します。次に、.iloc[:50]を用いて上位50位までのデータのみを抽出します。これにより、ランキングの上位50位の銘柄に焦点を絞ります。その後、Increase_Table01['日付'] = current_dateで各行に現在の日付を追加し、最終的にpd.concatを使用してこれらのデータを一つの大きなDataFrame(Increase_Master_Table01)に統合しています。

                        Increase_Table01 = data[1].iloc[4:].copy()
                        Increase_Table01 = Increase_Table01.iloc[:50]
                        Increase_Table01['日付'] = current_date
                        Increase_Master_Table01 = pd.concat([Increase_Master_Table01, Increase_Table01], ignore_index=True)

●進捗バーの更新で進行状況表示
このコードブロックは、スクレイピング処理の進捗状況をユーザーに視覚的に表示するために使用されます。processed_days += 1は、処理された日数を追跡するために、現在のカウンターに1を加算します。このカウンターは、スクレイピングが完了した各日付についてインクリメントされます。次に、progress = int((processed_days / total_days) * 100)を用いて、全体の処理日数に対する進捗の割合を計算し、これを整数に変換します。最後に、progress_var.set(progress)により、この進捗割合をプログレスバーに反映させます。このプログレスバーは、GUI上に表示され、ユーザーが現在の処理進行状況を容易に把握できるようにします。

               processed_days += 1
                progress = int((processed_days / total_days) * 100)
                progress_var.set(progress)

●最終データの保存と完了通知

   # 現在の日付と時刻を取得
    now = datetime.now().strftime("%Y%m%d_%H%M")
    # ファイル名を生成
    output_file = f"{now}_株ドラゴン値上がりランキング_{start_year}{str(start_month).zfill(2)}{str(start_day).zfill(2)}-{end_year}{str(end_month).zfill(2)}{str(end_day).zfill(2)}.csv"
    Increase_Master_Table01.to_csv(output_file, index=False, encoding='utf-8_sig')
    messagebox.showinfo("完了", "データの取得が完了しました。")

GUIの実装部分

# GUIアプリケーションのメインウィンドウを作成
root = tk.Tk()
# ウィンドウのタイトルを設定
root.title("Webスクレイピングアプリ")

# メインフレームを作成し、アプリケーションウィンドウに配置
main_frame = ttk.Frame(root)
# メインフレームのサイズと位置を調整
main_frame.pack(padx=10, pady=10, fill='x', expand=True)

# 開始日時を入力するためのセクションを作成
# Start Date Section
start_date_label = ttk.Label(main_frame, text="開始期間")
# ラベルをメインフレームに配置
start_date_label.pack()

# 開始年を入力するためのフレームを作成
start_year_frame = ttk.Frame(main_frame)
# フレームのサイズを調整し、左寄せで配置
start_year_frame.pack(fill='x', expand=True)
# 開始年のラベルを作成し、フレームに配置
ttk.Label(start_year_frame, text="年").pack(side=tk.LEFT)
# 開始年を入力するための変数を定義
start_year_var = tk.StringVar()
# 開始年を入力するテキストボックスを作成し、フレームに配置
ttk.Entry(start_year_frame, textvariable=start_year_var).pack(fill='x', expand=True, side=tk.LEFT)

# 開始月を入力するためのフレームを作成
start_month_frame = ttk.Frame(main_frame)
# フレームのサイズを調整し、左寄せで配置
start_month_frame.pack(fill='x', expand=True)
# 開始月のラベルを作成し、フレームに配置
ttk.Label(start_month_frame, text="月").pack(side=tk.LEFT)
# 開始月を入力するための変数を定義
start_month_var = tk.StringVar()
# 開始月を入力するテキストボックスを作成し、フレームに配置
ttk.Entry(start_month_frame, textvariable=start_month_var).pack(fill='x', expand=True, side=tk.LEFT)

# 開始日を入力するためのフレームを作成
start_day_frame = ttk.Frame(main_frame)
# フレームのサイズを調整し、左寄せで配置
start_day_frame.pack(fill='x', expand=True)
# 開始日のラベルを作成し、フレームに配置
ttk.Label(start_day_frame, text="日").pack(side=tk.LEFT)
# 開始日を入力するための変数を定義
start_day_var = tk.StringVar()
# 開始日を入力するテキストボックスを作成し、フレームに配置
ttk.Entry(start_day_frame, textvariable=start_day_var).pack(fill='x', expand=True, side=tk.LEFT)

# "終了期間"というラベルの設定
end_date_label = ttk.Label(main_frame, text="終了期間")
end_date_label.pack()

# 終了年の入力フィールドを設定
end_year_frame = ttk.Frame(main_frame)
end_year_frame.pack(fill='x', expand=True)
ttk.Label(end_year_frame, text="年").pack(side=tk.LEFT)
end_year_var = tk.StringVar()
ttk.Entry(end_year_frame, textvariable=end_year_var).pack(fill='x', expand=True, side=tk.LEFT)

# 終了月の入力フィールドを設定
end_month_frame = ttk.Frame(main_frame)
end_month_frame.pack(fill='x', expand=True)
ttk.Label(end_month_frame, text="月").pack(side=tk.LEFT)
end_month_var = tk.StringVar()
ttk.Entry(end_month_frame, textvariable=end_month_var).pack(fill='x', expand=True, side=tk.LEFT)

# 終了日の入力フィールドを設定
end_day_frame = ttk.Frame(main_frame)
end_day_frame.pack(fill='x', expand=True)
ttk.Label(end_day_frame, text="日").pack(side=tk.LEFT)
end_day_var = tk.StringVar()
ttk.Entry(end_day_frame, textvariable=end_day_var).pack(fill='x', expand=True, side=tk.LEFT)

# プログレスバーの追加
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(main_frame, maximum=100, length=200, variable=progress_var)
progress_bar.pack(pady=10)

# Execute Button
execute_button = ttk.Button(main_frame, text="実行", command=on_click)
execute_button.pack(pady=5)

root.mainloop()

5.まとめ

自分は値上がりランキングから次に上がる銘柄の選定や、全体相場の感覚を把握できませんでしたので、皆様がこれを使って何かを見いだせてくださると幸いです。

この記事が気に入ったらサポートをしてみませんか?