Data Pipelineで各種ログをRedshiftにロードする

AWSのログで遊ぼうシリーズ第4弾 – Data Pipeline x Redshift。
Redshiftは第1弾~第3弾で説明した通り、COPYコマンドでデータをロードすることができるので、単純にロードだけを考えるならRedshiftを単独で使うだけで十分です。
でも、Data Pipelineと連携して使うと、スケジューリングやエラー処理、通知など高度なデータ連携が可能です。
今回はData Pipelineを使って、ELB、S3、CloudFrontのログをRedshiftにロードします。

datapipeline_redshift


前提知識

Data Pipelineの利用イメージ

Data Pipelineは簡単に説明するとデータ連携のためのサービスです。

GUIで表現されたマップ上でアイコンを配置していき、簡単にデータ連携を定義できます。
GUIで定義したPipelineは内部的にJSONで表現され、1クリックでJSONをExportできます。

edit_pipeline

ただ、個人的にはGUIではなく生のJSONを直接扱うほうが好きなので、今回はJSONベースで説明していきます。

Data PipelineにおけるRedshift

Data Pipelineは2013年11月にRedshiftに対応し、直近では2014年10月にさらなる機能改善が実施されました。
Data PipelineでRedshiftを扱おうとすると下記の3種類のオブジェクトが必要になります。

  • RedshiftDataNode
  • RedshiftDatabase
  • RedshiftCopyActivity
RedshiftDataNode

RedshiftDataNodeではRedshift Cluster上のテーブルを設定します。

RedshiftDatabase

各種ログをロードする対象のRedshift Clusterを定義します。

RedshiftCopyActivity

RedshiftCopyActivityでは、CSV/TSVのファイルをRedshiftにCOPYします。
ちなみに、今回ターゲットにしているログのうち、ELBログとS3ログが空白区切りであるため、そのままでは使用できません。
ただし、2014年10月から新たに導入されたcommandOptionsを使えばRedshiftのCOPY文で使用する各種オプションを利用できるので、delimiterを指定したCOPYが可能になります。

事前準備

データロード先のRedshift Clusterをあらかじめ作成します。
テーブルはPipeline処理の内部で作成するため、事前に作成する必要はありません。

Pipelineの定義

ELBログ用のS3DataNode

S3DataNode
    {
      "id": "S3DataNodeElbLog",
      "name": "S3DataNodeElbLog",
      "type": "S3DataNode",
      "directoryPath": "s3://path_to_elb_log/",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },

S3ログ用のS3DataNode

S3DataNode
    {
      "id": "S3DataNodeS3Log",
      "name": "S3DataNodeS3Log",
      "type": "S3DataNode",
      "directoryPath": "s3://path_to_s3_log/",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },

CloudFrontログ用のS3DataNode

S3DataNode
    {
      "id": "S3DataNodeCloudFrontLog",
      "name": "S3DataNodeCloudFrontLog",
      "type": "S3DataNode",
      "directoryPath": "s3://path_to_cloudfront_log/",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },

ポイントはdataFormatを指定していない点です。
CloudFrontのログはTSV形式で出力されるため、本来S3DataNodeではビルトインのdataFormatであるTSVのオブジェクトを指定すればよいはずです。
しかし、後述のRedshiftCopyActivityでcommandOptionsを使用したいため、あえてdataFormatの指定を外しています。

共用のRedshiftDatabase

各種ログをロードする対象のRedshift Clusterを定義します。
今回は3種類いずれも同一のデータベースにロードするため、共用のRedshiftDatabaseを下記のように定義します。

RedshiftDatabase
    {
      "id": "RedshiftDatabaseId1",
      "name": "DefaultRedshiftDatabase1",
      "type": "RedshiftDatabase",
      "clusterId": "hoge",
      "databaseName": "dev",
      "username": "hogehoge",
      "*password": "fugafuga"
    },

clusterId, databaseName, username, passwordを指定していることからも、Redshift ClusterがData PipelineのオブジェクトとしてのRedshiftDatabaseに対応することがわかるかと思います。

ELBログ用のRedshiftDataNodeとRedshiftCopyActivity

ELBログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
RedshiftDataNodeではRedshift Cluster上のテーブルを設定し、RedshiftCopyActivityでは実際にロードする際に使用するCOPYコマンドを設定するイメージです。

RedshiftDataNode
    {
      "id": "RedshiftDataNodeElbLog",
      "name": "RedshiftDataNodeElbLog",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      },
      "tableName": "elb_access_logs",
      "createTableSql": "CREATE TABLE elb_access_logs ( request_timestamp DateTime encode lzo, elb varchar(30) encode lzo, client_port varchar(22) encode lzo, backend_port varchar(22) encode lzo, request_processing_time FLOAT encode bytedict, backend_processing_time FLOAT encode bytedict, response_prosessing_time FLOAT encode bytedict, elb_status_code varchar(3) encode lzo, backend_status_code varchar(3) encode lzo, received_bytes BIGINT encode lzo, sent_bytes BIGINT encode lzo, request varchar(MAX) encode lzo)sortkey(request_timestamp) ;",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },
RedshiftCopyActivity
    {
      "id": "RedshiftCopyActivityElbLog",
      "name": "RedshiftCopyActivityElbLog",
      "type": "RedshiftCopyActivity",
      "input": {
        "ref": "S3DataNodeElbLog"
      },
      "commandOptions": [
        "DELIMITER ' '",
        "TRUNCATECOLUMNS",
        "TRIMBLANKS",
        "REMOVEQUOTES",
        "TIMEFORMAT as 'auto'",
        "ACCEPTINVCHARS",
        "MAXERROR as 100000"
      ],
      "insertMode": "TRUNCATE",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "output": {
        "ref": "RedshiftDataNodeElbLog"
      }
    },

ポイントはcommandOptionsでdelimiterを明示的に指定している点です。

S3ログ用のRedshiftDataNodeとRedshiftCopyActivity

S3ログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
大体、ELBログの定義と同じようなイメージになります。

RedshiftDataNode
    {
      "id": "RedshiftDataNodeS3Log",
      "name": "RedshiftDataNodeS3Log",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      },
      "tableName": "s3_access_logs",
      "createTableSql": "CREATE TABLE s3_access_logs ( bucket_owner VARCHAR(MAX) ENCODE RUNLENGTH, bucket VARCHAR(255) ENCODE LZO, request_timestamp VARCHAR(MAX) SORTKEY ENCODE LZO, request_timestamp_delta VARCHAR(MAX) ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, requestor VARCHAR(MAX) ENCODE LZO, request_id VARCHAR(MAX) ENCODE LZO, operation VARCHAR(MAX) ENCODE LZO, key VARCHAR(MAX) ENCODE LZO, request_uri VARCHAR(MAX) DISTKEY ENCODE LZO, http_status_code VARCHAR(MAX) ENCODE LZO, error_code VARCHAR(MAX) ENCODE LZO, sent_bytes VARCHAR(MAX) ENCODE LZO, object_size VARCHAR(MAX) ENCODE LZO, total_time VARCHAR(MAX) ENCODE LZO, turn_around_time VARCHAR(MAX) ENCODE LZO, referer VARCHAR(MAX) ENCODE LZO, user_agent VARCHAR(MAX) ENCODE LZO, version_id VARCHAR(10) ENCODE LZO);",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },
RedshiftCopyActivity
    {
      "id": "RedshiftCopyActivityS3Log",
      "name": "RedshiftCopyActivityS3Log",
      "type": "RedshiftCopyActivity",
      "input": {
        "ref": "S3DataNodeS3Log"
      },
      "commandOptions": [
        "DELIMITER ' '",
        "TRUNCATECOLUMNS",
        "TRIMBLANKS",
        "REMOVEQUOTES",
        "ACCEPTINVCHARS",
        "MAXERROR as 100000"
      ],
      "insertMode": "TRUNCATE",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "output": {
        "ref": "RedshiftDataNodeS3Log"
      }
    },

ポイントはcommandOptionsでdelimiterを明示的に指定している点です。

CloudFrontログ用のRedshiftDataNodeとRedshiftCopyActivity

CloudFrontログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
そろそろ慣れてきますね。

RedshiftDataNode
    {
      "id": "RedshiftDataNodeCloudFrontLog",
      "name": "RedshiftDataNodeCloudFrontLog",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      },
      "tableName": "cloudfront_access_logs",
      "createTableSql": "CREATE TABLE cloudfront_access_logs ( request_date VARCHAR(MAX) SORTKEY, request_time VARCHAR(MAX) ENCODE LZO, x_edge_location VARCHAR(40) ENCODE LZO, sc_bytes INT ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, cs_method VARCHAR(50) ENCODE LZO, cs_host VARCHAR(MAX) ENCODE LZO, cs_uri_stem VARCHAR(MAX) DISTKEY ENCODE LZO, sc_status VARCHAR(20) ENCODE LZO, cs_referrer VARCHAR(MAX) ENCODE LZO, cs_useragent VARCHAR(MAX) ENCODE LZO, cs_uri_query VARCHAR(MAX) ENCODE LZO, cs_cookie VARCHAR(MAX) ENCODE LZO, x_edge_result_type VARCHAR(MAX) ENCODE LZO, x_edge_request_id VARCHAR(MAX) ENCODE LZO, x_host_header VARCHAR(MAX) ENCODE LZO, cs_protocol VARCHAR(10) ENCODE LZO, cs_bytes INT ENCODE LZO, time_taken VARCHAR(MAX) ENCODE LZO);",
      "schedule": {
        "ref": "ScheduleId1"
      }
    },
RedshiftCopyActivity
    {
      "id": "RedshiftCopyActivityCloudFrontLog",
      "name": "RedshiftCopyActivityCloudFrontLog",
      "type": "RedshiftCopyActivity",
      "input": {
        "ref": "S3DataNodeCloudFrontLog"
      },
      "commandOptions": [
        "DELIMITER '\t'",
        "IGNOREHEADER 2",
        "TRUNCATECOLUMNS",
        "TRIMBLANKS",
        "ACCEPTINVCHARS",
        "MAXERROR as 100000",
        "gzip"
      ],
      "insertMode": "TRUNCATE",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "output": {
        "ref": "RedshiftDataNodeCloudFrontLog"
      }
    },

ポイントはELBやS3の場合と同じように、commandOptionsを指定している点です。
CloudFrontのログはTSV形式で出力されるため、inputに指定するS3DataNodeにビルトインのdataFormatであるTSVのオブジェクトが指定されていれば、commandOptionsは不要です。
逆に、inputのS3DataNodeに何らかのdataFormatが指定されていると、commandOptionsが無効となります。
今回は、CloudFrontのログをパースするためにcommandOptions(特にIGNOREHEADERとgzip)を使用したいため、あえてこのようにしています。

その他の共用のオブジェクト

Default
    {
      "id": "Default",
      "scheduleType": "TIMESERIES",
      "failureAndRerunMode": "CASCADE",
      "name": "Default",
      "pipelineLogUri": "s3://path_to_log",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },

Pipeline定義には必須です。

Schedule
    {
      "id": "ScheduleId1",
      "name": "DefaultSchedule1",
      "startAt": "FIRST_ACTIVATION_DATE_TIME",
      "type": "Schedule",
      "period": "15 Minutes"
    },

Pipeline定義には必須です。

Ec2Resource
    {
      "id": "Ec2ResourceId1",
      "name": "DefaultEc2Resource1",
      "type": "Ec2Resource",
      "terminateAfter": "1 HOURS",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "securityGroups": "default",
      "logUri": "s3://path_to_log",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },

RedshiftCopyActivityを使う上で、Activityを実行するResourceとして必要になります。
ハマりがちなポイントとして、Ec2ResourceのsecurityGroupsに指定するのはSecurity Group IDではなくSecurity Group Nameなのでご注意を。

以上、長々と説明してきましたが、最終的に完成したPipeline定義をGistに用意しました。

動作確認

Pipelineの実行

$ aws datapipeline create-pipeline --name "elb2redshift" --unique-id "elb2redshift"
{
    "pipelineId": "df-00901072TQD44FKNKAR3"
}
$ aws datapipeline put-pipeline-definition --pipeline-id df-00901072TQD44FKNKAR3 --pipeline-definition file://./elblog2redshift.json
$ aws datapipeline activate-pipeline --pipeline-id df-00901072TQD44FKNKAR3

あとは実際にPipelineが実行されるのを15分ほど待ちます。

結果の確認

Data PipelineコンソールでPipelineの実行状況を確認します。
こんな感じで、StatusがFINISHEDになっていればPipelineの実行は正常に完了しています。
view_execution_details

あとはインポート先のRedshiftクラスタにアクセスして煮るなり焼くなり好きにクエリすればいいかと。

参考

  1. トラックバックはまだありません。

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト / 変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト / 変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト / 変更 )

Google+ フォト

Google+ アカウントを使ってコメントしています。 ログアウト / 変更 )

%s と連携中

%d人のブロガーが「いいね」をつけました。