![見出し画像](https://assets.st-note.com/production/uploads/images/118585941/rectangle_large_type_2_06674af73f5ad088acc2e1ebc3ac110d.png?width=800)
【Wikimediaデータプロセシング】Kafkaを実際に適用する- Springboot3 第14回
はじめに
こんにちは、今日は前回のKafkaの勉強に続き、Wikimediaのデータ処理をするのに実際に適用してみます。Wikimediaには大量のデータがリアルタイムで素早く流通するので、Kafkaを活用することはとても良い勉強になると思います。
![](https://assets.st-note.com/img/1697012273417-Z6DiDdeiRI.png?width=800)
リアルタイム情報が素早く流通する。
![](https://assets.st-note.com/img/1696985469192-Uv2FsuVuKs.png?width=800)
実装過程
1.プロジェクト生成
![](https://assets.st-note.com/img/1696985888371-Nv8IUgDOdm.png?width=800)
KafkaとLombokの依存性を追加します。
このプロジェクトをマルチモジュールにするために必要な変更を行おします。
![](https://assets.st-note.com/img/1696986298233-bq3ypNgLqU.png?width=800)
自分のプロジェクトでマルチモジュールを作成するときは、
必ずパッケージングを親プロジェクトのpomとして提供します。
![](https://assets.st-note.com/img/1696986438361-Cuk71IfBYr.png?width=800)
![](https://assets.st-note.com/img/1696986887089-8rs89FYpkJ.png?width=800)
NameとArchetypeを設定して、
モジュール生成します。
![](https://assets.st-note.com/img/1696987016820-6T8Ckg3qDu.png?width=800)
![](https://assets.st-note.com/img/1696988098036-neUtAf64ig.png?width=800)
このモジュールをSpring Bootプロジェクトとして作ってみよう。
@SpringBootApplication
public class SpringBootProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootProducerApplication.class);
}
}
![](https://assets.st-note.com/img/1696988263906-7A1iu9HSyg.png?width=800)
![](https://assets.st-note.com/img/1696988315419-KuBZwaUQ1P.png?width=800)
![](https://assets.st-note.com/img/1696988639790-vnVuq1ZSh4.png?width=800)
kafkaに与えます。
![](https://assets.st-note.com/img/1696988694010-1mmcjsjkaX.png?width=800)
2.Producerの設定とTopicの作成
![](https://assets.st-note.com/img/1696989714401-2tDZgSGTTY.png?width=800)
Kafkaブローカーへの接続情報を指定します。
メッセージのキーを変換するシリアライザのクラスを指定。
メッセージの値を変換するシリアライザのクラスを指定。
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
![](https://assets.st-note.com/img/1696989950947-2SpblbRjLb.png?width=800)
Kafkaブローカーに「wikimedia_recentchange」という名前の Topicが存在することを確認し、存在しない場合は新しく作成することです。
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic topic() {
return TopicBuilder.name("wikimedia_recentchange")
.build();
}
}
3. Producerの実装と実行
実装
![](https://assets.st-note.com/img/1696990984737-4dewYFnvvd.png)
このクラスはKafkaにメッセージを送信する役割をします。
@Service
public class WikimediaChangesProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
public WikimediaChangesProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage() {
String topic = "wikimedia_recentchange";
// to read real time stream data from wikimedia, we use event source
}
3つのライブラリを追加していきましょう。
https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource/3.0.0
![](https://assets.st-note.com/img/1696991219553-ejP92CRb01.png?width=800)
https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core/2.14.2
![](https://assets.st-note.com/img/1696991335014-VSMImUKWAU.png?width=800)
https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind/2.14.2
<dependencies>
<!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
<dependency>
<groupId>com.launchdarkly</groupId>
<artifactId>okhttp-eventsource</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>
Jackson Databindは、JavaオブジェクトとJSONデータとの間でのデータの結びつけます。JavaオブジェクトをJSONデータに変換するシリアライザ(Serialization)と、逆にJSONデータをJavaオブジェクトに変換するデシリアライザ(Deserialization)を提供します。
![](https://assets.st-note.com/img/1696991778216-xr2kcrlEVc.png?width=800)
![](https://assets.st-note.com/img/1696991816450-yxQdq6QUuO.png)
![](https://assets.st-note.com/img/1696991866408-lz83UqdZAz.png?width=800)
![](https://assets.st-note.com/img/1696992219296-AX8Omy3S7H.png?width=800)
![](https://assets.st-note.com/img/1696992738628-79ptlt4vPd.png?width=800)
下記のコードを完成します。
@Service
public class WikimediaChangesProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
public WikimediaChangesProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage() throws InterruptedException {
String topic = "wikimedia_recentchange";
// to read real time stream data from wikimedia, we use event source
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topic);
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
sendMessageメソッド
sendMessageメソッドは、指定されたKafkaトピックにWikimediaのリアルタイムストリームデータを送信します。
EventHandlerというインターフェースを実装したクラス WikimediaChangesHandler を使用して、Wikimediaの変更データを処理するためのイベントハンドラを作成します。
EventSourceを使用して、Wikimediaのストリームエンドポイントからのリアルタイムイベントを購読します。これにより、Wikimediaからの変更データを受信できます。
TimeUnit.MINUTES.sleep(10)により、プログラムが10分間スリープすることで、イベントソースが10分間起動し続け、データを受信できるようになります。
実行
![](https://assets.st-note.com/img/1696998529272-X4pkMnCTDY.png?width=800)
CommandLineRunner インターフェースを実装しています。
runメソッド内では、WikimediaChangesProducerを注入し、sendMessageメソッドを呼び出しています。
これにより、WikimediaChangesProducerがKafkaにメッセージを送信する処理が実行されます。
@SpringBootApplication
public class SpringBootProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringBootProducerApplication.class);
}
@Autowired
private WikimediaChangesProducer wikimediaChangesProducer;
@Override
public void run(String... args) throws Exception {
wikimediaChangesProducer.sendMessage();
}
}
![](https://assets.st-note.com/img/1697000040281-n1sF10wuT8.png?width=800)
Zookeeperの実行
![](https://assets.st-note.com/img/1697000065345-L452LC6nVp.png?width=800)
Kafkaブロッカーの実行
![](https://assets.st-note.com/img/1697001026171-TsuKNLmBYM.png?width=800)
トピック名をwikimedia_recentchangeに変更した後、
コンシューマーはイベントを読み込む
![](https://assets.st-note.com/img/1697001151618-HhE44MwFKR.png?width=800)
![](https://assets.st-note.com/img/1697001316157-mgpgOJliXK.png?width=800)
4. Consumerプロジェクト・設定・クラスの実装
![](https://assets.st-note.com/img/1697001925662-PRKIvN6g0r.png?width=800)
![](https://assets.st-note.com/img/1697001878714-AAZfLf98Gm.png?width=800)
![](https://assets.st-note.com/img/1697002390466-Z4PiMizVpv.png?width=800)
![](https://assets.st-note.com/img/1697002413471-P3xHUxvg7m.png?width=800)
![](https://assets.st-note.com/img/1697002456840-QqwxawTaCA.png?width=800)
![](https://assets.st-note.com/img/1697002909421-bfUroLlmR4.png?width=800)
spring.kafka.consumer.bootsrap-servers: localhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id: myGroup
コンシューマーグループのIDを指定します。同じグループIDを持つコンシューマーは、同じトピックの異なるパーティションを分担して処理します。この設定により、メッセージがどのコンシューマーに届くかが制御されます。
spring.kafka.consumer.auto-offset-reset: earliest
コンシューマーグループが初めてトピックに参加するときや、オフセットが失われた場合に、どのオフセットからメッセージを読み始めるかを指定します。earliestは最も古い利用可能なメッセージから読み始めることを示します。
![](https://assets.st-note.com/img/1697003346705-FbUAIQzyjU.png?width=800)
@Service
public class KafkaDatabaseConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseConsumer.class);
@KafkaListener(
topics = "wikimedia_recentchange",
groupId = "myGroup"
)
public void consume(String eventMessage){
LOGGER.info(String.format("Message received -> %s", eventMessage));
}
}
@KafkaListener アノテーション:
@KafkaListenerアノテーションは、メソッドがKafkaトピックからメッセージをリッスンすることを示します。
topics属性によって、リッスンするKafkaトピックの名前が指定されています。この例では "wikimedia_recentchange" というトピックを指定しています。
groupId属性によって、このコンシューマーが所属するコンシューマーグループのIDが指定されています。この例では "myGroup" というグループIDを使用しています。
consumeメソッド
consumeメソッドは、Kafkaトピックから受信したメッセージを処理するためのメソッドです。
![](https://assets.st-note.com/img/1697004185683-iUm9HvqcbX.png?width=800)
プロデューサーにはイベントデータに関する情報が、
コンシューマーにはMessage receivedに関するデータが
素早く出力されます。
5. MySQLの設定と格納
![](https://assets.st-note.com/img/1697004509626-hy014Coar4.png?width=800)
create database wikimediaと入力すると、
スキーマにwikimediaが生成されました。
![](https://assets.st-note.com/img/1697004741623-xLN1PdQfeu.png?width=800)
![](https://assets.st-note.com/img/1697004708760-CRrotAiiaF.png?width=800)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
![](https://assets.st-note.com/img/1697004865570-juyLJlOp7S.png?width=800)
![](https://assets.st-note.com/img/1697005255506-bBg9h920y5.png?width=800)
spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
spring.datasource..username=root
spring.datasource.password=1234
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
![](https://assets.st-note.com/img/1697005539006-A1aGUhvHen.png?width=800)
@Lobは、RDBのテーブルに、
単独で巨大なサイズになるデータを格納する構造です。
@Entity
@Table(name = "wikimedia_recentchange")
@Getter
@Setter
public class WikimediaData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Lob
private String wikiEventData;
}
エンティティを上記のように作成します。
![](https://assets.st-note.com/img/1697005697198-4bSk61RIsI.png?width=800)
public interface WikimediaDataRepository extends JpaRepository<WikimediaData, Long> {
}
![](https://assets.st-note.com/img/1697005928747-eZu0WJj5XX.png?width=800)
このスプリング・ビンにはパラメータ化されたコンストラクタが1つしかないので、アノテーションを追加する必要はありません。
@Service
public class KafkaDatabaseConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseConsumer.class);
private WikimediaDataRepository dataRepository;
public KafkaDatabaseConsumer(WikimediaDataRepository dataRepository) {
this.dataRepository = dataRepository;
}
@KafkaListener(
topics = "wikimedia_recentchange",
groupId = "myGroup"
)
public void consume(String eventMessage){
LOGGER.info(String.format("Message received -> %s", eventMessage));
WikimediaData wikimediaData = new WikimediaData();
wikimediaData.setWikiEventData(eventMessage);
dataRepository.save(wikimediaData);
}
}
spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
spring.datasource..username=root
spring.datasource.password=1234
データがDBに格納されなって、ずっと悩んでいたのですが、フィールドのタイプが変わらなかったのが原因でした。 そうなった最終的な原因が何なのか探り続けて、最終的にタイプミスを発見しました。犯人はapplication.propertiesの"spring.datasource..username"=>点が2回入っていた。 datasource.usernameに修正しました。忍耐力をテストするサプライズバグでした。
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
spring.datasource.username=root
spring.datasource.password=1234
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
今まで設定ファイルで修正すべき部分は3つあります。
1.bootstarpの誤字をbootstrapに修正する
2. ..userを.userに修正する
3.updateをcreate-dropに修正する
アプリケーションの実行時にカラムサイズが適用されない理由は、Spring Bootがspring.jpa.hibernate.ddl-auto属性をupdateに設定したためです。 この属性は、アプリケーションの起動時、Spring Bootが既存のテーブルを変更しないように指定します。カラムサイズを適用するには、spring.jpa.hibernate.ddl-auto属性をcreateまたはcreate-dropに設定する必要があります。 この属性は、アプリケーションの起動時、Spring Bootが既存のテーブルを削除して再作成するように指定します。
![](https://assets.st-note.com/img/1697010372903-kAIUCMUNsj.png?width=800)
![](https://assets.st-note.com/img/1697010483441-FzornYSjVB.png?width=800)
6. リファクタリング: TopicNameの外部化/ハードコーディングの削除
![](https://assets.st-note.com/img/1697010856387-36PV7mjS4f.png?width=800)
@Value アノテーションで活用します。
![](https://assets.st-note.com/img/1697010979504-XOC4XSO0wI.png?width=800)
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.topic.name}")
private String topicName;
@Bean
public NewTopic topic() {
return TopicBuilder.name(topicName)
.build();
}
}
![](https://assets.st-note.com/img/1697011110758-Y2kJZKWKEv.png?width=800)
@Service
public class WikimediaChangesProducer {
@Value("${spring.kafka.topic.name}")
private String topicName;
private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
public WikimediaChangesProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage() throws InterruptedException {
String topic = topicName;
// to read real time stream data from wikimedia, we use event source
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topic);
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
![](https://assets.st-note.com/img/1697011359435-hbIWAeegrC.png?width=800)
spring.kafka.consumer.group-idもつかわれます。
![](https://assets.st-note.com/img/1697011326518-BxZV4V1LcA.png?width=800)
修正しました!
![](https://assets.st-note.com/img/1697011796607-or0GQ6jb3D.png?width=800)
最後に
今日はKafkaを使ってproducerサービスとconsumerサービス、二つのマイクロサービスでWikimediaストリームデータをmysqlデータベースに格納するプロジェクトをやってみました。
![](https://assets.st-note.com/img/1697006834222-LuHc7etr8f.png?width=800)
実際に流通されるデータを使ってみると、実感が湧きますね。 このようにKafkaは大量のデータを処理するのに特化したプラットフォームであることがはっきり分かりました!次回はEvent Drivenという観点でKafkaを勉強してみたいと思います。
エンジニアファーストの会社 株式会社CRE-CO
ソンさん
【参考】
[Udemy] Building Microservices with Spring Boot & Spring Cloud
この記事が気に入ったらサポートをしてみませんか?