見出し画像

DP.16:紐づけたオブジェクトに値を一斉伝播させる - Observerパターン -【Python】

【1】Observerパターン

Observerパターンでは

「値・状態変化」などで「影響を受ける複数のオブジェクト(Subscriber)」の更新は「Publisherオブジェクト(※)」を経由させる。
それにより「一斉通知(notify)」や「一斉更新(update)」を実現する。

※「Publisher」は「subject」や「observable」とよばれることもある

というもの。

■イメージ図
Observer パターン」は
 ・1つのPublisher
 ・1つ以上のSubscriber (Observer)
で構成される。

画像1

▲「Publisher役のオブジェクト」が「値・状態変化」を受け取っておき、「事前に紐づけておいたオブジェクト(Subscriber、Observer)」達に「通知(notity)」したり、「更新(update)」を起動させたりする。

もう少し具体的な動作イメージで言うと、

例えば、「DB上の同一のデータ」を参照して、
 ①グラフやチャートを表示するView
 ②表計算形式で画面上に表示するView
を持つ「MVCモデルのアプリ」で、
(Modelを経由して)DB上のデータを更新したら①、②どちらのViewも一斉に更新をかける。

みたいな感じ。
※この場合、Model:Publisher相当、Views:Subscriber相当という構造になる。

こうすることでPublisher側はSubscriber側の具体的な中身どうなっているかは知らなくて済む
→「Soc(Separation of concerns)/関心の分離」相当。

【2】実際の利用例

Observerパターンは様々なところで利用されている。

【利用例1】SNSの更新通知
Facebook、twitterなどのSNSにおいて、「フォローしているお気に入りのユーザ」が何らかの更新をした時に、自分宛に通知が届く。

---------------------

【利用例2】Kivy:pythonのNUI(GUI)アプリ用ライブラリ
Kivy」はpythonで「NUI(GUI)アプリ」を開発する際に利用できるライブラリ。

「Kivy」には「Properties」という仕組み(※)を存在しており、これがObserverパターンになっている。
(※あらかじめ関数を結び付けておくと、値(属性)が変化した時にその関数を呼び出してくれる仕組み)

Kivyの他、様々な「イベントドリブンなシステム」でObserverパターンは使われている。(1つのイベントがPublisher相当、それに反応する複数のリスナーがObserver相当になる構造)

----------------------------

【利用例3】RabbitMQ(メッセージ・キューイングに使うソフトウェア)

「RabbitMQ」はメッセージキューイング処理を行うことができるオープンソースソフトウェア。
(「message broker:メッセージブローカー」、「queue manager:キューマネジャー」ともよばれる)

pythonでは「pika」というクライアントライブラリで「RabbitMQ」を操作することができる。これを使うことで比較的簡単に「publish - subscribeパターン」を作ることができる。

RabbitMQとpikaに関してはこっち。

【3】例1:10進数(dec)ー2進数(bin)ー16進数(hex)変換表示プログラム

例題として

『Publisherオブジェクトに値を設定して、通知(notify)すると10進数、2進数、16進数の形式でコマンドプロンプトに表示するプログラム』

を作成する。

■動作イメージ

画像2

# 出力イメージ
----- set value 250 ------
DecObserver: has now decimal data = 250
HexObserver: has now hex data = 0xfa
BinObserver: has now bin data = 0b11111010

■Publisherオブジェクトの作成
今回の「一斉通知(notify_all)の部分」については、紐づける予定のObserver側が「notifyメソッド」を持っている前提としている

class Publisher:
   def __init__(self):
       self.observers = [] # Publisherに紐づけるObserver達を格納
       self.data = 0 # publisherが受け付けて伝播する値


   # observerの登録
   def register(self, observer):
       if observer not in self.observers:
           self.observers.append(observer)
       else:
           print(f'Failed to add: {observer}')


   # observerの紐づけ削除
   def unresister(self, observer):
       try:
           self.observers.remove(observer)
       except ValueError:
           print(f'Failed to remove: {observer}')


   # 紐づけているobserverに一斉通知(notify)
   def notify_all(self):
       # notifyの引数をself、つまりPublisherオブジェクト自身にしている
       [o.notify(self) for o in self.observers]

■Observer(Subscriber)の作成
実装する「notifyメソッドの引数」として「Publisherオブジェクト」を渡されていることに注意する。

#### Observer(Subscriber)側となるオブジェクト
class DecObserver:
   def notify(self, publisher):
       value = int(publisher.data)
       print(f"{type(self).__name__}:  has now decimal data = {value}") 


class HexObserver:
   def notify(self, publisher):
       value = hex(publisher.data)
       print(f"{type(self).__name__}:  has now hex data = {value}") 


class BinObserver:
   def notify(self, publisher):
       value= bin(publisher.data)
       print(f"{type(self).__name__}:  has now bin data = {value}") 

10進数、2進数、16進数の変換に関してはそれぞれ「int()」「bin()」「hex()」を使用した。

■使い方

# Observer オブジェクト
dec_data = DecObserver()
hex_data = HexObserver()
bin_data = BinObserver()

# publish オブジェクト
my_publisher = Publisher()
my_publisher.register(dec_data)
my_publisher.register(hex_data)
my_publisher.register(bin_data)

my_publisher.data = 250
my_publisher.notify_all()

【4】全体コード

浮動小数点などエラーとなる値が入った場合の処理は略。

class Publisher:
   def __init__(self):
       self.observers = [] # Publisherに紐づけるObserver達を格納
       self.data = 0


   # observerの登録
   def register(self, observer):
       if observer not in self.observers:
           self.observers.append(observer)
       else:
           print(f'Failed to add: {observer}')


   # observerの紐づけ削除
   def unresister(self, observer):
       try:
           self.observers.remove(observer)
       except ValueError:
           print(f'Failed to remove: {observer}')


   # 紐づけているobserverに一斉通知(notify)
   def notify_all(self):
       [o.notify(self) for o in self.observers]

   


#### Observer(Subscriber)側となるオブジェクト
class DecObserver:
   def notify(self, publisher):
       value = int(publisher.data)
       print(f"{type(self).__name__}:  has now decimal data = {value}") 


class HexObserver:
   def notify(self, publisher):
       value = hex(publisher.data)
       print(f"{type(self).__name__}:  has now hex data = {value}") 


class BinObserver:
   def notify(self, publisher):
       value= bin(publisher.data)
       print(f"{type(self).__name__}:  has now bin data = {value}") 


######### 動作確認
def main():

   # Observer オブジェクト
   dec_data = DecObserver()
   hex_data = HexObserver()
   bin_data = BinObserver()

   # publish オブジェクト
   my_publisher = Publisher()
   my_publisher.register(dec_data)
   my_publisher.register(hex_data)
   my_publisher.register(bin_data)

   my_publisher.notify_all()


   print("----- set value 250 ------")
   my_publisher.data = 250
   my_publisher.notify_all()


   print("----- unresister BinObserver,set x0db ------")
   my_publisher.unresister(bin_data) # bin_dataオブジェクトの登録を削除
   my_publisher.data = 0xdb  # 16進数の値をいれてみた場合
   my_publisher.notify_all()

if __name__ == "__main__":
   main()

# 実行結果例
DecObserver: has now decimal data = 0
HexObserver: has now hex data = 0x0
BinObserver: has now bin data = 0b0
----- set value 250 ------
DecObserver: has now decimal data = 250
HexObserver: has now hex data = 0xfa
BinObserver: has now bin data = 0b11111010
----- unresister BinObserver,set x0db ------
DecObserver: has now decimal data = 219
HexObserver: has now hex data = 0xdb

【5】例2:ゲーム内の称号(トロフィー)システム風プログラム

別例として、『ゲームのやりこみ具合によって称号(トロフィー)を取得するような感じのプログラム』を作ってみる。

ざっくりいうと

・ユーザは何らかの「おつかいタスク」をする
・1つの「タスク」が終わるとユーザの
  - 経験値(作業回数)を+1
  - 対象作業のやりこみスコアを+2(スコア6以上で称号取得)
  - おこづかいを+5
する。
・最後に全体の結果を出力する

というプログラム。「おつかいタスク」がPublisher役となり、紐づくオブジェクトに結果を伝播していくようにする

■イメージ図

画像3

【6】全体コード

from abc import ABCMeta, abstractmethod


# updateメソッドの実装漏れ防止にabcを導入
class IObserver(metaclass = ABCMeta):
   @abstractmethod
   def update(self, observed):
       pass


#### Observerオブジェクト
class Wallet(IObserver):
   def __init__(self):
       self.amount = 0

   def increase_balance(self, amount):
       self.amount += amount
   
   # 今回は未使用
   #def decrease_balance(self, amount):
   #    self.amount -= amount
   
   def update(self, observed):
       self.increase_balance(5) # お小遣いを+5


class Badge(IObserver):
   def __init__(self, name, type, description):
       self.points = 0
       self.name = name
       self._type = type
       self.description = description
       self.awarded = False

   def add_points(self, amount):
       self.points += amount
       if self.points > 5:
           self.awarded = True
   
   def update(self, observed):
       if observed._type == self._type:
           self.add_points(2) # 対象の作業のやりこみスコアを+2
   

class User(IObserver):
   def __init__(self, wallet:Wallet):
       self.wallet = wallet # おこづかい
       self.badges = [] # 称号
       self.experience = 0 # 経験値(作業回数)
   
   def add_experience(self, amount):
       self.experience += amount
   
   def update(self, observed):
       self.add_experience(1)



########  Publisherオブジェクト役  ########
class Task:
   def __init__(self, user, type):
       self.observers = set() # observerをsetオブジェクトで積んでいく
       self.user = user
       self._type = type
   
   def register(self, observer):
       self.observers.add(observer)
   
   def unregister(self, observer):
       self.observers.discard(observer)
   
   def unregister_all(self):
       self.observers = set()

   def update_all(self):
       for observer in self.observers:
           observer.update(self)



###### 以下動作確認 ######

def main():
   
   # observerオブジェクトをそれぞれ生成
   wallet = Wallet()

   user = User(wallet) # ユーザにWalletオブジェクト紐づけ
   
   # 適当にバッジデータ作成
   badges = [
       Badge("ムシキング", 1, "虫を3回集める"),
       Badge("野草マニア", 2, "野草を3回集める"),
       Badge("釣りバカ", 3, "釣りを3回する"),
   ]
   user.badges.extend(badges) # ユーザにbadgeオブジェクト紐づけ


   # お使いタスク(Publisherオブジェクト)生成 今回は4つ
   tasks = [Task(user, 1), Task(user, 1), Task(user, 3),Task(user, 1),]

   # 各タスク(Publisher)にobserver登録
   for task in tasks:
       task.register(wallet)
       task.register(user)
       for badge in badges:
           task.register(badge)
   
   print("--- start task ---")

   for task in tasks:
       print(task)
       task.update_all() # 1つのタスクが終わったら都度一斉更新
   
   print("--- finished task ---")
   

   print(f"現在のお小遣い:{user.wallet.amount}")
   print(f"作業経験回数:{user.experience}")
   #print(f"{user.wallet.amount}, {user.experience}, {[(badge.points, badge.name) for badge in user.badges]}")

   print("内訳")
   for badge in user.badges:
       print(f" 称号名:{badge.name}, スコア:{badge.points}")
       
   
   print("取得済み称号")
   for badge in user.badges:
       if badge.awarded:
           print(f"{badge.name}{badge.description}")


if __name__ == "__main__":
   main()

# 実行結果例
--- start task ---
<__main__.Task object at 0x000002DE2C45CF70>
<__main__.Task object at 0x000002DE2C45CF10>
<__main__.Task object at 0x000002DE2C45CEB0>
<__main__.Task object at 0x000002DE2C45CE50>
--- finished task ---
現在のお小遣い:20
作業経験回数:4
内訳
 称号名:ムシキング, スコア:6
 称号名:野草マニア, スコア:0
 称号名:釣りバカ, スコア:2
取得済み称号
ムシキング:虫を3回集める

なお今回observerはおこづかい等が二重登録されないように、「setオブジェクト」を使って積むようにした。


もっと応援したいなと思っていただけた場合、よろしければサポートをおねがいします。いただいたサポートは活動費に使わせていただきます。