Ingestion layer
The ingestion layer consists of two main services: a Kinesis Data Stream, which acts as the consumer for CloudFront's real-time logs feature, and a Kinesis Data Firehose Delivery Stream, which backs up the raw data to S3 and also stores the data as partitioned Parquet files in a second S3 bucket. Both S3 buckets are part of the storage layer.
The Kinesis Data Stream (one shard in provisioned mode) provides an ingest capacity of 1 MB/second or 1,000 records/second, at a price of $0.015/hour in us-east-1 plus $0.014 per one million PUT payload units. It forwards the incoming data to the Kinesis Data Firehose Delivery Stream, whose pricing is more complex: ingestion costs $0.029/GB, format conversion $0.018/GB, and dynamic partitioning $0.02/GB. That sums up to $0.067/GB ingested and written to S3, plus S3 costs of $0.005 per 1,000 PUT requests. As a rough example, ingesting 100 GB per month would incur about $6.70 in Firehose charges, plus roughly $11 (730 hours × $0.015) for the provisioned shard.
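For completeness, here is a minimal sketch of the Kinesis Data Stream resource that the Delivery Stream below references as AnalyticsKinesisStream. The name variable is an assumed serverless.yml entry, named analogously to the ones used in the Delivery Stream definition:

```yaml
AnalyticsKinesisStream:
  Type: 'AWS::Kinesis::Stream'
  Properties:
    # Assumed serverless.yml variable, analogous to the Delivery Stream's name below
    Name: ${self:custom.kinesis.stream.name}
    # One shard in provisioned mode = 1 MB/second or 1,000 records/second
    ShardCount: 1
    StreamModeDetails:
      StreamMode: PROVISIONED
    # Minimum (and default) retention; the Delivery Stream reads records within seconds
    RetentionPeriodHours: 24
```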
The Kinesis Data Firehose Delivery Stream uses data transformation and dynamic partitioning via a Lambda function, which cleans, transforms, and enriches the data so that it can be stored in S3 as Parquet files with appropriate Hive partitions.
The Delivery Stream has so-called BufferingHints, which define the size (1 to 128 MB) and the interval (60 to 900 seconds) at which buffered data is flushed to S3; whichever threshold is reached first triggers the flush. The interval therefore defines the minimum latency with which the data gets persisted in the data lake. The Lambda function is part of the processing layer and is discussed below.
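For reference, the custom section of serverless.yml that these resources pull their variables from could look like the following sketch. The variable names match the references in the resource definition below; the concrete values are illustrative assumptions:

```yaml
custom:
  kinesis:
    delivery:
      name: analytics-delivery-stream  # illustrative name
      limits:
        # Flush when 60 seconds or 64 MB is reached, whichever comes first
        intervalInSeconds: 60
        sizeInMB: 64
  logs:
    groupName: /aws/kinesisfirehose/analytics  # illustrative name
    streamName: delivery                       # illustrative name
  glue:
    database: analytics                        # illustrative name
  prefixes:
    raw: 'raw/'
    incoming: 'incoming'
    error: 'error/'
```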
The CloudFormation resource definition for the Kinesis Data Firehose Delivery Stream can be found below. It sources its variables from this custom section of serverless.yml:
```yaml
AnalyticsKinesisFirehose:
  Type: 'AWS::KinesisFirehose::DeliveryStream'
  Properties:
    DeliveryStreamName: ${self:custom.kinesis.delivery.name}
    DeliveryStreamType: KinesisStreamAsSource
    # Source configuration
    KinesisStreamSourceConfiguration:
      KinesisStreamARN: !GetAtt 'AnalyticsKinesisStream.Arn'
      RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
    # Necessary configuration to transform and write data to S3 as parquet files
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt 'CleanedBucket.Arn'
      BufferingHints:
        IntervalInSeconds: ${self:custom.kinesis.delivery.limits.intervalInSeconds}
        SizeInMBs: ${self:custom.kinesis.delivery.limits.sizeInMB}
      # This enables logging to CloudWatch for better debugging possibilities
      CloudWatchLoggingOptions:
        Enabled: True
        LogGroupName: ${self:custom.logs.groupName}
        LogStreamName: ${self:custom.logs.streamName}
      DataFormatConversionConfiguration:
        Enabled: True
        # Define the input format
        InputFormatConfiguration:
          Deserializer:
            OpenXJsonSerDe:
              CaseInsensitive: True
        # Define the output format
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe:
              Compression: SNAPPY
              WriterVersion: V1
        # The schema configuration based on Glue tables
        SchemaConfiguration:
          RoleArn: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
          DatabaseName: '${self:custom.glue.database}'
          TableName: 'incoming_events'
      # Enable dynamic partitioning
      DynamicPartitioningConfiguration:
        Enabled: True
      # Enable Lambda function for pre-processing the Kinesis records
      ProcessingConfiguration:
        Enabled: True
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: NumberOfRetries
                ParameterValue: 3
              - ParameterName: BufferIntervalInSeconds
                ParameterValue: 60
              - ParameterName: BufferSizeInMBs
                ParameterValue: 3
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt 'ProcessKinesisRecordsLambdaFunction.Arn'
      # Enable backups for the raw incoming data
      S3BackupMode: Enabled
      S3BackupConfiguration:
        BucketARN: !GetAtt 'RawBucket.Arn'
        BufferingHints:
          IntervalInSeconds: ${self:custom.kinesis.delivery.limits.intervalInSeconds}
          SizeInMBs: ${self:custom.kinesis.delivery.limits.sizeInMB}
        # Disable logging to CloudWatch for raw data
        CloudWatchLoggingOptions:
          Enabled: false
        CompressionFormat: GZIP
        Prefix: '${self:custom.prefixes.raw}'
        ErrorOutputPrefix: '${self:custom.prefixes.error}'
        RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
      RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
      # Define output S3 prefixes
      Prefix: '${self:custom.prefixes.incoming}/domain_name=!{partitionKeyFromLambda:domain_name}/event_type=!{partitionKeyFromLambda:event_type}/event_date=!{partitionKeyFromLambda:event_date}/'
      ErrorOutputPrefix: '${self:custom.prefixes.error}'
```
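The !{partitionKeyFromLambda:...} expressions in the Prefix only resolve if the transformation Lambda returns the partition keys in each record's metadata. The function itself is discussed in the processing layer section; as an illustration, each record it returns must have roughly the following shape (rendered here as YAML for readability, with assumed example values):

```yaml
recordId: <recordId copied unchanged from the incoming record>
result: Ok  # or Dropped / ProcessingFailed
data: <base64-encoded, transformed payload>
metadata:
  partitionKeys:
    # These keys feed the !{partitionKeyFromLambda:...} expressions above
    domain_name: 'example.com'  # assumed example values
    event_type: 'pageview'
    event_date: '2023-05-01'
```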