見出し画像

リファクタリング的な作業(3)

(Python学習初心者の試行錯誤・備忘録です)
前回

で、AIのCopilot先生からは、
・SQLでテーブル名や要素名を指定するのにプレースホルダは使えない
・「(pythonの)f-stringを使ってテーブル名をSQL文に直接埋め込」むことはできるが、「SQLインジェクションのリスクを高める可能性がある」
とアドバイスがありました。
 ならば無理せず、テーブル名は固定でいい、と昨日の段階では思っていました。が、実際に始めて見るとSQL文中に「 t_cards 」と直書きのテーブル名が何度も出てくることになって、どうも嫌だなと感じます。後でちょっと変えたくなったとき大変でしょう。(それを言っちゃうと要素名はどうするんだとも思いますが、例えば要素名はほぼ共通ながらテーブル名だけ変えるようなケースもあるかもしれないので・・)

table変数に信頼できる値のみが入ることを確認するか、適切なサニタイズ処理を行うことが重要です。安全なコードを書くためには、ユーザーからの入力を直接SQL文に組み込むことは避け、事前に定義された値や安全な方法でのみテーブル名を指定するようにしてください。

Copilotの回答から

これって、「ユーザーの入力がSQL文に直接組み込まれるような場合」がまずいのであって、「自分で定義したテーブル名が入るような場合は問題ないよね?」と思うに至り、あらためて試してみることにしました。

mvctest_model.py

import sqlite3

class Cards:
    #データベースのt_cardテーブル
    def __init__(self, mydb="mydb.sqlite3",table="t_cards") -> None:
        self.mydb = mydb
        self.table = table
        #テーブルがなければ作成する。
        self.createsql = f""" CREATE TABLE IF NOT EXISTS {self.table} (
                    id INTEGER PRIMARY KEY,
                    face TEXT NOT NULL,
                    back TEXT,
                    level INTEGER DEFAULT 0,
                    timestamp TEXT DEFAULT CURRENT_TIMESTAMP)
                    """
        self._create_table()

    def _create_table(self):
        with sqlite3.connect(self.mydb) as con:
            cur = con.cursor()
            cur.execute(self.createsql)
            con.commit()

改めてテスト

#テストコード            
def main():
    mycards1 = Cards()
    mycards2 = Cards("db2.sqlite3")
    mycards3 = Cards("db3.sqlite3","t_anothercards")
 
if __name__  == "__main__":
    main()

db2, db3のファイルファイル、t_anothercards テーブル、などなど自動生成されました。大丈夫そうです。( anothercards 単・複が変かな?(^_^;) )
 ではDB操作の機能を追加していきます。

データの取り込み

まず、DBは最初にデータを入れておかなければ話が始まりません。
・リスト形式でデータを追加する関数 loadfromlist(datalist:list)
・CSVファイルからデータを読み込んで追加する関数 loadfromcsv(file)
・テーブルをクリアする関数cleartable() も用意します。

(抜粋)
    def loadfromcsv(self, csvfilename) -> None:
        with sqlite3.connect(self.mydb) as con:
            cur = con.cursor()
            with open(csvfilename, newline='', encoding='utf-8') as csvfile:
                csvreader = csv.reader(csvfile, delimiter = ",")
                for row in csvreader:
                    con.execute(f"INSERT INTO {self.table}(face, back) VALUES(?,?)",
                                 (row[0],row[1]))
            con.commit()

    def loadfromlist(self, datalist:list) -> None:
        with sqlite3.connect(self.mydb) as con:
            cur = con.cursor()
            for row in datalist:
                con.execute(f"INSERT INTO {self.table}(face, back) VALUES(?,?)",
                            (row[0],row[1]))
            con.commit()

    def cleartable(self)-> None:
        with sqlite3.connect(self.mydb) as con:
            cur = con.cursor()
            cur.execute(f"DELETE FROM {self.table}")
            con.commit()

テストします。

#テストコード            
def main():
    mycards1 = Cards()
    mylist =[("opinion","意見"),("environment","環境"),("tax","税"),("degree","程度"),("response","回答"),]
    mycards1.loadfromlist(mylist)
    mycards2 = Cards("mydb2.sqlite3")
    mycards2.loadfromcsv("eng0430.csv")
 
if __name__  == "__main__":
    main()

リストでもCSVも読み込めていることが確認できました。デフォルトと違うDBファイル名(mydb2.sqlite3)を指定することもできています。

    mycards2 = Cards("mydb2.sqlite3")
    mycards2.cleartable()

テーブルの中身のクリアもできました。

SELECTの基本

データのセットが取り込めたところで、次はデータを抽出する関数ですが、ここでまたCopilotに聞いてみました。(複数の結果を取得する場合)「辞書型で結果を取れるか」と。そしたらこんな情報が。

import sqlite3

# データベースに接続
con = sqlite3.connect('mydb.sqlite3')
con.row_factory = sqlite3.Row  # 行を辞書のように扱うための設定

# カーソルを作成
cur = con.cursor()

# SELECT文を実行
cur.execute("SELECT * FROM my_table WHERE id = ?", (1,))

# 1行のデータを取得
row = cur.fetchone()

# rowがNoneでないことを確認(結果がある場合)
if row is not None:
    # 辞書型としてデータにアクセス
    data_dict = dict(row)
    print(data_dict)  # 例: {'id': 1, 'name': 'Alice', 'age': 30}

# データベース接続を閉じる
con.close()

・・・だそうです。いいことを聞きました。一行読んではいちいち
self.id, self.hanzi, self.pynyin, self.level, self.timestamp = temprow
みたいな面倒なことをやらなくていいわけですね。
Cardsクラスに新しいメンバ関数を追加

    def select(self):
        with sqlite3.connect(self.mydb) as con:
            con.row_factory = sqlite3.Row # 行を辞書のように扱うための設定
            cur = con.cursor()
            cur.execute(f"SELECT * FROM {self.table} LIMIT 1")
            row = cur.fetchone()
            #rowがNoneのときdict(row)はNoneではなく空の辞書型を返すので
            #Noneを返したければ次のようにする必要がある。
            if row is not None:
                self.topcard = dict(row)
            else: 
                self.topcard = None

テストします。

#テストコード            
def main():
    mycards = Cards()
    mycards.select()
    if mycards.topcard is not None:
        print(mycards.topcard)
if __name__  == "__main__":
    main()

結果

{'id': 1, 'face': 'opinion', 'back': '意見', 'level': 0, 'timestamp': '2024-05-06 08:50:18'}

ちゃんと取れています。この仕様はイイですね。要素名もそのまま取れる。すごく扱いが楽になります。
 なお、ここで作ったtopcard という辞書型のメンバ変数は、何かの条件でならび変えた際に一番上にきたカード、といったイメージです。

SELECT応用

 元のコード(リファクタリング的なことを始める前のコード)を見ていくと、SQLのSELECT文を使っているところは
SELECT COUNT(*)
・トータルのデータ数を求める。
・レベル、0,1,2,3のデータ数を求める
SELECT 要素 
・WHEREとORDER BYで優先順位を設定して最優先の要素を持ってくる
ですね。

"SELECT COUNT(*) FROM t_shengci"
"SELECT COUNT(*) FROM t_shengci WHERE level = 0"
"SELECT COUNT(*) FROM t_shengci WHERE level = 1"
"SELECT COUNT(*) FROM t_shengci WHERE level = 2"
"SELECT COUNT(*) FROM t_shengci WHERE level = 3"

"SELECT id, hanzi, pinyin, level, timestamp FROM t_shengci \
                 ORDER BY level ASC, timestamp ASC"

といった内容です。COUNTに関してはぱっと見、WHERE以外全部同じなのに、「くどい」なと思ってCopilotに改善策を尋ねたら、こんなコードをくれました。

    def select_where(self, condition, *params):
        with sqlite3.connect(self.mydb) as con:
            cur = con.cursor()
            query = f"SELECT * FROM {self.table} WHERE {condition}"
            cur.execute(query, *params)
            return cur.fetchall()

呼び出し方の例を聞くと

cards = Cards()
result = cards.select_where("level >= ?", (1,))
result = cards.select_where("face = ?", ("King",))

といった例を出してくれました。パラメータはタプル渡し。AIほんとすごいですね。Cardsクラスだから、トランプカードだと思われて "King"とか出てきたのでしょうね。単語カードなんですけどね。

 さて、自分の用途を考えるとfetchall より fetchone が良さそう。まず、カウントを求める関数から。

    def select_count_where(self,condition, *params):
        with sqlite3.connect(self.mydb) as con:
            #SELECT COUNT(*)なので返るのは1行、 LIMIT不要。
            #結果は必ずあるからNoneの場合分けも不要
            cur = con.cursor()
            query = f"SELECT COUNT(*) FROM {self.table} WHERE {condition}"
            cur.execute(query, *params) 
            return(cur.fetchone()[0])

データベースファイルが無い状態からのテスト

#テストコード            
def main():
    mylist =[("opinion","意見"),("environment","環境"),("tax","税"),("degree","程度"),("response","回答"),]
    mycard = Cards()
    mycard.loadfromlist(mylist)
    print(mycard.select_count_where("level = 0"))
    print(mycard.select_count_where("level = 1"))
    print(mycard.select_count_where("level = ?",(0,)))
    print(mycard.select_count_where("level = ?",(1,)))

if __name__  == "__main__":
    main()

結果は

5
0
5
0

直書きでもプレースホルダでもばっちりです。
いいですね、気分が乗ってきました。リファクタリングハイ(?(笑))

この記事が気に入ったらサポートをしてみませんか?