メインコンテンツまでスキップ
最新1mo ago

Athena DynamoDB コネクタ

Body:

DynamoDB テーブルを Redash/Athena に接続し、自動スキーマ検出を実現 — 1 つのコマンドですべてをデプロイします。


機能

  • 自動スキーマ推論 — DynamoDB テーブルをスキャンし、すべてのカラムと型を自動的に検出します

  • ワンコマンドデプロイ — 単一の CLI コマンドで Lambda コネクタ、Athena カタログ、Glue テーブルを作成します

  • SQL でのクエリ — Redash または Athena コンソールで標準 SQL を使用して DynamoDB テーブルをすぐにクエリできます

  • 複数テーブルのサポート — 同じ Glue データベースに無制限のテーブルを追加します

  • クリーンアーキテクチャ — 各テーブルは独自の Lambda コネクタを持ち、適切な IAM 分離が確保されます


クイックスタート

cd tool/redash-dynamodb-connector

# DynamoDB テーブルを追加
pnpm exec tsx src/index.ts add \
--table pascal-alert-history \
--glue-database unit-dev-analytics-database

ツールが実行すること:

  1. ✅ テーブルの存在を確認

  2. ✅ スキーマをスキャンして推論 (すべてのカラムと型を自動的に検出)

  3. ✅ Lambda コネクタ関数を作成

  4. ✅ Athena データカタログを作成

  5. ✅ 検出されたスキーマで Glue テーブルを作成

  6. ✅ CDK 経由でインフラをデプロイ テーブルがクエリ可能になります!


前提条件

  • AWS 認証情報が設定されている (AWS_PROFILE またはデフォルト認証情報経由)

  • Node.js と pnpm がインストール済み

  • CDK CLI が利用可能 (npx cdk が自動的に使用されます)


使用方法

基本コマンド

pnpm exec tsx src/index.ts add --table <table-name> --glue-database <database-name>

コマンドオプション

オプション説明必須デフォルト
--tableDynamoDB テーブル名はい-
--glue-database既存の Glue データベース名いいえ新しいデータベースを作成
--catalog-nameカスタム Athena カタログ名いいえサニタイズされたテーブル名
--column-mapping混合ケースのカラムをマップいいえ-
--product製品名 (unit/spring/plants)いいえunit または $PRODUCT_NAME
--env環境 (dev/stg/prod)いいえdev または $ENV_NAME
--regionAWS リージョンいいえap-northeast-1

自動スキーマ検出でテーブルを追加

pnpm exec tsx src/index.ts add \
--table pascal-alert-history \
--glue-database unit-dev-analytics-database

出力:

✓ Table found
Inferring table schema...
✓ Inferred 12 columns from 10 sample items
- alert_id: string
- code: string
- created_at: string
- level: int
- status: string
...
Deploying connector...
✓ Deployment complete!

複数のテーブルを同じデータベースに追加


# テーブル 1
pnpm exec tsx src/index.ts add \
--table pascal-alert-history \
--glue-database unit-dev-analytics-database

# テーブル 2
pnpm exec tsx src/index.ts add \
--table device-metrics \
--glue-database unit-dev-analytics-database

# テーブル 3
pnpm exec tsx src/index.ts add \
--table user-events \
--glue-database unit-dev-analytics-database

すべてのテーブルは同じ Glue データベースを共有しますが、個別の Lambda コネクタを持ちます。

カラムマッピング付き (混合ケースのカラムの場合)

pnpm exec tsx src/index.ts add \
--table MyDynamoDBTable \
--column-mapping "deviceid=DeviceId,timestamp=TimeStamp" \
--glue-database unit-dev-analytics-database


作成されるもの

テーブルごと

Lambda 関数: {product}-{env}-dynamodb-{table_name}

  • 例: unit-dev-dynamodb-pascal_alert_history

  • Athena からの連合クエリを DynamoDB に処理します

  • 正しい IAM 権限で自動的にプロビジョニングされます

Athena データカタログ: Lambda 関数と同じ名前

  • Lambda コネクタを指します

  • Athena 経由での SQL クエリを有効にします

Glue テーブル: 指定されたデータベース内の {table_name}

  • 自動推論されたカラムスキーマを含みます

  • Athena がテーブル構造を理解するためのメタデータ

共有リソース (スタックごとに 1 回作成)

S3 Spill バケット: {product}-{env}-dynamodb-spill

  • クエリ結果がメモリを超えたときに Lambda で使用

  • 1 日後に自動クリーンアップ

S3 結果バケット: {product}-{env}-athena-results

  • Athena クエリ結果を保存

  • 30 日後に自動クリーンアップ

Athena ワークグループ: {product}-{env}-analytics

  • クエリ実行用に設定

  • CloudWatch メトリクス有効


命名規則

ツールは一貫した命名パターンに従います:

リソースパターン
Lambda 関数{product}-{env}-dynamodb-{table}unit-dev-dynamodb-pascal_alert_history
Athena カタログLambda と同じunit-dev-dynamodb-pascal_alert_history
Glue テーブル{table} (サニタイズ済み)pascal_alert_history
Spill バケット{product}-{env}-dynamodb-spillunit-dev-dynamodb-spill
結果バケット{product}-{env}-athena-resultsunit-dev-athena-results
ワークグループ{product}-{env}-analyticsunit-dev-analytics

注: テーブル名は AWS の命名要件を満たすためにロワーケースに変換され、ハイフンはアンダースコアに置き換えられます。


Redash でのクエリ

1. Redash データソースを作成

Redash で Athena データソースを設定:

設定
名前dynamodb-connector-dev
タイプAmazon Athena
AWS リージョンap-northeast-1
AWS アクセスキーIAM ユーザーのアクセスキー
AWS シークレットキーIAM ユーザーのシークレットキー
S3 ステージングパスs3://unit-dev-athena-results/
スキーマ名unit-dev-analytics-database
Athena ワークグループunit-dev-analytics

追加設定 (セクションを展開):

  • 「Glue データカタログを使用」 をチェック

  • Glue データカタログ ID: unit-dev-dynamodb-pascal_alert_history,unit-dev-dynamodb-device_metrics (すべてのカタログ名をカンマ区切りで指定)

2. データをクエリ

3 部構成の命名形式を使用:

SELECT *
FROM "unit-dev-dynamodb-pascal_alert_history"."unit-dev-analytics-database"."pascal_alert_history"
LIMIT 10;

形式: "catalog_name"."database_name"."table_name"

クエリ例

ステータスでフィルタリング:

SELECT alert_id, code, status, created_at, level
FROM "unit-dev-dynamodb-pascal_alert_history"."unit-dev-analytics-database"."pascal_alert_history"
WHERE status = 'active'
ORDER BY created_at DESC
LIMIT 100;

データを集計:

SELECT
code,
COUNT(*) as alert_count,
AVG(duration_seconds) as avg_duration
FROM "unit-dev-dynamodb-pascal_alert_history"."unit-dev-analytics-database"."pascal_alert_history"
WHERE level > 1
GROUP BY code
ORDER BY alert_count DESC;

他のテーブルと結合:

SELECT
a.alert_id,
a.code,
d.device_name,
a.created_at
FROM "unit-dev-dynamodb-pascal_alert_history"."unit-dev-analytics-database"."pascal_alert_history" a
JOIN "unit-dev-dynamodb-devices"."unit-dev-analytics-database"."devices" d
ON a.device_id = d.device_id
WHERE a.status = 'active'
LIMIT 50;

3. 新しいテーブルを追加

新しいテーブルを追加するとき:

ステップ 1: ツールでデプロイ:

pnpm exec tsx src/index.ts add --table new-table --glue-database unit-dev-analytics-database

ステップ 2: Redash データソースを更新:

  • データソース設定に移動

  • Glue データカタログ ID フィールドに新しいカタログ ID を追加:

unit-dev-dynamodb-pascal_alert_history,unit-dev-dynamodb-device_metrics,unit-dev-dynamodb-new_table


**ステップ 3:** すぐにクエリ:

```sql
SELECT * FROM "unit-dev-dynamodb-new_table"."unit-dev-analytics-database"."new_table" LIMIT 10;


必要な IAM 権限

デプロイ用 (開発者/管理者)

デプロイを実行する AWS 認証情報には以下が必要です:

  • スタックに対する cloudformation:*

  • コネクタ作成用の lambda:*

  • カタログとワークグループ作成用の athena:*

  • データベースとテーブル作成用の glue:*

  • バケット作成用の s3:*

  • Lambda 実行ロール用の iam:CreateRoleiam:AttachRolePolicy

  • SAR からのコネクタデプロイ用の serverlessrepo:CreateCloudFormationTemplate

クエリ用 (Redash IAM ユーザー)

以下の権限を持つ カスタマー管理 IAM ポリシー を作成:

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:AbortMultipartUpload"
],
"Resource": [
"arn:aws:s3:::unit-dev-dynamodb-spill/*",
"arn:aws:s3:::unit-dev-athena-results",
"arn:aws:s3:::unit-dev-athena-results/*"
]
},
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults",
"athena:StopQueryExecution",
"athena:GetWorkGroup",
"athena:GetDataCatalog",
"athena:GetDatabase",
"athena:GetTableMetadata",
"athena:ListDataCatalogs",
"athena:ListDatabases",
"athena:ListTableMetadata"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetTable",
"glue:GetTables",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"arn:aws:glue:ap-northeast-1:404232320784:catalog",
"arn:aws:glue:ap-northeast-1:404232320784:database/unit-dev-analytics-database",
"arn:aws:glue:ap-northeast-1:404232320784:table/unit-dev-analytics-database/*"
]
},
{
"Effect": "Allow",
"Action": ["lambda:InvokeFunction"],
"Resource": [
"arn:aws:lambda:ap-northeast-1:404232320784:function:unit-dev-dynamodb-*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:List*",
"dynamodb:DescribeStream",
"dynamodb:DescribeTable",
"dynamodb:Get*",
"dynamodb:Query",
"dynamodb:Scan"
],
"Resource": "arn:aws:dynamodb:ap-northeast-1:404232320784:table/*"
}
]
}

注: ワイルドカードパターン (unit-dev-dynamodb-*table/*) により、毎回 IAM 権限を変更することなく、現在および将来のすべてのテーブルをクエリできます。

IAM 権限を追加するステップ

  1. IAM コンソールポリシー に移動

  2. ポリシーを作成JSON タブをクリック

  3. 上記のポリシーを貼り付け (必要に応じてリージョン/アカウント ID を調整)

  4. 名前を付ける: RedashDynamoDBConnectorAccess

  5. ポリシーを作成

  6. Redash IAM ユーザー → 権限ポリシーを接続 に移動

  7. RedashDynamoDBConnectorAccess を検索して接続


自動スキーマ推論

ツールは以下を行うことで DynamoDB テーブルスキーマを自動的に検出します:

  1. サンプル項目をスキャン — テーブルから 10 項目を読み取ります (設定可能)

  2. 型を検出 — JavaScript 型を Athena/Glue 型にマップ:

    • stringstring
    • number (整数) → int
    • number (小数) → double
    • booleanboolean
    • arrayarray\\<type>
    • objectstring (JSON として保存)
  3. スキーマを作成 — 検出されたカラムで Glue テーブルを生成

  4. 混合型を処理 — カラムが複数の型を持つ場合、デフォルトは string

出力例:

Inferring table schema...
Scanning 10 items to infer schema...
✓ Inferred 12 columns from 10 sample items
- alert_data: string
- alert_id: string
- clear_reason: string
- cleared_at: string
- code: string
- created_at: string
- created_month: string
- device_id: string
- duration_seconds: int
- level: int
- severity: string
- status: string

制限事項

  • 空のテーブル — テーブルに項目がない場合、スキーマ推論は警告しますがデプロイは空のスキーマで続行

  • ネストされたオブジェクト — 複雑なネストされた構造は JSON 文字列として表現

  • 型の一貫性 — スキーマはサンプル項目に基づきます; データが矛盾した型を持つ場合、一部のクエリが失敗する可能性があります


アーキテクチャ

┌─────────────────┐
│ Redash Query │
└────────┬────────┘
│ SQL Query

┌─────────────────────────────┐
│ Amazon Athena │
│ (unit-dev-analytics) │
└────────┬────────────────────┘


┌─────────────────────────────┐
│ Athena Data Catalog │
│ (Lambda Connector) │
└────────┬────────────────────┘
│ Invoke

┌─────────────────────────────┐
│ Lambda Function │
│ (AthenaDynamoDBConnector) │
└────────┬────────────────────┘
│ Read

┌─────────────────────────────┐
│ DynamoDB Table │
│ (Your Data) │
└─────────────────────────────┘


┌─────────────────────────────┐
│ S3 Spill Bucket │
│ (Large Result Sets) │
└─────────────────────────────┘


┌─────────────────────────────┐
│ S3 Results Bucket │
│ (Query Results) │
└─────────────────────────────┘

データフロー:

  1. Redash が SQL クエリを Athena に送信

  2. Athena が Glue カタログでテーブルメタデータを検索

  3. Athena が特定のカタログの Lambda コネクタを呼び出す

  4. Lambda が DynamoDB からデータを読み取る

  5. Lambda が結果を返す (データが大きい場合は S3 spill バケットを使用)

  6. Athena が最終結果を S3 結果バケットに書き込む

  7. Redash が結果を取得して表示


トラブルシューティング

接続テスト失敗: S3 へのアクセスが拒否される

問題: IAM ユーザーが結果バケットに書き込めません

解決策: IAM ポリシーに S3 権限を追加 (IAM 権限セクション参照)

クエリを実行するための権限が不足している

問題: Athena/Lambda/Glue/DynamoDB 権限が不足

解決策: 完全な RedashDynamoDBConnectorAccess ポリシーを IAM ユーザーに接続

SCHEMA_NOT_FOUND エラー

問題: カタログ名を指定せずにクエリを実行

解決策: 完全修飾テーブル名を使用: "catalog"."database"."table"

COLUMN_NOT_FOUND エラー

問題: Glue テーブルにカラム定義がない (自動スキーマ推論では発生しないはず)

解決策: テーブルを再デプロイ — ツールがスキーマを再推論します

Redash スキーマブラウザにテーブルが表示されない

問題: カタログ ID が Redash データソースに追加されていない

解決策: データソース設定の「Glue データカタログ ID」フィールドにカタログ ID を追加

クエリが遅い/タイムアウト

問題: 大規模な DynamoDB テーブルで全スキャン

解決策:

  • DynamoDB テーブルに一般的なクエリパターンのインデックスを追加

  • クエリで LIMIT 句を使用

  • スキャンサイズを減らすために WHERE 条件を追加

  • ツール設定で Lambda メモリを増やすことを検討


高度な設定

カスタムコネクタバージョン

src/types.ts を編集して AWS SAR コネクタバージョンを変更:

connectorVersion: z.string().default('2022.47.1'); // このバージョンを変更

カスタム Lambda メモリ/タイムアウト

src/types.ts を編集:

lambdaMemory: z.number().min(512).max(10240).default(3008); // MB でメモリを変更

lambdaTimeout: z.number().min(60).max(900).default(900); // 秒でタイムアウトを変更

カスタムライフサイクルポリシー

src/types.ts を編集:

spillBucketLifecycleDays: z.number().min(1).default(1); // spill データが削除されるまでの日数

resultsBucketLifecycleDays: z.number().min(1).default(30); // クエリ結果が削除されるまでの日数


開発

プロジェクト構造

redash-dynamodb-connector/
├── src/
│ ├── index.ts # CLI エントリポイント
│ ├── types.ts # 型定義とスキーマ
│ ├── aws/
│ │ ├── credentials.ts # AWS 認証情報プロバイダー
│ │ └── dynamodb.ts # DynamoDB 操作とスキーマ推論
│ ├── utils/
│ │ └── shared-arg.ts # 共有引数ユーティリティ
│ └── cdk/
│ ├── app.ts # CDK アプリファクトリ
│ ├── stacks/
│ │ └── connector-stack.ts # メインスタック
│ └── constructs/
│ └── athena-connector.ts # コネクタコンストラクト
├── package.json
├── cdk.json # CDK 設定
├── tsconfig.json
└── README.md

ローカルで実行


# 依存関係をインストール
pnpm install

# リンターを実行
pnpm lint

# 型チェック
pnpm tsc

# テーブルをデプロイ
pnpm exec tsx src/index.ts add --table my-table --glue-database my-db

テスト

ツールは入力を検証しますが、自動テストはまだ含まれていません。テストは実際のデプロイを通じて行われます。


コスト考慮事項

クエリごとのコスト

  • Athena: スキャンされた 1 TB あたり $5

  • Lambda: 1M リクエストあたり $0.20 + コンピュート時間

  • DynamoDB: 消費された読み込みキャパシティユニット

  • S3: ストレージ + リクエスト (最小限)

月額コスト (推定)

1 ヶ月 1000 クエリ、1 GB テーブルの場合:

  • Athena: ~$0.005 (1 GB × 1000 クエリ = 1 TB × $5)

  • Lambda: ~$0.20

  • DynamoDB: テーブルの RCU 設定に依存

  • S3: < $1

合計: ~$1-5/月 (通常の使用)

コスト最適化のヒント

  • スキャンされるデータを減らすために LIMIT 句を使用

  • SELECT * の代わりに特定のカラムをクエリ

  • DynamoDB インデックスを頻繁にクエリされるパターンに追加

  • S3 バケットに適切なライフサイクルポリシーを設定

  • Athena ワークグループを使用してコストを追跡および制限


制限事項

  1. 書き込み操作なし — ツールは読み取りクエリ (SELECT) のみサポート

  2. スキャンパフォーマンス — 大規模テーブル上の全テーブルスキャンは遅い可能性

  3. 型推論 — サンプルデータに基づいており、すべてのエッジケースをキャプチャしないかもしれません

  4. 更新なし — 既存のコネクタ設定を変更できません; 削除して再作成する必要があります

  5. Lambda 同時実行数 — AWS Lambda 同時実行数制限に制限されます

  6. 結果サイズ — 大きな結果セットは S3 spill が必要 (自動的に処理)


FAQ

Q: 単一のクエリで複数の DynamoDB テーブルをクエリできますか? A: はい! 各テーブルに完全修飾テーブル名で JOIN 句を使用します。

Q: DynamoDB スキーマが変更された場合、コネクタを再作成する必要がありますか? A: はい、add コマンドを再実行してスキーマを再推論し、Glue テーブルを更新します。

Q: これは DynamoDB グローバルテーブルで動作しますか? A: はい、リージョン内のテーブルをポイントするだけです。各リージョンは独自のコネクタが必要です。

Q: コネクタを削除するにはどうすればよいですか? A: CloudFormation スタックを削除: aws cloudformation delete-stack --stack-name unit-dev-dynamodb-connector

Q: DynamoDB ストリームをクエリできますか? A: いいえ、このツールはストリームではなく、テーブル自体のみをクエリします。

Q: これは DynamoDB DAX で動作しますか? A: いいえ、Lambda コネクタは DAX 経由ではなく DynamoDB を直接クエリします。

Q: これは LocalStack または DynamoDB Local で動作しますか? A: いいえ、SAR からのコネクタのデプロイに依存しているため直接には動作しません。


リソース


ライセンス

ISC


貢献

Issue と pull request を受け付けています! 送信前にコードがリント検証を通過することを確認してください。


サポート

Issue や質問がある場合:

  1. 上記のトラブルシューティングセクションを確認

  2. デプロイエラーについては AWS CloudFormation スタックイベントを確認

  3. CloudWatch で Lambda 関数ログを確認

  4. このリポジトリで issue を開く

Related Articles