반응형
# 샘플 데이터를 이용해서 Kinesis Data Streams -> Kinesis Data Firehose -> S3 로 데이터가 정상적으로 수집되는지 확인합니다.
- gen_kinesis_data.py 파일의 소스코드 내용 (아래)
import sys
import csv
import json
import argparse
from collections import OrderedDict
import base64
import traceback
import random
import time
import datetime
import boto3
random.seed(47)
SCHEMA_CONV_TOOL = {
"Invoice": str,
"StockCode": str,
"Description": str,
"Quantity": int,
"InvoiceDate": str,
"Price": float,
"Customer_ID": str,
"Country": str
}
DELIMETER_BY_FORMAT = {
'csv': ',',
'tsv': '\t'
}
def gen_records(options, reader):
def _adjust_date(dt):
n = len('yyyy-mm-dd_HH:')
today = datetime.datetime.today()
return '{}:{}'.format(today.strftime('%Y-%m-%d %H'), dt[n:])
record_list = []
for row in reader:
is_skip = (random.randint(1, 47) % 19 < 5) if options.random_select else False
if is_skip:
continue
if int(row['Quantity']) <= 0:
continue
row['InvoiceDate'] = _adjust_date(row['InvoiceDate'])
if options.out_format in DELIMETER_BY_FORMAT:
delimeter = DELIMETER_BY_FORMAT[options.out_format]
data = delimeter.join([e for e in row.values()])
else:
try:
data = json.dumps(OrderedDict([(k, SCHEMA_CONV_TOOL[k](v)) for k, v in row.items()]), ensure_ascii=False)
except Exception as ex:
traceback.print_exc()
continue
if options.max_count == len(record_list):
yield record_list
record_list = []
# XXX: When records aren't separated by a newline character (\n), SELECT COUNT(*) FROM TABLE returns "1." in Athena
# XXX: Therefore, add a newline character (\n)
# XXX: https://aws.amazon.com/premiumsupport/knowledge-center/select-count-query-athena-json-records/
data = '{}\n'.format(data)
record_list.append(data)
if record_list:
yield record_list
def put_records_to_firehose(client, options, records):
MAX_RETRY_COUNT = 3
for data in records:
if options.dry_run:
print(data)
continue
for _ in range(MAX_RETRY_COUNT):
try:
response = client.put_record(
DeliveryStreamName=options.stream_name,
Record={
'Data': '{}\n'.format(data)
}
)
break
except Exception as ex:
traceback.print_exc()
time.sleep(random.randint(1, 10))
else:
raise RuntimeError('[ERROR] Failed to put_records into stream: {}'.format(options.stream_name))
def put_records_to_kinesis(client, options, records):
MAX_RETRY_COUNT = 3
payload_list = []
for data in records:
partition_key = 'part-{:05}'.format(random.randint(1, 1024))
payload_list.append({'Data': data, 'PartitionKey': partition_key})
if options.dry_run:
print(json.dumps(payload_list, ensure_ascii=False))
return
for _ in range(MAX_RETRY_COUNT):
try:
response = client.put_records(Records=payload_list, StreamName=options.stream_name)
break
except Exception as ex:
traceback.print_exc()
time.sleep(random.randint(1, 10))
else:
raise RuntimeError('[ERROR] Failed to put_records into stream: {}'.format(options.stream_name))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--region-name', action='store', default='us-east-1',
help='aws region name (default: us-east-1)')
parser.add_argument('-I', '--input-file', required=True, help='The input file path ex) ./resources/online_retail.csv')
parser.add_argument('--out-format', default='json', choices=['csv', 'tsv', 'json'])
parser.add_argument('--service-name', required=True, choices=['kinesis', 'firehose', 'console'])
parser.add_argument('--stream-name', help='The name of the stream to put the data record into.')
parser.add_argument('--max-count', default=10, type=int, help='The max number of records to put.')
parser.add_argument('--random-select', action='store_true')
parser.add_argument('--dry-run', action='store_true')
options = parser.parse_args()
COUNT_STEP = 10 if options.dry_run else 100
with open(options.input_file, newline='') as csvfile:
reader = csv.DictReader(csvfile)
client = boto3.client(options.service_name,
region_name=options.region_name) if options.service_name != 'console' else None
counter = 0
for records in gen_records(options, reader):
if options.service_name == 'kinesis':
put_records_to_kinesis(client, options, records)
elif options.service_name == 'firehose':
put_records_to_firehose(client, options, records)
else:
print('\n'.join([e for e in records]))
counter += 1
if counter % COUNT_STEP == 0:
print('[INFO] {} steps are processed'.format(counter), file=sys.stderr)
if options.dry_run:
break
time.sleep(random.choices([0.01, 0.03, 0.05, 0.07, 0.1])[-1])
if __name__ == '__main__':
main()
1. 앞서 생성한 EC2 인스턴스에 SSH로 접속하여 gen_kinesis_data.py을 실행합니다.
python3 gen_kinesis_data.py --help 을 실행하여 옵션을 확인 할 수 있다.
python3 gen_kinesis_data.py -I resources/online_retail.csv \
--region-name ap-northeast-2 \
--service-name kinesis \
--out-format json \
--stream-name retail-trans
2. 위의 명령어를 실행하면 아래와 같이 지속적으로 데이터가 쌓이는 것을 확인 할 수있다. 아마 S3 버켓에 가면 데이터가 지속적으로 INSERT 되고 있을 것 같다.
3. 자 그럼 이제 S3 버켓에 가서 데이터가 생성된 모습을 확인하자. → 해당 S3 버켓으로 이동 후 아래 캡쳐 사진과 같이 데이터가 쌓인 모습을 확인 할 수 있다.
- 자 이제 데이터 파이프라인의 검증이 끝났으므로, 다음시간에는 아마존 아테나를 이용하여 데이터를 분석하는 시간을 갖도록 하겠다.
반응형
'⭐ AWS > Kinesis Data Stream' 카테고리의 다른 글
Kinesis Data Streams - QuickSight (0) | 2021.06.14 |
---|---|
Kinesis Data Streams - Athena를 통한 데이터 분석 (0) | 2021.06.13 |
Kinesis Data Firehoses - 생성 (0) | 2021.06.13 |
Kinesis Data Streams - 생성 (0) | 2021.06.13 |
Kinesis Data Streams - SSH 접속 및 EC2 설정 (0) | 2021.06.13 |