VerticaにおけるApache Kafka連携とは?

Verticaへのデータロードは、COPYコマンドを用いたCSVファイルのバルクロードで実施する方法が一般的です。INSERT文も使用できますが、Verticaのような列指向データベースは1件1件のINSERT処理よりも、ある程度まとまった件数をロードするバルクロードに向いています。
そのため、これまでのバージョンでは、IoT端末が生成する大量のログデータやTwitterのつぶやきデータ等の逐次発生するデータをリアルタイムで取り込むことは難しいというのが現実でした。
(ETLツール等を用いて、少量のデータを逐次COPYコマンドでロードするような仕組みを作れば実現できますが、そういった仕組みはデータ同期等のことを考えると複雑になりがちです)

Vertica 7.2では、オープンソース分散型メッセージングシステムの「Apache Kafka」(以下Kafka)のネイティブサポートを提供することで、これらのデータをニア・リアルタイムで取り込むことが可能になりました。

Kafka連携機能を利用してTwitterのつぶやきデータを取り込む場合の構成例

Kafka連携アーキテクチャ

例えばTwitterのつぶやきデータを分析したい場合は、図のような構成で実現することができます。TwitterのつぶやきデータをApache Flumeやfluentdなどを使って収集し、Kafkaに送信します。Kafkaには送られてきたデータがキューとしてKafka Cluster内に次々と溜まっていきます。このキューに溜まっているデータの中から、分析に使用するデータをVerticaが一定間隔(図の場合は10秒間隔)で受け取りに行き、内部的にCOPYコマンドを使ってVerticaにロードします。
このように、INSERTではなくVerticaが得意とするCOPYコマンドを使用したバルクロードとして処理することで、ログデータ等の逐次発生するようなデータをVerticaにリアルタイムに取り込むことが可能です。ロードしたデータについては通常のテーブルに格納されるので、すぐに分析を行うことができます。

設定手順

以下にKafka連携機能を利用するための具体的な手順を記載します。

0.事前準備

事前に任意のサーバにKafkaをインストールし、起動しておいてください。

1.Kafka スケジューラの定義

vkconfig schedulerコマンドを使用して、接続先のKafkaサーバの情報や、Kafkaにデータを取得しに行く間隔等を指定します。
設定例)

各パラメータの詳細は以下をご確認ください。

列名内容
add新しくスケジューラを定義する場合に使用します
config-schemaKafka連携でスケジューラごとにVerticaが使用するスキーマを指定します。スキーマは自動で作成されるため事前に作成しておく必要はありません。
usernameVerticaに接続する際に使用するユーザ名を指定します
passwordusernameに指定したユーザのログインパスワードを指定します
dbhost接続するVerticaのホスト名(IPアドレス)を指定します
dbport接続するVerticaのポート番号を指定します
frame-durationVerticaがKafkaにデータを取得しにいく間隔(例の場合は10秒)を指定します
brokers接続するKafka brokerのホスト名、ポートを指定します

2.データロード用のターゲットテーブルの作成

Kafkaから取得したデータをロードするためのテーブルを作成します。
通常のCREATE TABLE文を使用してテーブルを作成することができます。
例)

なお、ロードされるデータの内容が不明(半構造)な場合は、通常のテーブルではなく、Flex tableとして作成してください。
例)

3.Kafka Topicとターゲットテーブルの関連付け

KafkaではデータをTopicという単位で分類して格納しています。vkconfig topicコマンドを使用して取り込みたいKafkaのTopicとロード先のVerticaのテーブルとの関連付けを行います。
設定例)

各パラメータの詳細は以下をご確認ください。

列名内容
add新しくtopicとターゲットテーブルを関連付ける場合に指定します
config-schema「1.Kafka スケジューラの定義」のconfig-schemaで指定したスキーマと同名のスキーマを指定します
usernameVerticaに接続する際に使用するユーザ名を指定します
passwordusernameに指定したユーザのログインパスワードを指定します
dbhost接続するVerticaのホスト名(IPアドレス)を指定します
dbport接続するVerticaのポート番号を指定します
targetVerticaのターゲットテーブル(データのロード先)を指定します
topicデータを取得するKafkaのTopicを指定します
rejection-tableロードに失敗したデータの情報を格納するテーブルを指定します

4.Kafkaスケジューラの実行

設定した内容でKafka連携を開始するためにvkconfig launchコマンドを実行します。
本コマンドを実行すると、Kafka連携がスタートします。
設定例)

各パラメータの詳細は以下をご確認ください。

列名内容
config-schema「1.Kafka スケジューラの定義」のconfig-schemaで指定したスキーマと同名のスキーマを指定します
usernameVerticaに接続する際に使用するユーザ名を指定します
passwordusernameに指定したユーザのログインパスワードを指定します
dbhost接続するVerticaのホスト名(IPアドレス)を指定します
dbport接続するVerticaのポート番号を指定します

なお、定義の中にはパスワードが含まれているため、実際に運用する際は設定内容をプロパティファイルに記述し、ファイルを指定して実行する方法が推奨されます。
例えば、vkconfig launchの場合は以下のように記述します。

configFile.propertiesファイルの記述内容


また、今回使用していないその他のオプションについては、マニュアルの以下のページをご参考ください。
Kafka Utility Options

動作確認

今回はKafkaに付属している「kafka-console-producer.sh」を使用してKafkaにデータを送信した場合の動作を確認します。

1.Kafkaへのデータ送信

「kafka-console-producer.sh」から以下のように「アシスト」「Vertica」「データベース」と、3つのメッセージを入力します。

2.Verticaのテーブルを検索

上記「2.データロード用のターゲットテーブルの作成」で作成したテーブルを検索すると、Kafkaに送信したデータがVerticaにロードされていることが確認できます。


なお、Kafka連携を終了する場合は以下のコマンドを実行します。


検証バージョンについて

この記事の内容はVertica 7.2、Kafka 0.9で確認しています。