見出し画像

AWS Glue からDynamoDBに直接書き込む

はじめに

こんにちは。
株式会社POLのエンジニアの渡辺です。

7/17に、AWS Glue の PySpark ジョブから DynamoDB に直接書き込みできるようになったので、それについて、書かせていただきます。

当初は英語のドキュメントしかなかったけど、今は日本語のドキュメントもあるので、そこ読めば誰でもできるとおもうのですが、備忘録的な感じです笑

ノートブックの準備

今回は、SageMakerノートブック上で試すので、開発エンドポイントとそのエンドポイント上にアタッチするSageMaker ノートブックを準備します。

スクリーンショット 2020-09-13 17.48.49

スクリーンショット 2020-09-13 17.49.52

ノートブックを立ち上げて、PySparkのファイルを作成し、起動します。

スクリーンショット 2020-09-13 18.02.58

ソースコード

実際のソースコードに入っていきます。今回はRDSからデータを取得し、DataFrameに変換。その後、そのデータをDynamoDBにインサートします。実際の運用だとつかうこと少ないかもですが、データ加工して、DynamoDBに保存とかが簡単にできるようになります。

まず、RDSやDynamoにつなぐのに必要なファイルのインポートと初期化を行う。connection_nameの部分はGlueで接続を作成することができ、それを利用しています。

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.functions import *
from pyspark.sql.types import StringType

import datetime

# initialization
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
src_jdbc = glueContext.extract_jdbc_conf(connection_name='connection-name')

次に、RDSからデータを取得し、DataFrameにして返す関数をつくります。urlの部分のdatabase_nameの部分はご自身のデータベースの名前に置き換えて、お使いください。

def fetch_rds_table_dataframe(tablename):
   
   ## RDS から DynmicFrame の形式で取得し、DataFrame に変換
   df = glueContext.create_dynamic_frame_from_options(
       connection_type=src_jdbc['vendor'],
       connection_options={
           "url": src_jdbc['url'] + "/database_name",
           "user": src_jdbc['user'],
           "password": src_jdbc['password'],
           "dbtable": tablename
       }
   ).toDF()

   return df

続いて、Dynamoにinsertする関数をつくります。

def insert_dynamodb_table_dataframe(tablename, df):

   ## DataFrame から DynamicFrame に変換
   dyf = DynamicFrame.fromDF(df, glueContext, "nested")

   glueContext.write_dynamic_frame_from_options(
   frame=dyf,
   connection_type="dynamodb",
   connection_options={
       "dynamodb.output.tableName": tablename,
       "dynamodb.throughput.write.percent": "1.0"
   }
)

これらをノートブックで実行します。

スクリーンショット 2020-09-18 17.13.08

最後に、定義した関数をつなげて、実行します。dynamo_db_table_nameの部分はご自身のDynamoDBのテーブル名に置き換えてください。

## ITEM_DATAの取得
df = fetch_rds_table_dataframe("item_data").alias("item_data")
).select(
   col("item_data.id").cast("string").alias("ItemId"),
).limit(10)

## 取得したデータのインサート
insert_dynamodb_table_dataframe("dynamo_db_table_name", df)

DynamoDBの方をマネジメントコンソールから、データが入っていることが確認できました。

スクリーンショット 2020-09-13 18.25.08

おわりに

特にハマりポイントとかもないと思うのですが、なにか疑問があったり、POLに興味がある方はお気軽にお声がけください!

POLではエンジニアを募集しているので、興味がある方ぜひ!!


この記事が気に入ったらサポートをしてみませんか?