割烹エディター 小説家になろうの活動報告収集ツールを作ってみる

こんにちは。次の試みとして小説家になろうの活動報告のアーカイブとか簡易検索を作ってみたいと思います。

活動報告は新着100件はあるのですが、すぐにながれてなかなか使い勝手が悪いです。検索もできなくて不便なので使い勝手の良いものを作ってみようと思います。

データ取得は活動報告Atomを使います。


とりあえず仕様

  • 7日分の活動報告を見られる(各日毎)

  • 更新は30分ごとにstaticの場所へ保存する

  • 曜日ごとに文字を割り当て、URLで判断する。

  • 活動報告ファイルはJSONファイル。中身は日付と各データ

  • 統計情報は一日ごとに保存

  • 活動報告の投稿時間を見られる 各日付

  • 活動報告の日毎の投稿数をみられる 

  • その日のデータの中から検索(ユーザー名、タイトル、ユーザーID)ができる

こんな感じに実装しようと思います。

割烹保存パラメータ

活動報告atomnには<summary><title><comment><published><updated>がありますがそれのうち<summary>以外を取ろうと思います。<summary>はデータ数多くなるし、基本的にタイトルとユーザーで検索できればいいので。

というので取るのは以下

  • タイトル

  • 名前

  • userID

  • blogkey

  • 公開日

  • 更新時間

<title type="html">
<![CDATA[ 「JKアイドルと新人オッサン作家」更新しました! ]]>
</title>
<summary type="html">
<![CDATA[ 61話を投稿しました。 お時間のある時にでも読んでやってください。 では、次回投稿で。 ]]>
</summary>
<published>2022-03-20T15:42:36+09:00</published>
<updated>2022-03-20T15:42:36+09:00</updated>
<link rel="alternate" type="text/html" href="https://mypage.syosetu.com/mypageblog/view/userid/706039/blogkey/2959337/"/>
<id>https://mypage.syosetu.com/mypageblog/view/userid/706039/blogkey/2959337/</id>
<author>
<name>コップの空</name>
<uri>https://mypage.syosetu.com/706039/</uri>
</author>
</entry>

記事取得と加工

記事の取得はfeedparserを使います。

参考

feedparserでパース、それをJSON形式になおして保存。それを読み込んでリスト化してます。

データベースに1個ずつ保存するのも考えたのですが、データ数が膨大になってパフォーマンス落ちそう。また古いものを残しても削除とか非公開があるので整合性がとれなくなりなりそうなのでやめました。静的ファイルで新しい奴だけ残せばいいかなと。

書いたコード

import feedparser
import json
from datetime import datetime, timedelta, timezone
from time import mktime
import os


def main():
    file_prifix = 'kappo_atom_'
    folder_path = ''
    url = "https://api.syosetu.com/blogatom.Atom"

    f = feedparser.parse(url)

    if f['bozo'] == 0:   # エラー時はbozo=1  
        temp_blogs = parse_blogs(f['entries']) # id, title, author, user_id, published, date, updated

        # print(blogs[0]) 最新
        date1 = temp_blogs[0]['date']
        blog1_data = list(filter(lambda item: item['date'] == date1, temp_blogs))

        data_update(blog1_data, temp_blogs[0], file_prifix, folder_path)


        # print(blogs[-1]) 最後
        date2 = temp_blogs[-1]['date']
        blog2_data = list(filter(lambda item: item['date'] == date2, temp_blogs))

        if temp_blogs[0]['date'] != temp_blogs[-1]['date']: # 日をまたぐ場合はもう一個追加する
            data_update(blog2_data, temp_blogs[-1], file_prifix, folder_path)
            


def data_update(temp_blogs, blog, file_prifix, folder_path):
    data = {}
    # パースファイルから該当する曜日データを取ってくる。
    yobi = datetime.strptime(blog['published'], '%Y/%m/%d %H:%M:%S').strftime('%a')
    # 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'

    file_name = file_prifix + yobi + '.json'

    flg_update = True

    if os.path.exists(folder_path + file_name):

        temp_data = get_blogs_data(folder_path + file_name)
        # 保存されたデータと日が違う場合は上書き。そうでない場合は保存されていない追加して保存する

        if blog['date'] == temp_data['date']:
            #dateが同じな場合は追加する。IDで判定
            max_id = int(max(temp_data['blogs'], key=lambda x: int(x['id']))['id'])

            # filter
            blog_data = list(filter(lambda x: int(x['id']) > max_id, temp_blogs))

            if blog_data:
            #配列の連結
                data['blogs'] = blog_data + temp_data['blogs']
            else:
                flg_update = False

        else:
            data['blogs']=temp_blogs

    data['date'] = blog['date']

    # update日を更新    
    data['update'] =  datetime.now().strftime('%Y/%m/%d %H:%M:%S')

    if flg_update: # 更新分あれば
        a = open(folder_path + file_name, 'w', encoding='UTF-8')
        a.write( json.dumps(data, ensure_ascii=False))
        a.close()


def parse_blogs(raw_data):
    blogs = [] 
    # パースする
    for article in raw_data:
        blog = {}
        blog['id'] = article['link'].split('/')[8]
        blog['title'] = article['title']
        blog['author'] = article['author']
        blog['user_id'] = article['href'].split('/')[3]

        t = datetime.fromtimestamp(mktime(article['published_parsed'])) + timedelta(hours=9)
        blog['published'] = t.strftime('%Y/%m/%d %H:%M:%S')
        blog['date'] = t.strftime('%Y%m%d')

        t = datetime.fromtimestamp(mktime(article['updated_parsed'])) + timedelta(hours=9)
        blog['updated'] = t.strftime('%Y/%m/%d %H:%M:%S')
        
        blogs.append(blog)

    return blogs

def get_blogs_data(path):
    f = open(path, 'r', encoding="utf-8a")
    data_text = f.read()
    f.close()
    temp_data = json.loads(data_text)    

    return temp_data

if __name__ == "__main__":
    main()


で、このファイルを定期的に実行すればデータが取れます。今は30分ごとに取得してます。

日毎の投稿数は見た感じ1日あたり400-600件、1時間当たりでの最大は53なので30分ごとで問題ないと思います。

ということで検索機能も付けるとこんな感じ。


これで活動報告の巡回もやりやすくなりそうです。

良ければサポートお願いします。サポート費用はサーバー維持などの開発費に使わせていただきます。