AWS資料遷移案例分析
Customer Profile
資料來源
地端機房的 MySQL DB cluster (Version 8.xx)。
資料概況
- 資料每日會產生50M records 且每筆 records約為 2K。
需求: 在AWS上建立報表系統
- 以一般使用者(包含 power user)為目標,使用QuickSight Enterprise版本建立Data Dashboard。
- 資料類型一 : 10M records — Real time report on QuickSight
- 資料類型二: 40M records — Daily report on QuickSight
- 以上兩類型資料在來源端並沒有先分離,需要後續加工處裡
所使用的AWS服務
- Direct Connect
- DMS(Data Migration Service)
- KDS(Kinesis Data Stream)
- Lambda
- KDF(Kinesis Data Firehouse)
- S3
- Redshift
- QuickSight Enterprise
在此架構圖中,由於Kinesis、S3、Lambda與QuickSight都是全託管式服務。所以在這些服務中我們需要加入Interface endpoint(也稱PrivateLink),放在同一個VPC中,以達到資料的安全、快速與節費。終端使用者透過Internet的來查看QuickSight,因為在都是Mobile users居多。
網路服務 — Direct Connect
由於需要資料的即時性,故不使用Internet(不穩定、高延遲性)傳輸資料。使用AWS的網路專線(Direct Connect)將資料由地端機房傳送至AWS。
每日資料50M / 86400(每天秒數) = 每秒約為579筆資料
579 x 2K = 約為1.2M(每秒資料)
所需最小網路頻寬 1.2M x 8(因為網路頻寬單位是 Bit Per Second) = 約為10M。考量資料量並不會是平穩的產生所以需要一些buffer來對應流量的Burst。目前AWS專線最小為50M,故採用此規格來"專門"將資料傳送到AWS。此規格不考量將歷史資料遷移到AWS
PS:地端機房與AWS VPC網路區段不可以overlap,若IPv4需要overlap則可能需要IPv6來解決,更多的AWS IPv6設計請參閱本部落格"AWS的IPv6實踐"一文。
資料傳輸服務 — DMS(Data Migration Service)
使用此服務將資料及時或批次的傳送到AWS雲端中,DMS的原生CDC( change data capture )功能可將地端機房的任何資料異動及時的傳送。
所謂CDC 是指識別和攫取對資料庫中的資料異動,然後將這些資料異動即時傳送到下游系統的過程。 攫取來源資料庫中transactions的每個變更並將其及時移動到目標可以保持系統同步,這助於即時分析和零停機資料庫遷移的需求。關於將地端MySQL作為DMS的事前先決條件請參閱AWS文件庫"Using any MySQL-compatible database as a source for AWS DMS"
由於不確定來源端的MySQL cluster的負載狀況為何,建議在該DB Cluster新增一個real-only節點來讓DMS攫取異動資料。使用DMS的Source filter來篩選將資料
以下為一個在AWS DMS中將Kinesis streams設定為data target的範本,首先,建立一個具有最小存取權限的 IAM 角色:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords",
"kinesis:DescribeStream"
],
"Resource": <streamArn>
}
]
}
Trust Relationships
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "dms.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
完成之後,接下來在DMS中設定 source endpoint、 target endpoint與replication instance。關於在DMS將KDS作為data target的詳細設定,請參閱AWS的文件庫"Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service"。
資料串流 — KDS(Kinesis Data Stream)
在KDS的Data Stream Capacity Mode我們將選擇"On-demand",因為它提供最小的latency以達到資料即時的功能。如何建立KDS stream請參閱AWS文件庫"Creating a Stream via the AWS Management Console"。
資料篩選 —使用 Lambda函數將資料寫入QuickSight SPEIC
以下為一個使用Lambda函數範例從Kinesis streams資料,並將篩選好的資料寫入到QuickSight SPEIC API Endpoints。步驟為:
- 在 Lambda 中準備資料:
編寫 Lambda 函數來準備資料以匯入 QuickSight。 這可能涉及從來源檢索資料、執行任何必要的資料轉換或聚合,以及以 QuickSight 可以使用的方式格式化資料。 - 呼叫 QuickSight API:
使用 QuickSight API 以程式設計方式將準備好的資料上傳到 SPICE。 需要使用 CreateIngestion API endpoint來啟動ingestion process。 - 監控Ingestions狀態:
使用 QuickSight API 監控ingestion過程的狀態。 可以使用DescribeIngestion API endpoint來檢查提取的狀態並確保其成功完成。
import boto3
import json
def lambda_handler(event, context):
# Prepare your data for ingestion
prepared_data = prepare_data()
# Call QuickSight API to start ingestion
ingestion_id = start_ingestion(prepared_data)
# Monitor the ingestion status
monitor_ingestion(ingestion_id)
def prepare_data():
# Your data preparation logic here
# This could involve querying a database, performing transformations, etc.
return prepared_data
def start_ingestion(data):
client = boto3.client('quicksight')
# Define parameters for ingestion
dataset_id = 'your-dataset-id'
data_source_arn = 'arn:aws:quicksight:us-east-1:123456789012:datasource/dataset_id'
ingestion_type = 'FULL_REFRESH' # or 'INCREMENTAL_REFRESH'
ingestion_role_arn = 'arn:aws:iam::123456789012:role/QuickSightIngestionRole'
# Start ingestion
response = client.create_ingestion(
DataSetId=dataset_id,
DataSourceId=data_source_arn,
IngestionId='string', # Generate a unique ID for the ingestion
IngestionType=ingestion_type,
AwsAccountId='123456789012',
RoleArn=ingestion_role_arn,
IngestionDefinition={
'S3Source': {
'DataSourceArn': data_source_arn,
'IngestionS3SourceArn': 'arn:aws:s3:::your-bucket/your-data.csv'
}
}
)
# Return the ingestion ID
return response['IngestionId']
def monitor_ingestion(ingestion_id):
client = boto3.client('quicksight')
# Monitor ingestion status
while True:
response = client.describe_ingestion(
DataSetId='your-dataset-id',
IngestionId=ingestion_id,
AwsAccountId='123456789012'
)
status = response['Ingestion']['IngestionStatus']
if status == 'COMPLETED':
print('Ingestion completed successfully.')
break
elif status == 'FAILED':
print('Ingestion failed.')
break
else:
print('Ingestion in progress...')
中繼資料 — AWS KDF
使用KDF將串流資料從KDS擷取到Redshift(Data Warehouse)服務,做為日報與月報的資料儲存地。關於如何將資料傳送到Redshift請參閱"Choose Amazon Redshift for Your Destination"文件庫以及如何將資料從KDS寫到KDF請參閱”Writing to Amazon Data Firehose Using Kinesis Data Streams”文件庫。
KDF會使用S3作為資料的中繼, 首先將資料傳輸到 S3 bucket,然後發出 Redshift COPY command將資料載入到 Amazon Redshift 中。 將資料載入到 Redshift Cluster後,KDF不會將資料從 S3 bucket中刪除。 需要使用lifecycle configuration來管理 S3 buckets中的資料。
由於可能需要做一些資料篩選與轉換,使用KDF中的"Data Transformation"。KDF的Data Transofmation是呼叫Lambda函數進行資料轉換。以下為一個KDF使用Lambda的資料轉換範例代碼。
import base64
import json
def lambda_handler(event, context):
output_records = []
for record in event['records']:
# Decode the base64 encoded data
payload = base64.b64decode(record['data']).decode('utf-8')
# Parse the JSON data
data = json.loads(payload)
# Perform data transformation
transformed_data = transform_data(data)
# Encode the transformed data
output_payload = json.dumps(transformed_data).encode('utf-8')
output_records.append({
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(output_payload).decode('utf-8')
})
return {'records': output_records}
def transform_data(data):
# Sample transformation logic
transformed_data = {}
transformed_data['transformed_attribute'] = data['original_attribute'] * 2
# Add more transformation logic as needed
return transformed_data
- lambda_handler 函數是 Lambda 函數的entry point。 它接收包含來自 KDF 所記錄的event。
- 它迭代每個incoming record,解碼 Base64 編碼資料,並將其解析為 JSON。
- Transform_data 函數包含根據要求轉換傳入資料的邏輯。 在此範例中,它只是將屬性值加倍,可以修改它以滿足您的轉換需求。
- 轉換後的資料被編碼回 Base64,並與record ID 和指示成功的結果(“Ok”)一起加入到output records list中。
- 該函數傳回一個包含轉換後的record的dictionary,然後 Firehose 將其傳送到配置的目的地。
記住將transform_data 函數替換為實際轉換邏輯。 此外,確保 Lambda 函數具有讀取和寫入 Kinesis Data Firehose 所需的權限。
日報或月報資料來源 — Redshift
由於無法確定終端使用者是是需要看前一天的日報或是當天下班前的日報,這裡假設終端使用者需要看的是當天下班前的日報,所以使用Redshift來加速日報的處理。
由於尚不清楚需要報表格式與需要查詢與分析何種類型的資料,為了一開始能達到較好的分析效能,故在Redshift上的sort key(就是資料實體分佈的方式)設定為”AUTO”。詳細設定方式請參閱AWS文件庫"Working with automatic table optimization"。而在Redshift資料查詢的最佳實踐請參閱"Amazon Redshift best practices for designing queries"。
由於需要日報功能,我們需要在Redshift定期產生報表出來。可以使用"Scheduling a query with query editor v2"定期產生日報。若要從Redshift歸檔歷史資料,則可以使用"Unload to S3 command"將資料歸檔。
資料視覺化 — QuickSight Enterprise
選擇Quicksight Enterprise的原因為:
- 只有Enterprise版本才能與地端Windows AD做終端使用者的驗整整合
- 需要將即時資料匯入到QuickSight SPICE(Super-fast, Parallel, In-memory Calculation Engine)中,並且資料才有靜態加密
- Enterprise版本才有"incremental refresh queries”功能
因為使用SPICE,我們直接對即時資料進行查詢或加總計算,由於是採用API的方式將資料匯入SPICE,所以資料的更新是基於Event Driven with Full refresh(因為資料都在memory中),並且我們是使用API calls with CreateIngestion,所以每次的資料寫入都會自動更新其資料。
對即時儀錶板與日報表,我們建立兩種不同的Dashboard View。關於如何建立Dashboard,請參閱AWS文件庫"Visualizing data in Amazon QuickSight"。通常即時儀錶板通常是檢視目標有沒有到位,可以使用KPI visual type來呈現,而日報或一段時間的資料呈現通常尋找趨勢型態,可以使用bar chart。
費用估算
- 專線網路 — 由於AWS不接受1G以下的專線,而是委託給Local ISP。在此案例中,我們預先選擇1G頻寬。因為Local ISP 50M的價格一比這個低
- DMS Instance — 根據資料概況與需求,資料需要持續不斷複製到AWS中。所以我們選擇R4系列的Instance,以達到資料即時性的需求。
- Lambda A — 從KDS流出的request約每秒有600個request。
- Lambda B — 處理KDF的資料轉換