見出し画像

ADH APIを効率的に呼び出すために開発したHooksの紹介

こんにちは。電通デジタルでデータサイエンティストをしている長島です。
Advent Calendar 21日目となる本記事では、GoogleのAds Data Hub(以下ADH)を利用するためのRest APIを効率的に呼び出すために開発したPython Hooksをご紹介したいと思います。

ADHが何かというと、Googleが提供する、ユーザープライバシーに配慮したクラウドベースの分析基盤です。
詳しい内容は、公式サイトをご参照ください。

ADHにはWebUIとAPIが提供されていますが、弊社では、ADHのREST APIをhttprequest経由でPythonから効率的に呼び出せるよう、hook化して利用しています。
そのために必要なcredential取得方法も最後に記載しています。
公式のAPIリファレンスはこちら

初期化

init

import httplib2
import tempfile
import json
import oauth2client
from os import path
class AdhHook:
   def __init__(self,
                credential: str,
                customer_id: int,
                time_zone: str = 'Asia/Tokyo') -> None:
       """
       Args:
           credential: credential for ADH
           customer_id: adh client id
           time_zone: time zone
       """
       cred = oauth2client.client.Credentials.new_from_json(credential)
       self.http = cred.authorize(httplib2.Http())
       self.customer_id = customer_id
       self.time_zone = time_zone
       self.prefix = 'https://adsdatahub.googleapis.com/v1/'

パラメータ

初期化の際は、Credentialの読み込みcustomer_idタイムゾーンの設定を行っています。

・credentialはGCPのADH用Credentialです。取得方法は最後にご紹介します。
・customer_idはADHアカウントの顧客IDで、利用社毎に割り振られたものです。REST API利用時のURLに使われます。
・time_zoneはクエリ集計時に利用するタイムゾーンです。ADHでは集計対象の日付を指定するのですが、この際にどのタイムゾーンを基準とするかを設定します。

クエリ登録

store_query

ADHアカウントに新規クエリを登録します。

def store_query(self,
                   query_title: str,
                   query_text: str,
                   parameter_types: dict = None,
                   suffix: str = 'customers/{}/analysisQueries'):
       query = {
           'title': query_title,
           'queryText': query_text
       }
       if parameter_types:
           query['parameterTypes'] = self.format_param_value(parameter_types)

       url = path.join(self.prefix, suffix.format(self.customer_id))
       res = self.http.request(url, 'POST', json.dumps(query))
       return res

Web UIから登録する際と同様に、クエリタイトルとクエリ文字列、パラメータを設定します。
ただし、クエリのValidateは行われないので注意が必要です。
Validateは実行時に行われますが、APIからのリクエスト時にエラー検知はできません。
実行前にvalidateのAPIを実行するか、実行後にget jobなどのAPIを実行することで確認ができますが、基本的にはクエリ文字列の作成はWeb UIで行い、API利用時は作成したクエリの一部replace程度で留めることをお勧めします。

・query_title:クエリタイトル。同一顧客ID内で一意である必要があります。
・query_text:クエリ本体
・parameterTypes:パラメータの型を指定するdict。
   スカラー値の場合は{パラメータ名: 型名}
    配列型の場合は{パラメータ名: [型名]}のように書きます。

query_idの取得

戻り値に登録されたクエリIDが含まれるので、登録したものを実行する際は取得しておきましょう。

## 例
res = adh_hook.store_query(title, text, param)
name = json.loads(res[1])['name']
query_id = path.basename(name)

validate_query

クエリ文字列が文法的に問題ないか検証します。
利用できないテーブルを参照しようとしている場合もエラーが返ってきます。

def validate_query(self,
                  query_title: str,
                  query_text: str,
                  parameter_types: dict = None,
                suffix: str = 'customers/{}/analysisQueries:validate'):
   query = {
       'title': query_title,
       'queryText': query_text
   }
   if parameter_types:
       query['parameterTypes'] = self.format_param_value(parameter_types)

   url = path.join(self.prefix, suffix.format(self.customer_id))
   res = self.http.request(url, 'POST', json.dumps({'query': query}))
   return res

与えるパラメータはstore_queryと同じなので省略します。
request bodyがstore_queryの場合と違い、queryをキーとするdict形式になるので注意してください。
エラーの場合はres[1]にエラー文が入ってくるのでそれを参照しましょう。
正常の場合は空のjson形式として入ってくるので、

if json.loads(res[1]):
     raise Exception
else: 
   store_query()

のようにすると良さそうです。

delete_query

登録済みのクエリを削除します。

def delete_query(self, query_id: str,
                suffix: str = 'customers/{}/analysisQueries/{}'):
   url = path.join(self.prefix, suffix.format(self.customer_id, query_id))
   res = self.http.request(url, 'DELETE')
   return res

パラメータ

・query_id:クエリID。
クエリ名では削除できないので、クエリIDが分からなくなってしまった場合は、Web UIから確認するか、listのAPIで取得するようにします。

クエリ実行

run_stored_query

登録済みのクエリを実行します。
基本的にWeb UIから実行する際と同じパラメータを設定するイメージです。

   def run_stored_query(self,
                     query_id: str,
                     dest_table: str,
                     start_date: list,
                     end_date: list,
                     parameter_values: dict = None,
                     suffix: str = 'customers/{}/analysisQueries/{}:start'
                     ):
     query_spec = {
         'startDate': {
             'year': start_date[0],
             'month': start_date[1],
             'day': start_date[2]
         },
         'endDate': {
             'year': end_date[0],
             'month': end_date[1],
             'day': end_date[2]
         },
         'timeZone': self.time_zone
     }
     if parameter_values:
         query_spec['parameterValues'] = self.format_param_value(parameter_values)

     params = {
         'spec': query_spec,
         'destTable': dest_table
     }
     url = path.join(self.prefix, suffix.format(self.customer_id, query_id))
     res = self.http.request(url, 'POST', body=json.dumps(params))
     return res

実行するクエリはURLで指定し、実行条件をbodyで与える形式になっています。
クエリURLは、Web UIから見るURLとは異なるので注意が必要です。
REST API用のアドレスにcustomer_idquery_idを埋め込む形で記述されます。

実行条件は2つの要素を持つdictで与えらます。
一つはdestTableで、集計結果を書き出すBigQueryテーブルをstring型で指定します。
もう一つはspecとして、クエリ集計期間やタイムゾーン、実行パラメータを指定します。

パラメータ

・query_id:保存したクエリのID
ブラウザでクエリを開いた際、以下のような形式で記述されているので、query_id部をコピーして利用します。
https://adsdatahub.google.com/u/1/#/queries/analysis/{customer_id}-{query_id}
また、store_queryで登録した場合は、戻り値から得られます(store_queryの項参照)。
・dest_table:結果を書き出すBigQueryテーブル名。
{project_id}:{dataset_id}.{table_id}の形式で記述します。
・start_date, end_date:[year, month, day]の配列で与えます。
・parameter_values:{key1: value1, key2:[value2-1, value2-2]}のような形式のdictで与えます。(詳しくはformat_param_valueの項目参照)

戻り値

リクエストが正常に通った際、戻り値にはOperationが含まれます。
取得は以下のように行います。

operation = json.loads(res[1])

また、Operationの例を以下に示します。

{
   "name": "operations/{job_id}",
   "metadata": {"@type": "type.googleapis.com/google.ads.adsdatahub.v1.QueryMetadata",
   "queryResourceName": "customers/{customer_id}/analysisQueries/{query_id}",
   "queryTitle": "query_title",
   "customerId": "123",
   "adsDataCustomerId": "123",
   "matchDataCustomerId": "123",
   "parameterValues": {
       "campaign_ids": {
           "arrayValue": {
               "values": [
                   {"value": "1234"},
                   {"value": "5678"}
               ]
           }
       },
       "adgroup_ids": {
           "arrayValue": {
               "values": [
                   {"value": "12345"},
                   {"value": "67890"}
               ]
           }
       },
       "start_date": {"value": "2020-09-01"},
       "end_date": {"value": "2020-09-30"},
       "time_zone": {"value": "Asia/Tokyo"}},
       "startTime": "2020-10-01T00:00:00.000Z",
       "endTime": "2020-10-01T00:01:00.000Z",
       "destTable": "project_for_adh.dataset_for_adh.table_for_adh"},
       "done": true,
       "response": {
           "@type": "type.googleapis.com/google.ads.adsdatahub.v1.QueryResponse",
           "rowCount": "100",
           "outputArtifacts": {}
       }
}

大きく分けて、リクエスト時に渡した情報ジョブの情報で構成されます。
ジョブ情報は、以下のようなものがあります。

・name:ジョブIDです。その後の実行完了確認などに利用できるので、控えておくことをお勧めします。
・startTime, endTime:ジョブの開始・終了時間です。
・done:終了したかどうかがboolで入っています。
・response:終了している場合の結果概要です。response["rowCount"]で行数が確認できます。

job_id取得

job_id = json.loads(res[1]).get('name', '')

上記のようにして、job_idが無い場合にはエラーと判断することもできます。

jobのエラーについて

実行したjobがエラーだった場合、Operationsがerrorキーを含むようになります。
このときのvalueはboolではなく、エラー詳細を含むStatusになるので注意が必要です。
弊社では、Statusからエラーメッセージを取得し、Slackで通知する仕組みも取り入れています。

エラーの発生タイミングについて

クエリリクエストに対するエラーは、発生タイミングによって二種類に分けられます。

リクエスト送信時に発生するエラー

run系の戻り値で検知することができるエラーです。
主に以下のような種類があります。

・パラメータ不備
・クエリエラー(validateが通らないなど)

リクエスト処理後に発生するエラー

run系の戻り値で検知することができないエラーです。
リクエストを受け付けたが実行時/実行後に問題があったケースで、doneキーがtrueになったタイミングでOperatorを取得すると観測することができます。
主に以下のような種類があります。

・メモリエラー
・プライバシーチェック

リクエスト送信時だけでエラーなしと判断はできないので、後に説明するwait_jobget_jobsなどで確認する仕組みを入れると良いと思います。

run_transient_query

管理画面に登録されていないクエリを実行します。
store_queryrun_stored_queryに使われる引数をそのまま使えます。

def run_transient_query(self,
               query_title: str,
               query_text: str,
               dest_table: str,
               start_date: list,
               end_date: list,
               parameter_types: dict = None,
               parameter_values: dict = None,
               suffix: str = 'customers/{}/analysisQueries:startTransient'):
   query_spec = {
       'startDate': {
           'year': start_date[0],
           'month': start_date[1],
           'day': start_date[2]
       },
       'endDate': {
           'year': end_date[0],
           'month': end_date[1],
           'day': end_date[2]
       },
       'timeZone': self.time_zone
   }
   if parameter_values:
       query_spec['parameterValues'] = self.format_param_value(parameter_values)

   query = {
       'title': query_title,
       'queryText': query_text
   }
   if parameter_types:
       query_spec['parameterTypes'] = self.format_param_value(parameter_types)

   params = {
       'spec': query_spec,
       'destTable': dest_table,
       'query': query
   }
   url = path.join(self.prefix, suffix.format(self.customer_id))
   res = self.http.request(url, 'POST', body=json.dumps(params))
   return res

実行状況の取得

get_jobs

ADHで実行したjob一覧を取得します。
job履歴が50件を超える場合は、最新50件となります。

def get_jobs(self, suffix: str = 'operations'):
       url = path.join(self.prefix, suffix)
       res = self.http.request(url, 'GET')
       return json.loads(res[1])

戻り値

戻り値はdict形式で、operationsnextPageTokenのkeyを持ちます。

・operations:Operationの配列で、実行時に渡したパラメータや、現在の実行状況が含まれます。
・nextPageToken:取得した一覧の次の一覧を得るためのトークンです。

クエリ文字列

クエリ文字列を与えて戻ってくるジョブを制御することができます。
上記の実装を利用する場合はパラメータとして設定していないので、suffixに明示的に含める形で投げる必要があります。

・pageSize:戻ってくるジョブ数を変更する際に使います。デフォルトの50件から変えたい場合に利用します。
・pageToken:取得ジョブの開始位置を指定するトークンです。
一度得た一覧よりも古い時期の一覧を取得したい場合、nextPageTokenで戻ってきた値をここに設定してあげます。
・filter:ジョブのフィルタに利用するようですが、適切なフォーマットがわからず、使えていません。。

ちなみに、urlの最後に/{jobid}を追加すると、特定のjobについて調べることも可能です。この場合は戻り値がOperationのリストではなく単体になるので注意が必要です。

get_tables

利用可能なテーブル一覧を取得します。

def get_tables(self, suffix: str = 'customers/{}/tables'):
       url = path.join(self.prefix, suffix.format(self.customer_id))
       res = self.http.request(url, 'GET')
       return json.loads(res[1])

戻り値

戻り値はget_jobsと同様にdict形式となります。

・nextPageToken:get_jobsと同様です。
・tables:Tableのリストが戻ってきます。

Tableはdict形式で、以下のような要素を持ちます。

・name:customers/{customerId}/tables/adh.[テーブルID]
・tablePath:adh.[テーブル名]
・columns: カラム情報のリストです。各カラム情報は、カラム名と型を表すdictになっています。

affinityテーブルの場合、Tableは以下のようになっています。

{
   "name": "customers/{customerId}/tables/adh.affinity",
   "tablePath": "adh.affinity",
   "columns": [
       {"columnId": "affinity_id", "columnType": {"type": "INT64"}},
       {"columnId": "affinity_category", "columnType": {"type": "STRING"}},
       {"columnId": "affinity_name", "columnType": {"type": "STRING"}}
   ]
}

クエリ文字列

クエリ文字列を使って戻り値を制御することができます。

・pageSize, pageToken:get_jobsと同様です
・adsDataCustomerId:データ参照用の顧客IDを指定する。

get_temp_tables

ADHでは、BigQueryと同様に一時テーブルを利用することができます。
作成の際はクエリ内でCREATE TABLE temp_table AS (...)とすることで
一時テーブルは72時間保持され、別のクエリから呼び出すことができます。
get_temp_tablesでは、実行時点で存在する一時テーブル一覧を取得します。

def get_temp_tables(self, suffix: str = 'customers/{}/tempTables'):
       return self.get_tables(suffix=suffix)

戻り値

get_tablesと同様に、dict形式になります。

・nextPageToken:get_tablesと同様です。
・tables:TempTableのリストです。

TempTableはdict形式で、以下のような要素を持ちます。

・name: customers/{customerId}/tempTables/[テーブルID]
                 テーブルIDはADH内部で自動的に割り振られた文字列です。
・tablePath:tmp.[テーブル名]
                 テーブル名は、作成時に自分で指定したものです。
・adsDataCustomerId:数値
・ matchDataCustomerId:数値
                 利用するデータを紐づけるための顧客IDです。
・queryType:ANALYSIS | USER_LIST | QUERY_TYPE_UNSPECIFIED
                 queryTypeは参照できるクエリのタイプです。
・columns:カラム情報。get_tablesと同じです。
・createTime:yyyy-mm-dd
・operation:operation/{operationId}
                 tempTableを作成したジョブのIDです。

クエリ文字列

get_tablesと同様にクエリ文字列を使って戻り値を制御することができます。

・pageSize, pageToken:get_jobsと同様です
・adsDataCustomerId, matchDataCustomerId:データ参照用の顧客IDを指定する。
・queryType:利用できるクエリタイプでフィルタを行う。

wait_job

開始したジョブが終了するまで待ちます。
job情報を取得し、`done`が`true`になるまでループを繰り返します。
戻り値はOperationなので、実用の際はdoneがTrueかどうか確認すると良いでしょう。

def wait_job(self, job_id: str, timeout_minutes: int = 10):
    job_id = job_id if job_id.startswith('operations') else path.join('operations', job_id)

    for _ in range(timeout_minutes):
           time.sleep(60)
           res = self.http.request(url, 'POST', body=json.dumps(body))
           status = json.loads(res[1])
           if status.get('done'):
               if status.get('error'):
                   raise AirflowException(status['error']['message'])
               break
    return status

パラメータ

・job_id:ジョブID。原則はoperations/で始まる形ですが、関数内で確認して必要なら追記しています。
・timeout_minutes:最大待機時間。上述したように、最大待機時間を待たずに未終了で戻ってくる場合もあります。ADHジョブの実行時間は仕様上4時間とされていますが、実用で1時間を超えたことはありません。どちらかというと、割り当てメモリの限界を超えることの方が現実的にあり得ます。

APIについて

waitは公式APIでも用意されています。
ただし、リファレンスには以下のような記述があり、かならず終了まで待てる保証は無いようです。

Note that this method is on a best-effort basis. It may return the latest state before the specified timeout (including immediately), meaning even an immediate response is no guarantee that the operation is done.

実際に触ってみたところ、最大10秒程度しか待てなかったため、短期間かつ単純なクエリでないと利用は難しそうです。

フォーマッタ

クエリ作成の際にパラメータを設定したり、クエリ実行時にパラメータを渡す際のフォーマットが若干複雑なので関数化します。

format_param_type

@staticmethod
def format_param_type(types: dict):
    """
    :param types: dict of {name: type} or {name: [type]}
    :return: formatted dictionary
    """
res = dict()
for name, value in types.items():           
    if isinstance(value, list):
           res[name] = {'arrayType': {'type': str(value[0])}}
    else:
           res[name] = {'type': {'type': str(value)}}
    return res

引数として渡すのは{パラメータ名: 値}のようなdict形式で、配列形式の場合は要素数1の配列として、{パラメータ名: [値]}のように渡すことにします。

parameterTypes

{"パラメータ名": ParameterType}の形式で記述します。
ParameterTypeは最低限{"type": {FieldType}}の形式で表され、各FieldTypeは、スカラー値の場合は{"type": 型名string}、配列の場合は{"arrayType": {"type": 型名string}}となります。

string型のパラメータclientNameとint64配列型のパラメータcampaignIdsがある場合、以下のように記述します。

{
   "parameterTypes": {
       "clientName": {"type": {"type": "string"}},
       "campaignIds": {"arrayType": {"type": {"type": "int64"}}}
   }
}

format_param_value

@staticmethod
def format_param_value(values: dict):
       """
       :param values: dict of {name: value}
       :return: formatted dictionary
       """
    res = dict()
    for name, value in values.items():
           if isinstance(value, list):
               key = 'arrayValue'
               val = {'values': [{'value': str(v)} for v in value if v]}
               res[name] = {key: val}
           else:
               res[name] = {'value': str(value)}
    return res

引数として渡すのは{パラメータ名: 値}のようなdict形式で、配列形式もOKです。
ADHのパラメータでは最大1次元配列までなので、スカラー値か配列かで分岐しています。

スカラー値

スカラー値の場合は、をdict化し、{"value": "値"}にします。

配列

配列の場合は、辞書keyを"value"の代わりに"arrayValue"とし、さらにその辞書valueを"values"をkeyとするdictとします。
そのdictの辞書valueは各値の配列となりますが、一つ一つの値は{"value": "値"}のdict形式で渡されます。

string型のパラメータであるclientName"clientA"、int配列型のパラメータであるcampaignIds[0, 100, 200]としたいとき、利用イメージは以下のようになります。

## adh_hook: AdhHook instance
param_values = {"clientName": "clientA", "campaignIds": [0, 100, 200]}
res = adh_hook.format_param_value(param_values)
{
   "clientName": {"value": "clientA"},
   "campaignIds": {"arrayValue": {
       "values:": [
           {"value": "0"},
           {"value": "100"},
           {"value": "200"}        
       ]
   }
}

ADH credentialの取得

hookの実装とは直接は関係ありませんが、httpリクエストヘッダーとして利用するための、ADH credential取得の流れを説明します。

クライアントシークレットの取得

1. ADH連携済みのGCPプロジェクトで、「Ads Data Hub API」を有効化する
2. GCPのAPIとサービス>認証情報画面で認証情報を作成>OAuthクライアントIDを選択し、OAuth 2.0クライアント IDを作成する。アプリケーションの種類は「その他」を選択する
3. 認証情報画面から、いま作成したクライアントIDを選択し、クライアントシークレットをダウンロードする

Credentialの取得

ダウンロードしたクライアントシークレットファイルを利用してCredentialを取得します。
取得には以下のスクリプトを利用します。

import webbrowser
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage

flow = flow_from_clientsecrets(
   'client_secret.json',
   scope='https://www.googleapis.com/auth/adsdatahub',
   redirect_uri='urn:ietf:wg:oauth:2.0:oob')

auth_uri = flow.step1_get_authorize_url()
webbrowser.open(auth_uri)
code = input('Input your code > ')
credentials = flow.step2_exchange(code)
Storage('credentials.json').put(credentials)

Pythonが使えてインターネットに接続できる環境ならどこでも構いませんが、Cloud Shellを使うと環境依存を解消できるのでおすすめです。

1. Cloud Shellを起動し、任意のディレクトリにスクリプトとclient_secret.jsonを置く。
2. gcloud config set project [project-id]を実行し、ADHに紐づけられているプロジェクトを設定する。
3. 上記のスクリプトを実行する。
4. 表示されるリンクをクリックし認証画面で認証する
5. コードが表示されるので、Cloud Shellに貼り付ける
6. 同ディレクトリ内にcredentials.jsonファイルが生成される

まとめ

いかがでしたでしょうか。
今回ご紹介したAPIでは、Web UIからは実行できない一時クエリの実行がありました。
読み込みテーブル名やカラム名といった、クエリパラメータで対応できないレベルの細かい変更を、Pythonなどの言語を通して柔軟に加えることが一つの利点かと思います。
また、今回の実装には入れていませんが、クエリパラメータとして構造体を利用することができるのもAPIの利点となっています。