「Amazon価格ツイート」アプリ作成④ Pythonでデータ取得
どうも、タケオです。
PythonでAmazon APIからデータ取得を行なってみたいと思います。
Pythonのインストール
まずはPythonが動くように公式の説明に従ってインストールしておきます。
複数のバージョンを利用する予定がなければPATHを追加するようにしましょう。
データの取得
ライブラリの利用
本当は「bottlenose」というAmazon用のライブラリがあるので使いたかったのですが、現在のAPIには対応していないとのことなのでPHPのサンプルコードをPythonに作り直してみたいと思います。
他にも「Product Advertising API(PA-API)」というライブラリもあるそうですが、多機能すぎるので今回は様子見ということで。
サンプルコード
とりあえずPythonに書き換えたサンプルを貼り付けます。動作は保証できませんので、その点はご了承ください。
import datetime
import hashlib
import hmac
import json
import requests
SERVICE_NAME = "ProductAdvertisingAPI"
REGION = "us-west-2"
PARTNER_TAG = "パートナータグ"
ACCESS_KEY = "アクセスキー"
SECRET_KEY = "シークレットキー"
HOST = "webservices.amazon.co.jp"
URI_PATH = "/paapi5/getitems"
OPERATION = "GetItems"
RESULT = "ItemsResult"
ITEM_ID = "B08PJD4WWX" # search item
def getMessage(): # 受信結果から出力内容を形成
responses = getResponse()
message=''
if (RESULT in responses):
for item in responses["ItemsResult"]["Items"]:
message += "商品名 : " + item["ItemInfo"]["Title"]["DisplayValue"] + "\r\n"
message += "メーカー: " + item["ItemInfo"]["ByLineInfo"]["Brand"]["DisplayValue"] + "\r\n"
else:
message += "データの取得に失敗しました。"
return message
def getResponse(): # Amazon API からデータを取得
payload = "{"
payload += " \"ItemIds\": ["
payload += " \"" + ITEM_ID + "\""
payload += " ],"
payload += " \"Resources\": ["
payload += " \"ItemInfo.ByLineInfo\","
#payload += " \"ItemInfo.Features\","
payload += " \"ItemInfo.ManufactureInfo\","
payload += " \"ItemInfo.ProductInfo\","
payload += " \"ItemInfo.Title\","
payload += " \"ParentASIN\""
payload += " ],"
payload += " \"Merchant\": \"Amazon\","
payload += " \"PartnerTag\": \"" + PARTNER_TAG + "\","
payload += " \"PartnerType\": \"Associates\","
payload += " \"Marketplace\": \"www.amazon.co.jp\","
payload += " \"Operation\": \"" + OPERATION + "\""
payload += "}"
awsv4 = AwsV4(ACCESS_KEY, SECRET_KEY)
awsv4.setRegionName(REGION)
awsv4.setServiceName(SERVICE_NAME)
awsv4.setPath(URI_PATH)
awsv4.setPayload (payload)
awsv4.setRequestMethod ("POST")
awsv4.addHeader('content-encoding', 'amz-1.0')
awsv4.addHeader('content-type', 'application/json; charset=utf-8')
awsv4.addHeader('host', HOST)
awsv4.addHeader('x-amz-target', 'com.amazon.paapi5.v1.ProductAdvertisingAPIv1.' + OPERATION)
headers = awsv4.getHeaders()
try:
response = requests.post('https://' + HOST + URI_PATH, headers=headers, data=payload)
json_response = json.loads(response.text)
return json_response
except :
pass
return {"Result": "error"}
class AwsV4: # AWS API
def __init__(self):
self.accessKey = ""
self.secretKey = ""
self.path = ""
self.regionName = ""
self.serviceName = ""
self.httpMethodName = ""
self.queryParametes = {}
self.awsHeaders = {}
self.payload = ""
self.HMACAlgorithm = "AWS4-HMAC-SHA256"
self.aws4Request = "aws4_request"
self.strSignedHeader = ""
self.xAmzDate = ""
self.currentDate = ""
def __init__(self, accessKey, secretKey):
self.accessKey = accessKey
self.secretKey = secretKey
self.path = ""
self.regionName = ""
self.serviceName = ""
self.httpMethodName = ""
self.queryParametes = {}
self.awsHeaders = {}
self.payload = ""
self.HMACAlgorithm = "AWS4-HMAC-SHA256"
self.aws4Request = "aws4_request"
self.strSignedHeader = ""
self.xAmzDate = self.__getTimeStamp()
self.currentDate = self.__getDate()
def setPath(self, path):
self.path = path
def setServiceName(self, serviceName):
self.serviceName = serviceName;
def setRegionName(self, regionName):
self.regionName = regionName
def setPayload(self, payload):
self.payload = payload
def setRequestMethod(self, method):
self.httpMethodName = method
def addHeader(self, headerName, headerValue):
self.awsHeaders[headerName] = headerValue
def __prepareCanonicalRequest(self):
canonicalURL = ""
canonicalURL += self.httpMethodName + "\n"
canonicalURL += self.path + "\n" + "\n"
signedHeaders = ''
for key, value in self.awsHeaders.items():
signedHeaders += key + ";"
canonicalURL += key + ":" + value + "\n"
canonicalURL += "\n"
self.strSignedHeader = signedHeaders[0:len(signedHeaders)-1]
canonicalURL += self.strSignedHeader + "\n"
canonicalURL += self.__generateHex(self.payload)
return canonicalURL
def __prepareStringToSign(self, canonicalURL):
stringToSign = ''
stringToSign += self.HMACAlgorithm + "\n"
stringToSign += self.xAmzDate + "\n"
stringToSign += self.currentDate + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "\n"
stringToSign += self.__generateHex(canonicalURL)
return stringToSign
def __calculateSignature(self, stringToSign):
signatureKey = self.__getSignatureKey (self.secretKey, self.currentDate, self.regionName, self.serviceName)
signature = hmac.new(signatureKey, stringToSign.encode('utf-8'), hashlib.sha256).hexdigest()
return signature;
def getHeaders(self):
self.awsHeaders['x-amz-date'] = self.xAmzDate
self.awsHeaders = sorted(self.awsHeaders.items()) #ソート
self.awsHeaders = dict((x, y) for x, y in self.awsHeaders) #タプルを辞書に変換
# Step 1: CREATE A CANONICAL REQUEST
canonicalURL = self.__prepareCanonicalRequest()
# Step 2: CREATE THE STRING TO SIGN
stringToSign = self.__prepareStringToSign(canonicalURL)
# Step 3: CALCULATE THE SIGNATURE
signature = self.__calculateSignature(stringToSign)
# Step 4: CALCULATE AUTHORIZATION HEADER
if signature != "":
self.awsHeaders['Authorization'] = self.__buildAuthorizationString(signature)
return self.awsHeaders
def __buildAuthorizationString(self, strSignature):
return self.HMACAlgorithm + " " + "Credential=" + self.accessKey + "/" + self.__getDate() + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "," + "SignedHeaders=" + self.strSignedHeader + "," + "Signature=" + strSignature;
def __generateHex(self, data):
return hashlib.sha256(data.encode('utf-8')).hexdigest()
def __sign(self, key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def __getSignatureKey(self, key, date, regionName, serviceName):
strSecret = "AWS4" + key
kDate = self.__sign(strSecret.encode('utf-8'), date)
kRegion = self.__sign(kDate, regionName)
kService = self.__sign(kRegion, serviceName)
kSigning = self.__sign(kService, self.aws4Request)
return kSigning
def __getTimeStamp(self):
return datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
def __getDate(self):
return datetime.datetime.utcnow().strftime('%Y%m%d')
if __name__ == '__main__':
print(getMessage())
取得の事項
上記コードをファイルに保存する際にパートナータグ等を書き換え、検索したい商品の商品コード(ASIN)を「ITEM_ID」に設定してコマンドプロンプトから実行します。
>Python (ファイル名).py
うまく動けば商品名とメーカー名が表示されると思います。
文字化けした場合は
この時、文字化けして文字が読めない可能性があります。
その場合は「chcp 65001」をコマンドプロンプト上で実行し、表示する文字をUTF-8に変更してください。
それでもダメな場合はコマンドプロンプトのプロパティからフォントを「ラスターフォント」に変更してみてください。
取得は出来たが
なんとか商品情報の取得には成功したのですが、スクラッチパッドでは取得できるのにPythonからは取得できない情報が出てきてしまったので、そちらの原因追求も行う必要があります。
また上記サンプルは特定の商品を検索するだけのサンプルなので、検索条件を指定した場合などは設定内容が異なるのでそれにも対応する必要があります。
念の為
もしご自身でソースを書こうと思ってられる方がいれば苦労した点を共有しておきたいと思います。
まず最初の難関「Signature」です。これはAmazonの公式ページを見てなんとか処理を乗り切りました。
次にヘッダー情報です。getHeaders内で最初にソート処理が行われていますが、これがないとなぜか承認エラーになって返ってきます。
なので最初は上記のSignatureの処理ばかり直していたのですが、ソートの処理を修正したところ正しく動く様になりました。
そもそもPythonの辞書はソート機能が付いていないのでどのように処理の修正してよいか分からず大変苦労しました。
というわけで
ちょっと問題は残っていますが、次はTwitterに取得した商品情報をツイート出来ればなと思います。
いやー、お疲れ様でした。
気になっているゲーミングイヤホン
この記事が気に入ったらサポートをしてみませんか?